Skip to content
This repository has been archived by the owner on Nov 17, 2018. It is now read-only.

Commit

Permalink
Pika connection parameters can be passed through CELERYT_PIKA_OPTIONS…
Browse files Browse the repository at this point in the history
… option
  • Loading branch information
mher committed Apr 25, 2013
1 parent 2ad2d16 commit 449221f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
5 changes: 4 additions & 1 deletion tcelery/__init__.py
Expand Up @@ -23,6 +23,9 @@ def setup_nonblocking_producer(celery_app=None, io_loop=None,


def connect(): def connect():
broker_url = celery_app.connection().as_uri(include_password=True) broker_url = celery_app.connection().as_uri(include_password=True)
NonBlockingTaskProducer.conn_pool.connect(broker_url, on_ready) options = celery_app.conf.get('CELERYT_PIKA_OPTIONS', {})
NonBlockingTaskProducer.conn_pool.connect(broker_url,
options=options,
callback=on_ready)


io_loop.add_callback(connect) io_loop.add_callback(connect)
15 changes: 10 additions & 5 deletions tcelery/connection.py
Expand Up @@ -17,15 +17,19 @@ def __init__(self):
self.channel = None self.channel = None
self.connection = None self.connection = None


def connect(self, url, callback=None): def connect(self, url, options=None, callback=None):
purl = urlparse(url) purl = urlparse(url)
credentials = pika.PlainCredentials(purl.username, purl.password) credentials = pika.PlainCredentials(purl.username, purl.password)
virtual_host = purl.path[1:] virtual_host = purl.path[1:]
host = purl.hostname host = purl.hostname
port = purl.port port = purl.port


params = pika.ConnectionParameters(host, port, virtual_host, options = options or {}
credentials) options = {k.lstrip('DEFAULT_').lower(): v for k, v in options.items()}
options.update(host=host, port=port, virtual_host=virtual_host,
credentials=credentials)

params = pika.ConnectionParameters(**options)
self.connection = TornadoConnection( self.connection = TornadoConnection(
params, on_open_callback=partial(self.on_connect, callback)) params, on_open_callback=partial(self.on_connect, callback))
self.connection.add_on_close_callback(self.on_closed) self.connection.add_on_close_callback(self.on_closed)
Expand Down Expand Up @@ -81,11 +85,12 @@ def __init__(self, limit):
self._connections = [] self._connections = []
self._connection = None self._connection = None


def connect(self, broker_url, callback=None): def connect(self, broker_url, options=None, callback=None):
self._on_ready = callback self._on_ready = callback
for _ in range(self._limit): for _ in range(self._limit):
conn = Connection() conn = Connection()
conn.connect(broker_url, partial(self._on_connect, conn)) conn.connect(broker_url, options=options,
callback=partial(self._on_connect, conn))


def _on_connect(self, connection): def _on_connect(self, connection):
self._connections.append(connection) self._connections.append(connection)
Expand Down

0 comments on commit 449221f

Please sign in to comment.