Skip to content
Browse files

Improves connection failure handling

  • Loading branch information...
1 parent 829b647 commit de9358f41dc822626b4968bf8273184565eed61f @mher committed Aug 8, 2013
Showing with 14 additions and 6 deletions.
  1. +14 −6 tcelery/connection.py
View
20 tcelery/connection.py
@@ -3,22 +3,26 @@
from urlparse import urlparse
from functools import partial
from itertools import cycle
+from datetime import timedelta
import pika
import logging
from pika.adapters.tornado_connection import TornadoConnection
from pika.exceptions import AMQPConnectionError
+from tornado import ioloop
+
class Connection(object):
content_type = 'application/x-python-serialize'
- def __init__(self):
+ def __init__(self, io_loop=None):
self.channel = None
self.connection = None
self.url = None
+ self.io_loop = io_loop or ioloop.IOLoop.instance()
def connect(self, url=None, options=None, callback=None):
if url is not None:
@@ -37,11 +41,14 @@ def connect(self, url=None, options=None, callback=None):
params = pika.ConnectionParameters(**options)
try:
TornadoConnection(params, stop_ioloop_on_close=False,
- on_open_callback=partial(self.on_connect, callback))
+ on_open_callback=partial(self.on_connect, callback),
+ custom_ioloop=self.io_loop)
except AMQPConnectionError:
logging.info('Retrying to connect in 2 seconds')
- self.connection.add_timeout(2, partial(self.connect, url=url,
- options=options, callback=callback))
+ self.io_loop.add_timeout(
+ timedelta(seconds=2),
+ partial(self.connect, url=url,
+ options=options, callback=callback))
def on_connect(self, callback, connection):
self.connection = connection
@@ -101,15 +108,16 @@ def on_queue_declared(self, *args, **kwargs):
class ConnectionPool(object):
- def __init__(self, limit):
+ def __init__(self, limit, io_loop=None):
self._limit = limit
self._connections = []
self._connection = None
+ self.io_loop = io_loop
def connect(self, broker_url, options=None, callback=None):
self._on_ready = callback
for _ in range(self._limit):
- conn = Connection()
+ conn = Connection(io_loop=self.io_loop)
conn.connect(broker_url, options=options,
callback=partial(self._on_connect, conn))

0 comments on commit de9358f

Please sign in to comment.
Something went wrong with that request. Please try again.