Skip to content

Commit

Permalink
Logging improvements for write-only connections (#197)
Browse files Browse the repository at this point in the history
* Make logger a property of BaseManager, configurable separately from server for write-only situations.

* Shorten line lengths for recent changes.
  • Loading branch information
truetech authored and miguelgrinberg committed Oct 9, 2018
1 parent f2d28ad commit da7cb86
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 23 deletions.
2 changes: 1 addition & 1 deletion socketio/asyncio_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def trigger_callback(self, sid, namespace, id, data):
callback = self.callbacks[sid][namespace][id]
except KeyError:
# if we get an unknown callback we just ignore it
self.server.logger.warning('Unknown callback received, ignoring.')
self._get_logger().warning('Unknown callback received, ignoring.')
else:
del self.callbacks[sid][namespace][id]
if callback is not None:
Expand Down
5 changes: 3 additions & 2 deletions socketio/asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ class AsyncPubSubManager(AsyncManager):
"""
name = 'asyncpubsub'

def __init__(self, channel='socketio', write_only=False):
def __init__(self, channel='socketio', write_only=False, logger=None):
super().__init__()
self.channel = channel
self.write_only = write_only
self.host_id = uuid.uuid4().hex
self.logger = logger

def initialize(self):
super().initialize()
if not self.write_only:
self.thread = self.server.start_background_task(self._thread)
self.server.logger.info(self.name + ' backend initialized.')
self._get_logger().info(self.name + ' backend initialized.')

async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
callback=None, **kwargs):
Expand Down
18 changes: 9 additions & 9 deletions socketio/asyncio_redis_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import logging
import pickle
from urllib.parse import urlparse

Expand All @@ -10,8 +9,6 @@

from .asyncio_pubsub_manager import AsyncPubSubManager

logger = logging.getLogger('socketio')


def _parse_redis_url(url):
p = urlparse(url)
Expand Down Expand Up @@ -52,15 +49,15 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
name = 'aioredis'

def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False):
write_only=False, logger=None):
if aioredis is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install aioredis" in your '
'virtualenv).')
self.host, self.port, self.password, self.db = _parse_redis_url(url)
self.pub = None
self.sub = None
super().__init__(channel=channel, write_only=write_only)
super().__init__(channel=channel, write_only=write_only, logger=logger)

async def _publish(self, data):
retry = True
Expand All @@ -74,11 +71,13 @@ async def _publish(self, data):
pickle.dumps(data))
except (aioredis.RedisError, OSError):
if retry:
logger.error('Cannot publish to redis... retrying')
self._get_logger().error('Cannot publish to redis... '
'retrying')
self.pub = None
retry = False
else:
logger.error('Cannot publish to redis... giving up')
self._get_logger().error('Cannot publish to redis... '
'giving up')
break

async def _listen(self):
Expand All @@ -92,8 +91,9 @@ async def _listen(self):
self.ch = (await self.sub.subscribe(self.channel))[0]
return await self.ch.get()
except (aioredis.RedisError, OSError):
logger.error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep))
self._get_logger().error('Cannot receive from redis... '
'retrying in '
'{} secs'.format(retry_sleep))
self.sub = None
await asyncio.sleep(retry_sleep)
retry_sleep *= 2
Expand Down
19 changes: 18 additions & 1 deletion socketio/base_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import itertools
import logging

import six

default_logger = logging.getLogger('socketio')


class BaseManager(object):
"""Manage client connections.
Expand All @@ -13,6 +16,7 @@ class BaseManager(object):
subclasses.
"""
def __init__(self):
self.logger = None
self.server = None
self.rooms = {}
self.callbacks = {}
Expand Down Expand Up @@ -141,7 +145,7 @@ def trigger_callback(self, sid, namespace, id, data):
callback = self.callbacks[sid][namespace][id]
except KeyError:
# if we get an unknown callback we just ignore it
self.server.logger.warning('Unknown callback received, ignoring.')
self._get_logger().warning('Unknown callback received, ignoring.')
else:
del self.callbacks[sid][namespace][id]
if callback is not None:
Expand All @@ -157,3 +161,16 @@ def _generate_ack_id(self, sid, namespace, callback):
id = six.next(self.callbacks[sid][namespace][0])
self.callbacks[sid][namespace][id] = callback
return id

def _get_logger(self):
"""Get the appropriate logger
Prevents uninitialized servers in write-only mode from failing.
"""

if self.logger:
return self.logger
elif self.server:
return self.server.logger
else:
return default_logger
10 changes: 6 additions & 4 deletions socketio/kombu_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ class KombuManager(PubSubManager): # pragma: no cover
name = 'kombu'

def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio', write_only=False):
channel='socketio', write_only=False, logger=None):
if kombu is None:
raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your '
'virtualenv).')
super(KombuManager, self).__init__(channel=channel)
super(KombuManager, self).__init__(channel=channel,
write_only=write_only,
logger=logger)
self.url = url
self.producer = self._producer()

Expand Down Expand Up @@ -78,7 +80,7 @@ def _producer(self):
return self._connection().Producer(exchange=self._exchange())

def __error_callback(self, exception, interval):
self.server.logger.exception('Sleeping {}s'.format(interval))
self._get_logger().exception('Sleeping {}s'.format(interval))

def _publish(self, data):
connection = self._connection()
Expand All @@ -99,5 +101,5 @@ def _listen(self):
message.ack()
yield message.payload
except connection.connection_errors:
self.server.logger.exception("Connection error "
self._get_logger().exception("Connection error "
"while reading from queue")
5 changes: 3 additions & 2 deletions socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ class PubSubManager(BaseManager):
"""
name = 'pubsub'

def __init__(self, channel='socketio', write_only=False):
def __init__(self, channel='socketio', write_only=False, logger=None):
super(PubSubManager, self).__init__()
self.channel = channel
self.write_only = write_only
self.host_id = uuid.uuid4().hex
self.logger = logger

def initialize(self):
super(PubSubManager, self).initialize()
if not self.write_only:
self.thread = self.server.start_background_task(self._thread)
self.server.logger.info(self.name + ' backend initialized.')
self._get_logger().info(self.name + ' backend initialized.')

def emit(self, event, data, namespace=None, room=None, skip_sid=None,
callback=None, **kwargs):
Expand Down
5 changes: 3 additions & 2 deletions socketio/redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ class RedisManager(PubSubManager): # pragma: no cover
name = 'redis'

def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False):
write_only=False, logger=None):
if redis is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your '
'virtualenv).')
self.redis_url = url
self._redis_connect()
super(RedisManager, self).__init__(channel=channel,
write_only=write_only)
write_only=write_only,
logger=logger)

def initialize(self):
super(RedisManager, self).initialize()
Expand Down
6 changes: 4 additions & 2 deletions socketio/zmq_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class ZmqManager(PubSubManager): # pragma: no cover

def __init__(self, url='zmq+tcp://localhost:5555+5556',
channel='socketio',
write_only=False):
write_only=False,
logger=None):
if zmq is None:
raise RuntimeError('zmq package is not installed '
'(Run "pip install pyzmq" in your '
Expand All @@ -76,7 +77,8 @@ def __init__(self, url='zmq+tcp://localhost:5555+5556',
self.sub = sub
self.channel = channel
super(ZmqManager, self).__init__(channel=channel,
write_only=write_only)
write_only=write_only,
logger=logger)

def _publish(self, data):
pickled_data = pickle.dumps(
Expand Down
17 changes: 17 additions & 0 deletions tests/test_pubsub_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import unittest
import logging

import six
if six.PY3:
Expand Down Expand Up @@ -39,6 +40,22 @@ def test_write_only_init(self):
self.assertEqual(len(pm.host_id), 32)
self.assertEqual(pm.server.start_background_task.call_count, 0)

def test_write_only_default_logger(self):
pm = pubsub_manager.PubSubManager(write_only=True)
pm.initialize()
self.assertEqual(pm.channel, 'socketio')
self.assertEqual(len(pm.host_id), 32)
self.assertEqual(pm._get_logger(), logging.getLogger('socketio'))

def test_write_only_with_provided_logger(self):
test_logger = logging.getLogger('new_logger')
pm = pubsub_manager.PubSubManager(write_only=True,
logger=test_logger)
pm.initialize()
self.assertEqual(pm.channel, 'socketio')
self.assertEqual(len(pm.host_id), 32)
self.assertEqual(pm._get_logger(), test_logger)

def test_emit(self):
self.pm.emit('foo', 'bar')
self.pm._publish.assert_called_once_with(
Expand Down

0 comments on commit da7cb86

Please sign in to comment.