Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
notarious2 committed Mar 22, 2024
1 parent 2ebb1ea commit ccb8fa5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 7 deletions.
22 changes: 22 additions & 0 deletions src/managers/websocket_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self):
self.chats: dict = {} # stores user WebSocket connections by chat {"chat_guid": {ws1, ws2}, ...}
self.pubsub_client = RedisPubSubManager()
self.user_guid_to_websocket: dict = {} # stores user_guid: {ws1, ws2} combinations
self.user_channel: dict = {}

def handler(self, message_type):
def decorator(func):
Expand Down Expand Up @@ -43,6 +44,27 @@ async def broadcast_to_chat(self, chat_guid: str, message: str | dict) -> None:
message = json.dumps(message)
await self.pubsub_client.publish(chat_guid, message)

async def create_user_channel(self, user_guid: str, websocket: WebSocket):
if user_guid in self.chats:
self.user_channel[user_guid].add(websocket)
else:
self.user_channel[user_guid] = {websocket}
await self.pubsub_client.connect()
pubsub_subscriber = await self.pubsub_client.subscribe(user_guid)
asyncio.create_task(self._pubsub_data_reader(pubsub_subscriber))

async def broadcast_to_user(self, user_guid: str, message: str | dict):
if isinstance(message, dict):
message = json.dumps(message)
await self.pubsub_client.publish(user_guid, message)

async def remove_websocket_from_user_channel(self, user_guid: str, websocket: WebSocket) -> None:
self.user_channel[user_guid].remove(websocket)
if len(self.user_channel[user_guid]) == 0:
del self.user_channel[user_guid]
logger.info("Removing user from PubSub channel {chat_guid}")
await self.pubsub_client.unsubscribe(user_guid)

async def remove_user_from_chat(self, chat_guid: str, websocket: WebSocket) -> None:
self.chats[chat_guid].remove(websocket)
if len(self.chats[chat_guid]) == 0:
Expand Down
13 changes: 7 additions & 6 deletions src/websocket/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
get_message_by_guid,
mark_last_read_message,
mark_user_as_online,
send_new_chat_created_ws_message,
notify_friend_about_new_chat,
)

logger = logging.getLogger(__name__)
Expand All @@ -50,7 +50,7 @@ async def new_message_handler(
message_schema = ReceiveMessageSchema(**incoming_message)
chat_guid: str = str(message_schema.chat_guid)

notify_friend_about_new_chat: bool = False
# notify_friend_about_new_chat: bool = False
# newly created chat
if not chats or chat_guid not in chats:
chat_id: int | None = await get_chat_id_by_guid(db_session, chat_guid=chat_guid)
Expand All @@ -59,7 +59,7 @@ async def new_message_handler(
chats[chat_guid] = chat_id
await socket_manager.add_user_to_chat(chat_guid, websocket)
# must notify friend that new chat has been created
notify_friend_about_new_chat = True
# notify_friend_about_new_chat = True

else:
await socket_manager.send_error("Chat has not been added", websocket)
Expand Down Expand Up @@ -112,10 +112,11 @@ async def new_message_handler(
outgoing_message: dict = send_message_schema.model_dump_json()

await socket_manager.broadcast_to_chat(chat_guid, outgoing_message)
await notify_friend_about_new_chat(socket_manager=socket_manager, current_user=current_user, chat=chat)

if notify_friend_about_new_chat:
logger.info("Notifying friend about newly created chat")
await send_new_chat_created_ws_message(socket_manager=socket_manager, current_user=current_user, chat=chat)
# if notify_friend_about_new_chat:
# logger.info("Notifying friend about newly created chat")
# await send_new_chat_created_ws_message(socket_manager=socket_manager, current_user=current_user, chat=chat)


@socket_manager.handler("message_read")
Expand Down
2 changes: 1 addition & 1 deletion src/websocket/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class UserTypingSchema(BaseModel):


class NewChatCreated(BaseModel):
type: str = "new_chat_created"
type: str = "new_chat_created" # handled by frontend only
chat_id: int # need to pass for guid/id mapping [chats variable]
chat_guid: UUID4
created_at: datetime
Expand Down
38 changes: 38 additions & 0 deletions src/websocket/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,44 @@ async def get_chat_id_by_guid(db_session: AsyncSession, *, chat_guid: UUID) -> i
return chat_id


async def notify_friend_about_new_chat(socket_manager: WebSocketManager, current_user: User, chat: Chat):
# get friend guid

friend_guid: UUID | None = next((user.guid for user in chat.users if not user == current_user), None)
if not friend_guid:
logger.error("Friend guid not found", extra={"type": "new_chat_created", "friend_guid": friend_guid})
return

# check if friend has active websocket connections
friend_websockets: set[WebSocket] = socket_manager.user_channel.get(str(friend_guid))

if not friend_websockets:
logger.info(
"Friend doesn't have active connections", extra={"type": "new_chat_created", "friend_guid": friend_guid}
)
return

current_user_info: dict = {
"guid": current_user.guid,
"first_name": current_user.first_name,
"last_name": current_user.last_name,
"username": current_user.username,
"user_image": current_user.user_image,
}
payload = NewChatCreated(
chat_id=chat.id,
chat_guid=chat.guid,
created_at=chat.created_at,
updated_at=chat.updated_at,
friend=current_user_info, # current user becomes a friend for a user that receives this message
has_new_messages=True,
new_messages_count=1,
)

json_payload: str = payload.model_dump_json()
await socket_manager.broadcast_to_user(str(friend_guid), json_payload)


async def send_new_chat_created_ws_message(socket_manager: WebSocketManager, current_user: User, chat: Chat):
"""
Send a new chat created message to friend's websocket connections in the chat.
Expand Down

0 comments on commit ccb8fa5

Please sign in to comment.