Skip to content

Commit

Permalink
pubsub unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Dec 10, 2015
1 parent 63f5ed3 commit 71142aa
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 41 deletions.
18 changes: 2 additions & 16 deletions socketio/kombu_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import json
import pickle

import six
try:
import kombu
except ImportError:
Expand All @@ -10,7 +8,7 @@
from .pubsub_manager import PubSubManager


class KombuManager(PubSubManager):
class KombuManager(PubSubManager): # pragma: no cover
"""Client manager that uses kombu for inter-process messaging.
This class implements a client manager backend for event sharing across
Expand Down Expand Up @@ -53,16 +51,4 @@ def _listen(self):
while True:
message = listen_queue.get(block=True)
message.ack()
data = None
if isinstance(message.payload, six.binary_type):
try:
data = pickle.loads(message.payload)
except:
pass
if data is None:
try:
data = json.loads(message.payload)
except:
pass
if data:
yield data
yield message.payload
40 changes: 31 additions & 9 deletions socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from functools import partial
import uuid

import json
import pickle
import six

from .base_manager import BaseManager


Expand All @@ -18,6 +22,8 @@ class PubSubManager(BaseManager):
:param channel: The channel name on which the server sends and receives
notifications.
"""
name = 'pubsub'

def __init__(self, channel='socketio'):
super(PubSubManager, self).__init__()
self.channel = channel
Expand All @@ -40,6 +46,8 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
"""
namespace = namespace or '/'
if callback is not None:
if room is None:
raise ValueError('Cannot use callback without a room set.')
id = self._generate_ack_id(room, namespace, callback)
callback = (room, namespace, id)
else:
Expand All @@ -59,7 +67,7 @@ def _publish(self, data):
support pub/sub backends.
"""
raise NotImplementedError('This method must be implemented in a '
'subclass.')
'subclass.') # pragma: no cover

def _listen(self):
"""Return the next message published on the Socket.IO channel,
Expand All @@ -69,7 +77,7 @@ def _listen(self):
support pub/sub backends.
"""
raise NotImplementedError('This method must be implemented in a '
'subclass.')
'subclass.') # pragma: no cover

def _handle_emit(self, message):
# Events with callbacks are very tricky to handle across hosts
Expand Down Expand Up @@ -111,10 +119,24 @@ def _handle_close_room(self, message):

def _thread(self):
for message in self._listen():
if 'method' in message:
if message['method'] == 'emit':
self._handle_emit(message)
elif message['method'] == 'callback':
self._handle_callback(message)
elif message['method'] == 'close_room':
self._handle_close_room(message)
data = None
if isinstance(message, dict):
data = message
else:
if isinstance(message, six.binary_type): # pragma: no cover
try:
data = pickle.loads(message)
except:
pass
if data is None:
try:
data = json.loads(message)
except:
pass
if data and 'method' in data:
if data['method'] == 'emit':
self._handle_emit(data)
elif data['method'] == 'callback':
self._handle_callback(data)
elif data['method'] == 'close_room':
self._handle_close_room(data)
18 changes: 2 additions & 16 deletions socketio/redis_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import json
import pickle

import six
try:
import redis
except ImportError:
Expand All @@ -10,7 +8,7 @@
from .pubsub_manager import PubSubManager


class RedisManager(PubSubManager):
class RedisManager(PubSubManager): # pragma: no cover
"""Redis based client manager.
This class implements a Redis backend for event sharing across multiple
Expand Down Expand Up @@ -48,17 +46,5 @@ def _listen(self):
for message in self.pubsub.listen():
if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message:
data = None
if isinstance(message['data'], six.binary_type):
try:
data = pickle.loads(message['data'])
except:
pass
if data is None:
try:
data = json.loads(message['data'])
except:
pass
if data:
yield data
yield message['data']
self.pubsub.unsubscribe(self.channel)
209 changes: 209 additions & 0 deletions tests/test_pubsub_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import functools
import unittest

import six
if six.PY3:
from unittest import mock
else:
import mock

from socketio import base_manager
from socketio import pubsub_manager


class TestBaseManager(unittest.TestCase):
def setUp(self):
mock_server = mock.MagicMock()
self.pm = pubsub_manager.PubSubManager()
self.pm._publish = mock.MagicMock()
self.pm.initialize(mock_server)

def test_default_init(self):
self.assertEqual(self.pm.channel, 'socketio')
self.assertEqual(len(self.pm.host_id), 32)
self.pm.server.start_background_task.assert_called_once_with(
self.pm._thread)

def test_custom_init(self):
pubsub = pubsub_manager.PubSubManager(channel='foo')
self.assertEqual(pubsub.channel, 'foo')
self.assertEqual(len(pubsub.host_id), 32)

def test_emit(self):
self.pm.emit('foo', 'bar')
self.pm._publish.assert_called_once_with(
{'method': 'emit', 'event': 'foo', 'data': 'bar',
'namespace': '/', 'room': None, 'skip_sid': None,
'callback': None})

def test_emit_with_namespace(self):
self.pm.emit('foo', 'bar', namespace='/baz')
self.pm._publish.assert_called_once_with(
{'method': 'emit', 'event': 'foo', 'data': 'bar',
'namespace': '/baz', 'room': None, 'skip_sid': None,
'callback': None})

def test_emit_with_room(self):
self.pm.emit('foo', 'bar', room='baz')
self.pm._publish.assert_called_once_with(
{'method': 'emit', 'event': 'foo', 'data': 'bar',
'namespace': '/', 'room': 'baz', 'skip_sid': None,
'callback': None})

def test_emit_with_skip_sid(self):
self.pm.emit('foo', 'bar', skip_sid='baz')
self.pm._publish.assert_called_once_with(
{'method': 'emit', 'event': 'foo', 'data': 'bar',
'namespace': '/', 'room': None, 'skip_sid': 'baz',
'callback': None})

def test_emit_with_callback(self):
with mock.patch.object(self.pm, '_generate_ack_id',
return_value='123'):
self.pm.emit('foo', 'bar', room='baz', callback='cb')
self.pm._publish.assert_called_once_with(
{'method': 'emit', 'event': 'foo', 'data': 'bar',
'namespace': '/', 'room': 'baz', 'skip_sid': None,
'callback': ('baz', '/', '123')})

def test_emit_with_callback_missing_room(self):
with mock.patch.object(self.pm, '_generate_ack_id',
return_value='123'):
self.assertRaises(ValueError, self.pm.emit, 'foo', 'bar',
callback='cb')

def test_close_room(self):
self.pm.close_room('foo')
self.pm._publish.assert_called_once_with(
{'method': 'close_room', 'room': 'foo', 'namespace': '/'})

def test_close_room_with_namespace(self):
self.pm.close_room('foo', '/bar')
self.pm._publish.assert_called_once_with(
{'method': 'close_room', 'room': 'foo', 'namespace': '/bar'})

def test_handle_emit(self):
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
self.pm._handle_emit({'event': 'foo', 'data': 'bar'})
super_emit.assert_called_once_with('foo', 'bar', namespace=None,
room=None, skip_sid=None,
callback=None)

def test_handle_emit_with_namespace(self):
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
'namespace': '/baz'})
super_emit.assert_called_once_with('foo', 'bar', namespace='/baz',
room=None, skip_sid=None,
callback=None)

def test_handle_emiti_with_room(self):
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
'room': 'baz'})
super_emit.assert_called_once_with('foo', 'bar', namespace=None,
room='baz', skip_sid=None,
callback=None)

def test_handle_emit_with_skip_sid(self):
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
'skip_sid': '123'})
super_emit.assert_called_once_with('foo', 'bar', namespace=None,
room=None, skip_sid='123',
callback=None)

def test_handle_emit_with_callback(self):
host_id = self.pm.host_id
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
'namespace': '/baz',
'callback': ('sid', '/baz', 123)})
self.assertEqual(super_emit.call_count, 1)
self.assertEqual(super_emit.call_args[0], ('foo', 'bar'))
self.assertEqual(super_emit.call_args[1]['namespace'], '/baz')
self.assertIsNone(super_emit.call_args[1]['room'])
self.assertIsNone(super_emit.call_args[1]['skip_sid'])
self.assertIsInstance(super_emit.call_args[1]['callback'],
functools.partial)
super_emit.call_args[1]['callback']('one', 2, 'three')
self.pm._publish.assert_called_once_with(
{'method': 'callback', 'host_id': host_id, 'sid': 'sid',
'namespace': '/baz', 'id': 123, 'args': ('one', 2, 'three')})

def test_handle_callback(self):
host_id = self.pm.host_id
with mock.patch.object(self.pm, 'trigger_callback') as trigger:
self.pm._handle_callback({'method': 'callback',
'host_id': host_id, 'sid': 'sid',
'namespace': '/', 'id': 123,
'args': ('one', 2)})
trigger.assert_called_once_with('sid', '/', 123, ('one', 2))

def test_handle_callback_bad_host_id(self):
with mock.patch.object(self.pm, 'trigger_callback') as trigger:
self.pm._handle_callback({'method': 'callback',
'host_id': 'bad', 'sid': 'sid',
'namespace': '/', 'id': 123,
'args': ('one', 2)})
self.assertEqual(trigger.call_count, 0)

def test_handle_callback_missing_args(self):
host_id = self.pm.host_id
with mock.patch.object(self.pm, 'trigger_callback') as trigger:
self.pm._handle_callback({'method': 'callback',
'host_id': host_id, 'sid': 'sid',
'namespace': '/', 'id': 123})
self.pm._handle_callback({'method': 'callback',
'host_id': host_id, 'sid': 'sid',
'namespace': '/'})
self.pm._handle_callback({'method': 'callback',
'host_id': host_id, 'sid': 'sid'})
self.pm._handle_callback({'method': 'callback',
'host_id': host_id})
self.assertEqual(trigger.call_count, 0)

def test_handle_close_room(self):
with mock.patch.object(base_manager.BaseManager, 'close_room') \
as super_close_room:
self.pm._handle_close_room({'method': 'close_room',
'room': 'foo'})
super_close_room.assert_called_once_with(room='foo',
namespace=None)

def test_handle_close_room_with_namespace(self):
with mock.patch.object(base_manager.BaseManager, 'close_room') \
as super_close_room:
self.pm._handle_close_room({'method': 'close_room',
'room': 'foo', 'namespace': '/bar'})
super_close_room.assert_called_once_with(room='foo',
namespace='/bar')

def test_background_thread(self):
self.pm._handle_emit = mock.MagicMock()
self.pm._handle_callback = mock.MagicMock()
self.pm._handle_close_room = mock.MagicMock()

def messages():
import pickle
yield {'method': 'emit', 'value': 'foo'}
yield {'missing': 'method'}
yield '{"method": "callback", "value": "bar"}'
yield {'method': 'bogus'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield 'bad json'
yield b'bad pickled'
raise KeyboardInterrupt

self.pm._listen = mock.MagicMock(side_effect=messages)
try:
self.pm._thread()
except KeyboardInterrupt:
pass

self.pm._handle_emit.assert_called_once_with(
{'method': 'emit', 'value': 'foo'})
self.pm._handle_callback.assert_called_once_with(
{'method': 'callback', 'value': 'bar'})
self.pm._handle_close_room.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'})

0 comments on commit 71142aa

Please sign in to comment.