Skip to content

Commit

Permalink
Pass additional options to Redis and Kombu managers (Fixes #307)
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Aug 4, 2019
1 parent f36fa88 commit 7dbc470
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
31 changes: 24 additions & 7 deletions socketio/kombu_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@ class KombuManager(PubSubManager): # pragma: no cover
:param write_only: If set ot ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param connection_options: additional keyword arguments to be passed to
``kombu.Connection()``.
:param exchange_options: additional keyword arguments to be passed to
``kombu.Exchange()``.
:param queue_options: additional keyword arguments to be passed to
``kombu.Queue()``.
:param producer_options: additional keyword arguments to be passed to
``kombu.Producer()``.
"""
name = 'kombu'

def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio', write_only=False, logger=None):
channel='socketio', write_only=False, logger=None,
connection_options=None, exchange_options=None,
queue_options=None, producer_options=None):
if kombu is None:
raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your '
Expand All @@ -47,6 +57,10 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
write_only=write_only,
logger=logger)
self.url = url
self.connection_options = connection_options or {}
self.exchange_options = exchange_options or {}
self.queue_options = queue_options or {}
self.producer_options = producer_options or {}
self.producer = self._producer()

def initialize(self):
Expand All @@ -65,19 +79,22 @@ def initialize(self):
'with ' + self.server.async_mode)

def _connection(self):
return kombu.Connection(self.url)
return kombu.Connection(self.url, **self.connection_options)

def _exchange(self):
return kombu.Exchange(self.channel, type='fanout', durable=False)
options = {'type': 'fanout', 'durable': False}
options.update(self.exchange_options)
return kombu.Exchange(self.channel, **options)

def _queue(self):
queue_name = 'flask-socketio.' + str(uuid.uuid4())
return kombu.Queue(queue_name, self._exchange(),
durable=False,
queue_arguments={'x-expires': 300000})
options = {'durable': False, 'queue_arguments': {'x-expires': 300000}}
options.update(self.queue_options)
return kombu.Queue(queue_name, self._exchange(), **options)

def _producer(self):
return self._connection().Producer(exchange=self._exchange())
return self._connection().Producer(exchange=self._exchange(),
**self.producer_options)

def __error_callback(self, exception, interval):
self._get_logger().exception('Sleeping {}s'.format(interval))
Expand Down
8 changes: 6 additions & 2 deletions socketio/redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@ class RedisManager(PubSubManager): # pragma: no cover
:param write_only: If set ot ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``Redis.from_url()``.
"""
name = 'redis'

def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None):
write_only=False, logger=None, redis_options=None):
if redis is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your '
'virtualenv).')
self.redis_url = url
self.redis_options = redis_options or {}
self._redis_connect()
super(RedisManager, self).__init__(channel=channel,
write_only=write_only,
Expand All @@ -64,7 +67,8 @@ def initialize(self):
'with ' + self.server.async_mode)

def _redis_connect(self):
self.redis = redis.Redis.from_url(self.redis_url)
self.redis = redis.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub()

def _publish(self, data):
Expand Down

0 comments on commit 7dbc470

Please sign in to comment.