Permalink
Browse files

minor refactoring of message handlers

  • Loading branch information...
1 parent 3d35d3c commit 772542406963c4714893977c215ad6ec9947f817 @qubird committed Dec 28, 2011
Showing with 15 additions and 21 deletions.
  1. +8 −11 chatrooms/utils/celery_handlers.py
  2. +1 −1 chatrooms/utils/handlers.py
  3. +6 −9 chatrooms/utils/redis_handlers.py
@@ -14,7 +14,7 @@ class CeleryMessageHandler(MessageHandler):
for synchronization
"""
def __init__(self):
- """Initializes redis connections
+ """Initializes celery events and dispatchers
"""
self.app = app_or_default()
self.event = Event(type='chatrooms')
@@ -26,7 +26,7 @@ def handle_received_message(self,
sender, room_id, username, message, date, **kwargs):
"""
1. saves the message
- 2. sends a message to the exchange
+ 2. sends the event
"""
@@ -45,16 +45,12 @@ def handle_received_message(self,
new_message.save()
# 2
- msg_number = new_message.pk
- messages_queue = sender.get_messages_queue(room_id)
- messages_queue.append((msg_number, new_message))
-
self.dispatcher.send(type='chatrooms')
def retrieve_messages(self, chatobj, room_id, latest_msg_id, **kwargs):
"""
- 1. waits for a message on the queue
- 2. returns the list of latest messages
+ 1. waits for "chatrooms" event
+ 2. returns the messages with id greater than latest_msg_id
"""
def handler(*args, **kwargs):
@@ -64,15 +60,16 @@ def handler(*args, **kwargs):
connection=self.app.broker_connection(),
handlers={"chatrooms": handler, })
try:
+ # 1
receiver.capture(limit=1, timeout=20, wakeup=True)
- except:
+ except: # Timeout
pass
-
+ # 2
messages = Message.objects.filter(room=room_id, id__gt=latest_msg_id)
return [(msg.pk, msg) for msg in messages]
def get_latest_message_id(self, chatobj, room_id):
- """Returns id of the latest retrieved message """
+ """Returns id of the latest message received """
latest_msg_id = Message.objects.filter(
room=room_id).aggregate(
max_id=Max('id')).get('max_id')
@@ -87,7 +87,7 @@ def retrieve_messages(self, chatobj, room_id, latest_msg_id, **kwargs):
return chatobj.get_messages_queue(room_id)
def get_latest_message_id(self, chatobj, room_id):
- """Returns id of the latest retrieved message """
+ """Returns id of the latest message received """
latest_msg_id = -1
msgs_queue = chatobj.messages[room_id]
if msgs_queue:
@@ -12,7 +12,7 @@ class RedisMessageHandler(MessageHandler):
for synchronization
"""
def __init__(self):
- """Initializes redis connections
+ """Initializes redis connection
"""
self.client = redis.Redis()
self.pubsub = self.client.pubsub()
@@ -21,7 +21,7 @@ def handle_received_message(self,
sender, room_id, username, message, date, **kwargs):
"""
1. saves the message
- 2. sends a message to the exchange
+ 2. publish a message to the redis client
"""
@@ -40,29 +40,26 @@ def handle_received_message(self,
new_message.save()
# 2
- msg_number = new_message.pk
- messages_queue = sender.get_messages_queue(room_id)
- messages_queue.append((msg_number, new_message))
-
self.client.publish('chatrooms', 'new message')
def retrieve_messages(self, chatobj, room_id, latest_msg_id, **kwargs):
"""
- 1. waits for a message on the queue
+ 1. waits for "new message" on redis
2. returns the list of latest messages
"""
client = redis.Redis(socket_timeout=20)
pubsub = client.pubsub()
pubsub.subscribe('chatrooms')
-
+ # 1
msg = pubsub.listen().next() # TODO: timeout?
pubsub.unsubscribe('chatrooms')
+ # 2
messages = Message.objects.filter(room=room_id, id__gt=latest_msg_id)
return [(msg.pk, msg) for msg in messages]
def get_latest_message_id(self, chatobj, room_id):
- """Returns id of the latest retrieved message """
+ """Returns id of the latest message received """
latest_msg_id = Message.objects.filter(
room=room_id).aggregate(
max_id=Max('id')).get('max_id')

0 comments on commit 7725424

Please sign in to comment.