Skip to content

Commit

Permalink
implement manage ws connections, publish to wroker process topic
Browse files Browse the repository at this point in the history
  • Loading branch information
kolyanu4 committed Dec 17, 2016
1 parent 5e48d44 commit c075b7d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
36 changes: 33 additions & 3 deletions django_aiohttp_websockets/websockets/core/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import logging
import random

import aioredis
from aiohttp import web, WSCloseCode
Expand All @@ -19,6 +20,7 @@


class WSApplication(web.Application):
WS_MESSAGE_REQUIRED_KEYS = ['uuid', ]

def __init__(self, **kwargs):
super(WSApplication, self).__init__(**kwargs)
Expand All @@ -32,6 +34,7 @@ def __init__(self, **kwargs):
async def _setup(self):
self.router.add_get('/ws', views.WebSocketView)
self.redis_subscriber = await aioredis.create_redis((settings.REDIS_HOST, settings.REDIS_PORT), loop=self.loop)
self.redis_publisher = await aioredis.create_redis((settings.REDIS_HOST, settings.REDIS_PORT), loop=self.loop)
self.tasks.append(self.loop.create_task(self.subscribe_to_channel(settings.WORKER_RESPONSE_TOPIC)))

async def _on_shutdown_handler(self, app):
Expand All @@ -42,9 +45,10 @@ async def _on_shutdown_handler(self, app):
for ws in self.websockets:
await ws.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')

if self.redis_subscriber and not self.redis_subscriber.closed:
self.redis_subscriber.close()
await self.redis_subscriber.wait_closed()
for redis_conn in [self.redis_subscriber, self.redis_publisher]:
if redis_conn and not redis_conn.closed:
redis_conn.close()
await redis_conn.wait_closed()

async def subscribe_to_channel(self, topic):
self.logger.info('Subscribe to channel: %s', topic)
Expand All @@ -63,3 +67,29 @@ async def subscribe_to_channel(self, topic):
except asyncio.CancelledError:
self.logger.error('CancelledError exception received. Unsubscribe from channel: %s', topic)
await self.redis_subscriber.unsubscribe(topic)

def handle_ws_connect(self, ws, view):
self.websockets[ws] = {
'view': view,
'messages_ids': [],
'session_data': {
'user_pk': None
}
}
self.logger.debug('[%s] Websocket was added to websocket list', id(ws))

def handle_ws_disconnect(self, ws):
self.websockets.pop(ws, None)
self.logger.debug('[%s] Websocket was removed from websockets list', id(ws))

async def publish_message_to_worker(self, ws, msg):
if not all(msg.get(key) for key in self.WS_MESSAGE_REQUIRED_KEYS):
raise Exception('Missing required keys')

msg_id = msg['uuid']
publish_topic = random.choice(settings.WORKER_PROCESS_TOPICS)

msg['session_data'] = self.websockets[ws]['session_data']
self.websockets[ws]['messages_ids'].append(msg_id)
self.logger.debug('[%s] Publish message with id \'%s\' to topic \'%s\'', id(ws), msg_id, publish_topic)
await self.redis_publisher.publish_json(publish_topic, msg)
37 changes: 24 additions & 13 deletions django_aiohttp_websockets/websockets/core/views.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,37 @@
from aiohttp import web, WSMsgType
import json

from aiohttp import web, WSMsgType, WSCloseCode


class WebSocketView(web.View):

def __init__(self, *args, **kwargs):
super(WebSocketView, self).__init__(*args, **kwargs)
self.app = self.request.app
self.logger = self.app.logger

async def get(self):

ws = web.WebSocketResponse()
await ws.prepare(self.request)

async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
ws.send_str(msg.data + '/answer')
elif msg.type == WSMsgType.ERROR:
print('ws connection closed with exception %s' %
ws.exception())

print('websocket connection closed')
ws_id = id(ws)
self.logger.debug('[%s] New websocket connection', ws_id)
self.app.handle_ws_connect(ws, self)

async for msg_raw in ws:
if msg_raw.tp == WSMsgType.TEXT:
try:
msg = json.loads(msg_raw.data)
self.logger.debug('[%s] Publish message %s to redis', ws_id, msg)
await self.app.publish_message_to_worker(ws, msg)
except Exception as e:
self.logger.error('[%s] Invalid message format. Exception: %s', ws_id, e)
await ws.close(code=WSCloseCode.UNSUPPORTED_DATA, message=str(e))
break

elif msg_raw.tp == WSMsgType.ERROR:
self.logger.error('[%s] ERROR WS connection closed with exception: %s', ws_id, ws.exception())

self.logger.debug('[%s] Websocket connection closed', ws_id)
self.app.handle_ws_disconnect(ws)
return ws

0 comments on commit c075b7d

Please sign in to comment.