Skip to content

Commit

Permalink
Catch and log errors in pubsub listening thread (Fixes #889)
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Mar 14, 2022
1 parent fb96485 commit f2ae136
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 16 deletions.
22 changes: 14 additions & 8 deletions src/socketio/asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,20 @@ async def _thread(self):
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
if data['method'] == 'emit':
await self._handle_emit(data)
elif data['method'] == 'callback':
await self._handle_callback(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)
try:
if data['method'] == 'emit':
await self._handle_emit(data)
elif data['method'] == 'callback':
await self._handle_callback(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)
except asyncio.CancelledError:
raise # let the outer try/except handle it
except:
self.server.logger.exception(
'Unknown error in pubsub listening task')
except asyncio.CancelledError: # pragma: no cover
break
except: # pragma: no cover
Expand Down
20 changes: 12 additions & 8 deletions src/socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,15 @@ def _thread(self):
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
if data['method'] == 'emit':
self._handle_emit(data)
elif data['method'] == 'callback':
self._handle_callback(data)
elif data['method'] == 'disconnect':
self._handle_disconnect(data)
elif data['method'] == 'close_room':
self._handle_close_room(data)
try:
if data['method'] == 'emit':
self._handle_emit(data)
elif data['method'] == 'callback':
self._handle_callback(data)
elif data['method'] == 'disconnect':
self._handle_disconnect(data)
elif data['method'] == 'close_room':
self._handle_close_room(data)
except:
self.server.logger.exception(
'Unknown error in pubsub listening thread')
18 changes: 18 additions & 0 deletions tests/asyncio/test_asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,21 @@ async def messages():
self.pm._handle_close_room.mock.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'}
)

def test_background_thread_exception(self):
self.pm._handle_emit = AsyncMock(side_effect=[ValueError(),
asyncio.CancelledError])

async def messages():
yield {'method': 'emit', 'value': 'foo'}
yield {'method': 'emit', 'value': 'bar'}

self.pm._listen = messages
_run(self.pm._thread())

self.pm._handle_emit.mock.assert_any_call(
{'method': 'emit', 'value': 'foo'}
)
self.pm._handle_emit.mock.assert_called_with(
{'method': 'emit', 'value': 'bar'}
)
20 changes: 20 additions & 0 deletions tests/common/test_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,23 @@ def messages():
self.pm._handle_close_room.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'}
)

def test_background_thread_exception(self):
self.pm._handle_emit = mock.MagicMock(side_effect=[ValueError(), None])

def messages():
yield {'method': 'emit', 'value': 'foo'}
yield {'method': 'emit', 'value': 'bar'}

self.pm._listen = mock.MagicMock(side_effect=messages)
try:
self.pm._thread()
except StopIteration:
pass

self.pm._handle_emit.assert_any_call(
{'method': 'emit', 'value': 'foo'}
)
self.pm._handle_emit.assert_called_with(
{'method': 'emit', 'value': 'bar'}
)

0 comments on commit f2ae136

Please sign in to comment.