Skip to content

Commit

Permalink
specify python version in CI
Browse files Browse the repository at this point in the history
  • Loading branch information
notarious2 committed Mar 22, 2024
1 parent a770a1b commit fad6c22
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 30 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ jobs:
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.11.3"
- name: Install dependencies
run: |
poetry env use "3.11"
poetry env use "3.11.3"
poetry export --only lint --output lint-requirements.txt
pip install -r lint-requirements.txt
- name: Run Ruff
Expand Down Expand Up @@ -49,13 +49,13 @@ jobs:
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.11.3"

- uses: actions/setup-python@v4
with:
python-version: 3.11
python-version: 3.11.3
- run: |
poetry env use "3.11"
poetry env use "3.11.3"
poetry install
poetry run pytest -x -n auto --dist loadfile
env:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
- db
networks:
- chat-net
command: uvicorn src.main:app --host=0.0.0.0 --port=8001 --reload
command: gunicorn -w 2 -k uvicorn.workers.UvicornWorker src.main:app -b 0.0.0.0:8001

db:
image: postgres:15-alpine
Expand Down
2 changes: 1 addition & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class GlobalSettings(BaseSettings):
ADMIN_SECRET_KEY: str = "Hv9LGqARc473ceBUYDw1FR0QaXOA3Ky4"

# redis for caching
REDIS_CACHE_ENABLED: bool = True
REDIS_CACHE_ENABLED: bool = False
REDIS_HOST: str = "chat-redis"
REDIS_PORT: str | int = 6379
REDIS_PASSWORD: str | None = None
Expand Down
39 changes: 38 additions & 1 deletion src/managers/websocket_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self):
self.chats: dict = {} # stores user WebSocket connections by chat {"chat_guid": {ws1, ws2}, ...}
self.pubsub_client = RedisPubSubManager()
self.user_guid_to_websocket: dict = {} # stores user_guid: {ws1, ws2} combinations
self.user_channel: dict = {}

def handler(self, message_type):
def decorator(func):
Expand Down Expand Up @@ -43,11 +44,33 @@ async def broadcast_to_chat(self, chat_guid: str, message: str | dict) -> None:
message = json.dumps(message)
await self.pubsub_client.publish(chat_guid, message)

async def create_user_channel(self, user_guid: str, websocket: WebSocket):
if user_guid in self.user_channel:
self.user_channel[user_guid].add(websocket)
else:
self.user_channel[user_guid] = {websocket}
await self.pubsub_client.connect()
pubsub_subscriber = await self.pubsub_client.subscribe(user_guid)
asyncio.create_task(self._pubsub_data_reader_for_user(pubsub_subscriber))
print("Guys", self.user_channel)

async def broadcast_to_user(self, user_guid: str, message: str | dict):
if isinstance(message, dict):
message = json.dumps(message)
await self.pubsub_client.publish(user_guid, message)

async def remove_websocket_from_user_channel(self, user_guid: str, websocket: WebSocket) -> None:
self.user_channel[user_guid].remove(websocket)
if len(self.user_channel[user_guid]) == 0:
del self.user_channel[user_guid]
logger.info(f"Removing user from PubSub channel {user_guid}")
await self.pubsub_client.unsubscribe(user_guid)

async def remove_user_from_chat(self, chat_guid: str, websocket: WebSocket) -> None:
self.chats[chat_guid].remove(websocket)
if len(self.chats[chat_guid]) == 0:
del self.chats[chat_guid]
logger.info("Removing user from PubSub channel {chat_guid}")
logger.info(f"Removing user from PubSub channel {chat_guid}")
await self.pubsub_client.unsubscribe(chat_guid)

async def remove_user_guid_to_websocket(self, user_guid: str, websocket: WebSocket):
Expand All @@ -69,5 +92,19 @@ async def _pubsub_data_reader(self, pubsub_subscriber):
except Exception as exc:
logger.exception(f"Exception occurred: {exc}")

async def _pubsub_data_reader_for_user(self, pubsub_subscriber):
try:
while True:
message = await pubsub_subscriber.get_message(ignore_subscribe_messages=True)
if message is not None:
chat_guid = message["channel"].decode("utf-8")
sockets = self.user_channel.get(chat_guid)
if sockets:
for socket in sockets:
data = message["data"].decode("utf-8")
await socket.send_text(data)
except Exception as exc:
logger.exception(f"Exception occurred: {exc}")

async def send_error(self, message: str, websocket: WebSocket):
await websocket.send_json({"status": "error", "message": message})
44 changes: 24 additions & 20 deletions src/websocket/handlers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import asyncio
import logging
from datetime import datetime

import redis.asyncio as aioredis
from fastapi import WebSocket
from fastapi.encoders import jsonable_encoder
from sqlalchemy.ext.asyncio import AsyncSession

from src.managers.websocket_manager import WebSocketManager
Expand All @@ -23,7 +21,7 @@
get_message_by_guid,
mark_last_read_message,
mark_user_as_online,
send_new_chat_created_ws_message,
notify_friend_about_new_chat,
)

logger = logging.getLogger(__name__)
Expand All @@ -50,16 +48,18 @@ async def new_message_handler(
message_schema = ReceiveMessageSchema(**incoming_message)
chat_guid: str = str(message_schema.chat_guid)

notify_friend_about_new_chat: bool = False
# notify_friend_about_new_chat: bool = False
# newly created chat
if not chats or chat_guid not in chats:
print("YES", chats)
chat_id: int | None = await get_chat_id_by_guid(db_session, chat_guid=chat_guid)
if chat_id:
print("Chat ID", chat_id)
# this action modifies chats variable in websocket view
chats[chat_guid] = chat_id
await socket_manager.add_user_to_chat(chat_guid, websocket)
# must notify friend that new chat has been created
notify_friend_about_new_chat = True
# notify_friend_about_new_chat = True

else:
await socket_manager.send_error("Chat has not been added", websocket)
Expand Down Expand Up @@ -113,9 +113,11 @@ async def new_message_handler(

await socket_manager.broadcast_to_chat(chat_guid, outgoing_message)

if notify_friend_about_new_chat:
logger.info("Notifying friend about newly created chat")
await send_new_chat_created_ws_message(socket_manager=socket_manager, current_user=current_user, chat=chat)
await notify_friend_about_new_chat(socket_manager=socket_manager, current_user=current_user, chat=chat)

# if notify_friend_about_new_chat:
# logger.info("Notifying friend about newly created chat")
# await send_new_chat_created_ws_message(socket_manager=socket_manager, current_user=current_user, chat=chat)


@socket_manager.handler("message_read")
Expand Down Expand Up @@ -248,22 +250,24 @@ async def chat_deleted_handler(
# get all websocket connections that belong to this chat (except for ws that sent this messsage)
# and send notification that chat has been removed

target_websockets: set[WebSocket] = socket_manager.chats.get(chat_guid)

outgoing_message = {
"type": "chat_deleted",
"user_guid": str(current_user.guid),
"user_name": current_user.first_name,
"chat_guid": chat_guid,
}

if target_websockets:
# Send the notification message to the target user concurrently
# used to notify frontend
await asyncio.gather(
*[
socket.send_json(jsonable_encoder(outgoing_message))
for socket in target_websockets
if socket != websocket
]
)
await socket_manager.broadcast_to_chat(chat_guid, outgoing_message)

# target_websockets: set[WebSocket] = socket_manager.chats.get(chat_guid)

# if target_websockets:
# # Send the notification message to the target user concurrently
# # used to notify frontend
# await asyncio.gather(
# *[
# socket.send_json(jsonable_encoder(outgoing_message))
# for socket in target_websockets
# if socket != websocket
# ]
# )
7 changes: 6 additions & 1 deletion src/websocket/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ async def websocket_endpoint(
else:
chats = dict()

# add user socket connection to user channel
await socket_manager.create_user_channel(user_guid=str(current_user.guid), websocket=websocket)
logger.debug(f"User channels {socket_manager.user_channel}")

# task for sending status messages, not dependent on cache_enabled
user_status_task = asyncio.create_task(check_user_statuses(cache, socket_manager, current_user, chats))

Expand Down Expand Up @@ -96,8 +100,9 @@ async def websocket_endpoint(
await socket_manager.send_error("You have sent too many requests", websocket)

except WebSocketDisconnect:
logging.info("Websocket is disconnected")
logger.info("Websocket is disconnected")
# unsubscribe user websocket connection from all chats
await socket_manager.remove_websocket_from_user_channel(user_guid=str(current_user.guid), websocket=websocket)
if chats:
for chat_guid in chats:
await socket_manager.remove_user_from_chat(chat_guid, websocket)
Expand Down
2 changes: 1 addition & 1 deletion src/websocket/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class UserTypingSchema(BaseModel):


class NewChatCreated(BaseModel):
type: str = "new_chat_created"
type: str = "new_chat_created" # handled by frontend only
chat_id: int # need to pass for guid/id mapping [chats variable]
chat_guid: UUID4
created_at: datetime
Expand Down
44 changes: 44 additions & 0 deletions src/websocket/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,50 @@ async def get_chat_id_by_guid(db_session: AsyncSession, *, chat_guid: UUID) -> i
return chat_id


async def notify_friend_about_new_chat(socket_manager: WebSocketManager, current_user: User, chat: Chat):
# get friend guid

friend_guid: UUID | None = next((user.guid for user in chat.users if not user == current_user), None)
if not friend_guid:
logger.error("Friend guid not found", extra={"type": "new_chat_created", "friend_guid": friend_guid})
return
logger.debug(f"BROADCASTING TO FRIEND {friend_guid}")
await socket_manager.broadcast_to_user(str(friend_guid), {"Message": "WTF"})

# check if friend has active websocket connections
logger.debug(f"User channels {socket_manager.user_channel}")
# friend_websockets: set[WebSocket] = socket_manager.user_channel.get(str(friend_guid))

# if not friend_websockets:
# logger.info(
# "Friend doesn't have active connections", extra={"type": "new_chat_created", "friend_guid": friend_guid}
# )
# return

current_user_info: dict = {
"guid": current_user.guid,
"first_name": current_user.first_name,
"last_name": current_user.last_name,
"username": current_user.username,
"user_image": current_user.user_image,
}
payload = NewChatCreated(
chat_id=chat.id,
chat_guid=chat.guid,
created_at=chat.created_at,
updated_at=chat.updated_at,
friend=current_user_info, # current user becomes a friend for a user that receives this message
has_new_messages=True,
new_messages_count=1,
)

json_payload: str = payload.model_dump_json()
print("Socket's chats", socket_manager.chats)
print("Broadcasting to user", friend_guid, json_payload)
await socket_manager.broadcast_to_user(str(friend_guid), json_payload)
await socket_manager.broadcast_to_user(str(current_user.guid), json_payload)


async def send_new_chat_created_ws_message(socket_manager: WebSocketManager, current_user: User, chat: Chat):
"""
Send a new chat created message to friend's websocket connections in the chat.
Expand Down

0 comments on commit fad6c22

Please sign in to comment.