From 71142aa2db1434ab799edebeaa97020c6ec21089 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Mon, 7 Dec 2015 00:06:10 -0800 Subject: [PATCH] pubsub unit tests --- socketio/kombu_manager.py | 18 +-- socketio/pubsub_manager.py | 40 +++++-- socketio/redis_manager.py | 18 +-- tests/test_pubsub_manager.py | 209 +++++++++++++++++++++++++++++++++++ 4 files changed, 244 insertions(+), 41 deletions(-) create mode 100644 tests/test_pubsub_manager.py diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index c5e7298d..05d256d0 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -1,7 +1,5 @@ -import json import pickle -import six try: import kombu except ImportError: @@ -10,7 +8,7 @@ from .pubsub_manager import PubSubManager -class KombuManager(PubSubManager): +class KombuManager(PubSubManager): # pragma: no cover """Client manager that uses kombu for inter-process messaging. This class implements a client manager backend for event sharing across @@ -53,16 +51,4 @@ def _listen(self): while True: message = listen_queue.get(block=True) message.ack() - data = None - if isinstance(message.payload, six.binary_type): - try: - data = pickle.loads(message.payload) - except: - pass - if data is None: - try: - data = json.loads(message.payload) - except: - pass - if data: - yield data + yield message.payload diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py index bb21e0e8..48ca09f4 100644 --- a/socketio/pubsub_manager.py +++ b/socketio/pubsub_manager.py @@ -1,6 +1,10 @@ from functools import partial import uuid +import json +import pickle +import six + from .base_manager import BaseManager @@ -18,6 +22,8 @@ class PubSubManager(BaseManager): :param channel: The channel name on which the server sends and receives notifications. """ + name = 'pubsub' + def __init__(self, channel='socketio'): super(PubSubManager, self).__init__() self.channel = channel @@ -40,6 +46,8 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None, """ namespace = namespace or '/' if callback is not None: + if room is None: + raise ValueError('Cannot use callback without a room set.') id = self._generate_ack_id(room, namespace, callback) callback = (room, namespace, id) else: @@ -59,7 +67,7 @@ def _publish(self, data): support pub/sub backends. """ raise NotImplementedError('This method must be implemented in a ' - 'subclass.') + 'subclass.') # pragma: no cover def _listen(self): """Return the next message published on the Socket.IO channel, @@ -69,7 +77,7 @@ def _listen(self): support pub/sub backends. """ raise NotImplementedError('This method must be implemented in a ' - 'subclass.') + 'subclass.') # pragma: no cover def _handle_emit(self, message): # Events with callbacks are very tricky to handle across hosts @@ -111,10 +119,24 @@ def _handle_close_room(self, message): def _thread(self): for message in self._listen(): - if 'method' in message: - if message['method'] == 'emit': - self._handle_emit(message) - elif message['method'] == 'callback': - self._handle_callback(message) - elif message['method'] == 'close_room': - self._handle_close_room(message) + data = None + if isinstance(message, dict): + data = message + else: + if isinstance(message, six.binary_type): # pragma: no cover + try: + data = pickle.loads(message) + except: + pass + if data is None: + try: + data = json.loads(message) + except: + pass + if data and 'method' in data: + if data['method'] == 'emit': + self._handle_emit(data) + elif data['method'] == 'callback': + self._handle_callback(data) + elif data['method'] == 'close_room': + self._handle_close_room(data) diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py index 9004fae6..900baaf9 100644 --- a/socketio/redis_manager.py +++ b/socketio/redis_manager.py @@ -1,7 +1,5 @@ -import json import pickle -import six try: import redis except ImportError: @@ -10,7 +8,7 @@ from .pubsub_manager import PubSubManager -class RedisManager(PubSubManager): +class RedisManager(PubSubManager): # pragma: no cover """Redis based client manager. This class implements a Redis backend for event sharing across multiple @@ -48,17 +46,5 @@ def _listen(self): for message in self.pubsub.listen(): if message['channel'] == channel and \ message['type'] == 'message' and 'data' in message: - data = None - if isinstance(message['data'], six.binary_type): - try: - data = pickle.loads(message['data']) - except: - pass - if data is None: - try: - data = json.loads(message['data']) - except: - pass - if data: - yield data + yield message['data'] self.pubsub.unsubscribe(self.channel) diff --git a/tests/test_pubsub_manager.py b/tests/test_pubsub_manager.py new file mode 100644 index 00000000..9fab1d73 --- /dev/null +++ b/tests/test_pubsub_manager.py @@ -0,0 +1,209 @@ +import functools +import unittest + +import six +if six.PY3: + from unittest import mock +else: + import mock + +from socketio import base_manager +from socketio import pubsub_manager + + +class TestBaseManager(unittest.TestCase): + def setUp(self): + mock_server = mock.MagicMock() + self.pm = pubsub_manager.PubSubManager() + self.pm._publish = mock.MagicMock() + self.pm.initialize(mock_server) + + def test_default_init(self): + self.assertEqual(self.pm.channel, 'socketio') + self.assertEqual(len(self.pm.host_id), 32) + self.pm.server.start_background_task.assert_called_once_with( + self.pm._thread) + + def test_custom_init(self): + pubsub = pubsub_manager.PubSubManager(channel='foo') + self.assertEqual(pubsub.channel, 'foo') + self.assertEqual(len(pubsub.host_id), 32) + + def test_emit(self): + self.pm.emit('foo', 'bar') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/', 'room': None, 'skip_sid': None, + 'callback': None}) + + def test_emit_with_namespace(self): + self.pm.emit('foo', 'bar', namespace='/baz') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/baz', 'room': None, 'skip_sid': None, + 'callback': None}) + + def test_emit_with_room(self): + self.pm.emit('foo', 'bar', room='baz') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/', 'room': 'baz', 'skip_sid': None, + 'callback': None}) + + def test_emit_with_skip_sid(self): + self.pm.emit('foo', 'bar', skip_sid='baz') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/', 'room': None, 'skip_sid': 'baz', + 'callback': None}) + + def test_emit_with_callback(self): + with mock.patch.object(self.pm, '_generate_ack_id', + return_value='123'): + self.pm.emit('foo', 'bar', room='baz', callback='cb') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/', 'room': 'baz', 'skip_sid': None, + 'callback': ('baz', '/', '123')}) + + def test_emit_with_callback_missing_room(self): + with mock.patch.object(self.pm, '_generate_ack_id', + return_value='123'): + self.assertRaises(ValueError, self.pm.emit, 'foo', 'bar', + callback='cb') + + def test_close_room(self): + self.pm.close_room('foo') + self.pm._publish.assert_called_once_with( + {'method': 'close_room', 'room': 'foo', 'namespace': '/'}) + + def test_close_room_with_namespace(self): + self.pm.close_room('foo', '/bar') + self.pm._publish.assert_called_once_with( + {'method': 'close_room', 'room': 'foo', 'namespace': '/bar'}) + + def test_handle_emit(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar'}) + super_emit.assert_called_once_with('foo', 'bar', namespace=None, + room=None, skip_sid=None, + callback=None) + + def test_handle_emit_with_namespace(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar', + 'namespace': '/baz'}) + super_emit.assert_called_once_with('foo', 'bar', namespace='/baz', + room=None, skip_sid=None, + callback=None) + + def test_handle_emiti_with_room(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar', + 'room': 'baz'}) + super_emit.assert_called_once_with('foo', 'bar', namespace=None, + room='baz', skip_sid=None, + callback=None) + + def test_handle_emit_with_skip_sid(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar', + 'skip_sid': '123'}) + super_emit.assert_called_once_with('foo', 'bar', namespace=None, + room=None, skip_sid='123', + callback=None) + + def test_handle_emit_with_callback(self): + host_id = self.pm.host_id + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar', + 'namespace': '/baz', + 'callback': ('sid', '/baz', 123)}) + self.assertEqual(super_emit.call_count, 1) + self.assertEqual(super_emit.call_args[0], ('foo', 'bar')) + self.assertEqual(super_emit.call_args[1]['namespace'], '/baz') + self.assertIsNone(super_emit.call_args[1]['room']) + self.assertIsNone(super_emit.call_args[1]['skip_sid']) + self.assertIsInstance(super_emit.call_args[1]['callback'], + functools.partial) + super_emit.call_args[1]['callback']('one', 2, 'three') + self.pm._publish.assert_called_once_with( + {'method': 'callback', 'host_id': host_id, 'sid': 'sid', + 'namespace': '/baz', 'id': 123, 'args': ('one', 2, 'three')}) + + def test_handle_callback(self): + host_id = self.pm.host_id + with mock.patch.object(self.pm, 'trigger_callback') as trigger: + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id, 'sid': 'sid', + 'namespace': '/', 'id': 123, + 'args': ('one', 2)}) + trigger.assert_called_once_with('sid', '/', 123, ('one', 2)) + + def test_handle_callback_bad_host_id(self): + with mock.patch.object(self.pm, 'trigger_callback') as trigger: + self.pm._handle_callback({'method': 'callback', + 'host_id': 'bad', 'sid': 'sid', + 'namespace': '/', 'id': 123, + 'args': ('one', 2)}) + self.assertEqual(trigger.call_count, 0) + + def test_handle_callback_missing_args(self): + host_id = self.pm.host_id + with mock.patch.object(self.pm, 'trigger_callback') as trigger: + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id, 'sid': 'sid', + 'namespace': '/', 'id': 123}) + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id, 'sid': 'sid', + 'namespace': '/'}) + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id, 'sid': 'sid'}) + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id}) + self.assertEqual(trigger.call_count, 0) + + def test_handle_close_room(self): + with mock.patch.object(base_manager.BaseManager, 'close_room') \ + as super_close_room: + self.pm._handle_close_room({'method': 'close_room', + 'room': 'foo'}) + super_close_room.assert_called_once_with(room='foo', + namespace=None) + + def test_handle_close_room_with_namespace(self): + with mock.patch.object(base_manager.BaseManager, 'close_room') \ + as super_close_room: + self.pm._handle_close_room({'method': 'close_room', + 'room': 'foo', 'namespace': '/bar'}) + super_close_room.assert_called_once_with(room='foo', + namespace='/bar') + + def test_background_thread(self): + self.pm._handle_emit = mock.MagicMock() + self.pm._handle_callback = mock.MagicMock() + self.pm._handle_close_room = mock.MagicMock() + + def messages(): + import pickle + yield {'method': 'emit', 'value': 'foo'} + yield {'missing': 'method'} + yield '{"method": "callback", "value": "bar"}' + yield {'method': 'bogus'} + yield pickle.dumps({'method': 'close_room', 'value': 'baz'}) + yield 'bad json' + yield b'bad pickled' + raise KeyboardInterrupt + + self.pm._listen = mock.MagicMock(side_effect=messages) + try: + self.pm._thread() + except KeyboardInterrupt: + pass + + self.pm._handle_emit.assert_called_once_with( + {'method': 'emit', 'value': 'foo'}) + self.pm._handle_callback.assert_called_once_with( + {'method': 'callback', 'value': 'bar'}) + self.pm._handle_close_room.assert_called_once_with( + {'method': 'close_room', 'value': 'baz'})