diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9bc4cd6..df11454 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,10 +13,10 @@ jobs: - name: Install Python uses: actions/setup-python@v5 with: - python-version: "3.11" + python-version: "3.11.3" - name: Install dependencies run: | - poetry env use "3.11" + poetry env use "3.11.3" poetry export --only lint --output lint-requirements.txt pip install -r lint-requirements.txt - name: Run Ruff @@ -49,13 +49,13 @@ jobs: - name: Install Python uses: actions/setup-python@v5 with: - python-version: "3.11" + python-version: "3.11.3" - uses: actions/setup-python@v4 with: - python-version: 3.11 + python-version: 3.11.3 - run: | - poetry env use "3.11" + poetry env use "3.11.3" poetry install poetry run pytest -x -n auto --dist loadfile env: diff --git a/docker-compose.yml b/docker-compose.yml index c163687..15ca5de 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,7 +16,7 @@ services: - db networks: - chat-net - command: uvicorn src.main:app --host=0.0.0.0 --port=8001 --reload + command: gunicorn -w 2 -k uvicorn.workers.UvicornWorker src.main:app -b 0.0.0.0:8001 db: image: postgres:15-alpine diff --git a/src/config.py b/src/config.py index 94f5654..7a28ab2 100644 --- a/src/config.py +++ b/src/config.py @@ -40,7 +40,7 @@ class GlobalSettings(BaseSettings): ADMIN_SECRET_KEY: str = "Hv9LGqARc473ceBUYDw1FR0QaXOA3Ky4" # redis for caching - REDIS_CACHE_ENABLED: bool = True + REDIS_CACHE_ENABLED: bool = False REDIS_HOST: str = "chat-redis" REDIS_PORT: str | int = 6379 REDIS_PASSWORD: str | None = None diff --git a/src/managers/websocket_manager.py b/src/managers/websocket_manager.py index 2141b9f..34de235 100644 --- a/src/managers/websocket_manager.py +++ b/src/managers/websocket_manager.py @@ -15,6 +15,7 @@ def __init__(self): self.chats: dict = {} # stores user WebSocket connections by chat {"chat_guid": {ws1, ws2}, ...} self.pubsub_client = RedisPubSubManager() self.user_guid_to_websocket: dict = {} # stores user_guid: {ws1, ws2} combinations + self.user_channel: dict = {} def handler(self, message_type): def decorator(func): @@ -43,11 +44,33 @@ async def broadcast_to_chat(self, chat_guid: str, message: str | dict) -> None: message = json.dumps(message) await self.pubsub_client.publish(chat_guid, message) + async def create_user_channel(self, user_guid: str, websocket: WebSocket): + if user_guid in self.user_channel: + self.user_channel[user_guid].add(websocket) + else: + self.user_channel[user_guid] = {websocket} + await self.pubsub_client.connect() + pubsub_subscriber = await self.pubsub_client.subscribe(user_guid) + asyncio.create_task(self._pubsub_data_reader_for_user(pubsub_subscriber)) + print("Guys", self.user_channel) + + async def broadcast_to_user(self, user_guid: str, message: str | dict): + if isinstance(message, dict): + message = json.dumps(message) + await self.pubsub_client.publish(user_guid, message) + + async def remove_websocket_from_user_channel(self, user_guid: str, websocket: WebSocket) -> None: + self.user_channel[user_guid].remove(websocket) + if len(self.user_channel[user_guid]) == 0: + del self.user_channel[user_guid] + logger.info(f"Removing user from PubSub channel {user_guid}") + await self.pubsub_client.unsubscribe(user_guid) + async def remove_user_from_chat(self, chat_guid: str, websocket: WebSocket) -> None: self.chats[chat_guid].remove(websocket) if len(self.chats[chat_guid]) == 0: del self.chats[chat_guid] - logger.info("Removing user from PubSub channel {chat_guid}") + logger.info(f"Removing user from PubSub channel {chat_guid}") await self.pubsub_client.unsubscribe(chat_guid) async def remove_user_guid_to_websocket(self, user_guid: str, websocket: WebSocket): @@ -69,5 +92,19 @@ async def _pubsub_data_reader(self, pubsub_subscriber): except Exception as exc: logger.exception(f"Exception occurred: {exc}") + async def _pubsub_data_reader_for_user(self, pubsub_subscriber): + try: + while True: + message = await pubsub_subscriber.get_message(ignore_subscribe_messages=True) + if message is not None: + chat_guid = message["channel"].decode("utf-8") + sockets = self.user_channel.get(chat_guid) + if sockets: + for socket in sockets: + data = message["data"].decode("utf-8") + await socket.send_text(data) + except Exception as exc: + logger.exception(f"Exception occurred: {exc}") + async def send_error(self, message: str, websocket: WebSocket): await websocket.send_json({"status": "error", "message": message}) diff --git a/src/websocket/handlers.py b/src/websocket/handlers.py index 9e76be6..b58dae2 100644 --- a/src/websocket/handlers.py +++ b/src/websocket/handlers.py @@ -1,10 +1,8 @@ -import asyncio import logging from datetime import datetime import redis.asyncio as aioredis from fastapi import WebSocket -from fastapi.encoders import jsonable_encoder from sqlalchemy.ext.asyncio import AsyncSession from src.managers.websocket_manager import WebSocketManager @@ -23,7 +21,7 @@ get_message_by_guid, mark_last_read_message, mark_user_as_online, - send_new_chat_created_ws_message, + notify_friend_about_new_chat, ) logger = logging.getLogger(__name__) @@ -50,16 +48,18 @@ async def new_message_handler( message_schema = ReceiveMessageSchema(**incoming_message) chat_guid: str = str(message_schema.chat_guid) - notify_friend_about_new_chat: bool = False + # notify_friend_about_new_chat: bool = False # newly created chat if not chats or chat_guid not in chats: + print("YES", chats) chat_id: int | None = await get_chat_id_by_guid(db_session, chat_guid=chat_guid) if chat_id: + print("Chat ID", chat_id) # this action modifies chats variable in websocket view chats[chat_guid] = chat_id await socket_manager.add_user_to_chat(chat_guid, websocket) # must notify friend that new chat has been created - notify_friend_about_new_chat = True + # notify_friend_about_new_chat = True else: await socket_manager.send_error("Chat has not been added", websocket) @@ -113,9 +113,11 @@ async def new_message_handler( await socket_manager.broadcast_to_chat(chat_guid, outgoing_message) - if notify_friend_about_new_chat: - logger.info("Notifying friend about newly created chat") - await send_new_chat_created_ws_message(socket_manager=socket_manager, current_user=current_user, chat=chat) + await notify_friend_about_new_chat(socket_manager=socket_manager, current_user=current_user, chat=chat) + + # if notify_friend_about_new_chat: + # logger.info("Notifying friend about newly created chat") + # await send_new_chat_created_ws_message(socket_manager=socket_manager, current_user=current_user, chat=chat) @socket_manager.handler("message_read") @@ -248,8 +250,6 @@ async def chat_deleted_handler( # get all websocket connections that belong to this chat (except for ws that sent this messsage) # and send notification that chat has been removed - target_websockets: set[WebSocket] = socket_manager.chats.get(chat_guid) - outgoing_message = { "type": "chat_deleted", "user_guid": str(current_user.guid), @@ -257,13 +257,17 @@ async def chat_deleted_handler( "chat_guid": chat_guid, } - if target_websockets: - # Send the notification message to the target user concurrently - # used to notify frontend - await asyncio.gather( - *[ - socket.send_json(jsonable_encoder(outgoing_message)) - for socket in target_websockets - if socket != websocket - ] - ) + await socket_manager.broadcast_to_chat(chat_guid, outgoing_message) + + # target_websockets: set[WebSocket] = socket_manager.chats.get(chat_guid) + + # if target_websockets: + # # Send the notification message to the target user concurrently + # # used to notify frontend + # await asyncio.gather( + # *[ + # socket.send_json(jsonable_encoder(outgoing_message)) + # for socket in target_websockets + # if socket != websocket + # ] + # ) diff --git a/src/websocket/router.py b/src/websocket/router.py index f1034ce..a64c729 100644 --- a/src/websocket/router.py +++ b/src/websocket/router.py @@ -52,6 +52,10 @@ async def websocket_endpoint( else: chats = dict() + # add user socket connection to user channel + await socket_manager.create_user_channel(user_guid=str(current_user.guid), websocket=websocket) + logger.debug(f"User channels {socket_manager.user_channel}") + # task for sending status messages, not dependent on cache_enabled user_status_task = asyncio.create_task(check_user_statuses(cache, socket_manager, current_user, chats)) @@ -96,8 +100,9 @@ async def websocket_endpoint( await socket_manager.send_error("You have sent too many requests", websocket) except WebSocketDisconnect: - logging.info("Websocket is disconnected") + logger.info("Websocket is disconnected") # unsubscribe user websocket connection from all chats + await socket_manager.remove_websocket_from_user_channel(user_guid=str(current_user.guid), websocket=websocket) if chats: for chat_guid in chats: await socket_manager.remove_user_from_chat(chat_guid, websocket) diff --git a/src/websocket/schemas.py b/src/websocket/schemas.py index 1f97bf7..c978a7d 100644 --- a/src/websocket/schemas.py +++ b/src/websocket/schemas.py @@ -34,7 +34,7 @@ class UserTypingSchema(BaseModel): class NewChatCreated(BaseModel): - type: str = "new_chat_created" + type: str = "new_chat_created" # handled by frontend only chat_id: int # need to pass for guid/id mapping [chats variable] chat_guid: UUID4 created_at: datetime diff --git a/src/websocket/services.py b/src/websocket/services.py index c6bf392..652644e 100644 --- a/src/websocket/services.py +++ b/src/websocket/services.py @@ -155,6 +155,50 @@ async def get_chat_id_by_guid(db_session: AsyncSession, *, chat_guid: UUID) -> i return chat_id +async def notify_friend_about_new_chat(socket_manager: WebSocketManager, current_user: User, chat: Chat): + # get friend guid + + friend_guid: UUID | None = next((user.guid for user in chat.users if not user == current_user), None) + if not friend_guid: + logger.error("Friend guid not found", extra={"type": "new_chat_created", "friend_guid": friend_guid}) + return + logger.debug(f"BROADCASTING TO FRIEND {friend_guid}") + await socket_manager.broadcast_to_user(str(friend_guid), {"Message": "WTF"}) + + # check if friend has active websocket connections + logger.debug(f"User channels {socket_manager.user_channel}") + # friend_websockets: set[WebSocket] = socket_manager.user_channel.get(str(friend_guid)) + + # if not friend_websockets: + # logger.info( + # "Friend doesn't have active connections", extra={"type": "new_chat_created", "friend_guid": friend_guid} + # ) + # return + + current_user_info: dict = { + "guid": current_user.guid, + "first_name": current_user.first_name, + "last_name": current_user.last_name, + "username": current_user.username, + "user_image": current_user.user_image, + } + payload = NewChatCreated( + chat_id=chat.id, + chat_guid=chat.guid, + created_at=chat.created_at, + updated_at=chat.updated_at, + friend=current_user_info, # current user becomes a friend for a user that receives this message + has_new_messages=True, + new_messages_count=1, + ) + + json_payload: str = payload.model_dump_json() + print("Socket's chats", socket_manager.chats) + print("Broadcasting to user", friend_guid, json_payload) + await socket_manager.broadcast_to_user(str(friend_guid), json_payload) + await socket_manager.broadcast_to_user(str(current_user.guid), json_payload) + + async def send_new_chat_created_ws_message(socket_manager: WebSocketManager, current_user: User, chat: Chat): """ Send a new chat created message to friend's websocket connections in the chat.