Permalink
Browse files

Merge pull request #8 from Sapphire64/reconnect

Handling unexpected disconnects & trying to reconnect (fixing #6)
  • Loading branch information...
2 parents dca0348 + 468fa11 commit ab2479422fc304ecc7149aad364d836c08fc6612 @mher committed May 9, 2013
Showing with 33 additions and 12 deletions.
  1. +33 −12 tcelery/connection.py
View
@@ -5,8 +5,10 @@
from itertools import cycle
import pika
+import logging
from pika.adapters.tornado_connection import TornadoConnection
+from pika.exceptions import AMQPConnectionError
class Connection(object):
@@ -16,9 +18,12 @@ class Connection(object):
def __init__(self):
self.channel = None
self.connection = None
+ self.url = None
- def connect(self, url, options=None, callback=None):
- purl = urlparse(url)
+ def connect(self, url=None, options=None, callback=None):
+ if url is not None:
+ self.url = url
+ purl = urlparse(self.url)
credentials = pika.PlainCredentials(purl.username, purl.password)
virtual_host = purl.path[1:]
host = purl.hostname
@@ -30,13 +35,18 @@ def connect(self, url, options=None, callback=None):
credentials=credentials)
params = pika.ConnectionParameters(**options)
- self.connection = TornadoConnection(
- params, on_open_callback=partial(self.on_connect, callback))
- self.connection.add_on_close_callback(self.on_closed)
+ try:
+ TornadoConnection(params, stop_ioloop_on_close=False,
+ on_open_callback=partial(self.on_connect, callback))
+ except AMQPConnectionError:
+ logging.info('Retrying to connect in 2 seconds')
+ self.connection.add_timeout(2, partial(self.connect, url=url,
@rbu

rbu Jul 4, 2013

Contributor

self.connection refers to the previous connection, and it is None if this is the first connection attempt. Since the exception is thrown during object instantiation, there's no way to call a method on the new object, so the best thing I can think of would be to add the timeout to the IOLoop ourselves.
There's an ioloop reference in setup_nonblocking_producer; it could be handed to the ConnectionPool and, from there, to the Connection.

@mher

mher Jul 4, 2013

Owner

Can you describe the problem in more detail?

@rbu

rbu Jul 4, 2013

Contributor

self.connection will only be set when a connection has been established successfully. If this is not the case, it will be None when the except handler is called. To reproduce, start a tornado with celery without a rabbitmq running. This causes the following traceback:

[E 130704 17:52:56 ioloop:410] Exception in callback <tornado.stack_context._StackContextWrapper object at 0x340dc58>
    Traceback (most recent call last):
      File "tornado/ioloop.py", line 396, in _run_callback
        callback()
      File "tcelery/__init__.py", line 31, in connect
        callback=on_ready)
      File "tcelery/connection.py", line 114, in connect
        callback=partial(self._on_connect, conn))
      File "tcelery/connection.py", line 43, in connect
        self.connection.add_timeout(2, partial(self.connect, url=url,
    AttributeError: 'NoneType' object has no attribute 'add_timeout'

Furthermore, even when self.connection is not None (i.e. a connection had been established, but it broke), the instance you are using to call add_timeout on is the old (broken) connection. While this works (because all the pika Connection class is doing is calling self.ioloop.add_timeout), it seems to be a smell. A clean implementation would be to store a reference to the ioloop in tcelery's Connection class.

@mher

mher Jul 4, 2013

Owner

You are right. Can you create a pull request?

@rbu

rbu Jul 4, 2013

Contributor

Not before Monday. Will get back if you're not faster than I am.

@rbu

rbu Jul 8, 2013

Contributor

Hey Mher, I won't be able to work on this. We've decided not to go with Celery for now (unrelated to tornado-celery).

@mher

mher Jul 8, 2013

Owner

Thanks for pointing out the issue!

+ options=options, callback=callback))
def on_connect(self, callback, connection):
self.connection = connection
- connection.channel(partial(self.on_channel_open, callback))
+ self.connection.add_on_close_callback(self.on_closed)
+ self.connection.channel(partial(self.on_channel_open, callback))
def on_channel_open(self, callback, channel):
self.channel = channel
@@ -49,8 +59,20 @@ def on_exchange_declare(self, frame):
def on_basic_cancel(self, frame):
self.connection.close()
- def on_closed(self, connection):
- pass
+ def on_closed(self, connection, reply_code, reply_text):
+ """This method is invoked by pika when the connection to RabbitMQ is
+ closed unexpectedly. Since it is unexpected, we will reconnect to
+ RabbitMQ if it disconnects.
+
+ :param pika.connection.Connection connection: The closed connection obj
+ :param int reply_code: The server provided reply_code if given
+ :param str reply_text: The server provided reply_text if given
+
+ """
+ self._channel = None
+ logging.warning('Connection closed, reopening in 5 seconds: (%s) %s',
+ reply_code, reply_text)
+ connection.add_timeout(5, self.connect)
def publish(self, body, exchange=None, routing_key=None,
mandatory=False, immediate=False, content_type=None,
@@ -62,10 +84,9 @@ def publish(self, body, exchange=None, routing_key=None,
properties = pika.BasicProperties(content_type=content_type)
- self.channel.basic_publish(
- exchange=exchange, routing_key=routing_key, body=body,
- properties=properties, mandatory=mandatory,
- immediate=immediate)
+ self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body,
+ properties=properties, mandatory=mandatory,
+ immediate=immediate)
def consume(self, queue, callback, x_expires=None):
assert self.channel

0 comments on commit ab24794

Please sign in to comment.