Skip to content
Browse files

Handling unexpected disconnects & trying to reconnect

  • Loading branch information...
1 parent 449221f commit 468fa11d14e996dcd81c4d6cc2dd604d0db6b6bb @Sapphire64 Sapphire64 committed May 6, 2013
Showing with 33 additions and 12 deletions.
  1. +33 −12 tcelery/connection.py
View
45 tcelery/connection.py
@@ -5,8 +5,10 @@
from itertools import cycle
import pika
+import logging
from pika.adapters.tornado_connection import TornadoConnection
+from pika.exceptions import AMQPConnectionError
class Connection(object):
@@ -16,9 +18,12 @@ class Connection(object):
def __init__(self):
self.channel = None
self.connection = None
+ self.url = None
- def connect(self, url, options=None, callback=None):
- purl = urlparse(url)
+ def connect(self, url=None, options=None, callback=None):
+ if url is not None:
+ self.url = url
+ purl = urlparse(self.url)
credentials = pika.PlainCredentials(purl.username, purl.password)
virtual_host = purl.path[1:]
host = purl.hostname
@@ -30,13 +35,18 @@ def connect(self, url, options=None, callback=None):
credentials=credentials)
params = pika.ConnectionParameters(**options)
- self.connection = TornadoConnection(
- params, on_open_callback=partial(self.on_connect, callback))
- self.connection.add_on_close_callback(self.on_closed)
+ try:
+ TornadoConnection(params, stop_ioloop_on_close=False,
+ on_open_callback=partial(self.on_connect, callback))
+ except AMQPConnectionError:
+ logging.info('Retrying to connect in 2 seconds')
+ self.connection.add_timeout(2, partial(self.connect, url=url,
+ options=options, callback=callback))
def on_connect(self, callback, connection):
self.connection = connection
- connection.channel(partial(self.on_channel_open, callback))
+ self.connection.add_on_close_callback(self.on_closed)
+ self.connection.channel(partial(self.on_channel_open, callback))
def on_channel_open(self, callback, channel):
self.channel = channel
@@ -49,8 +59,20 @@ def on_exchange_declare(self, frame):
def on_basic_cancel(self, frame):
self.connection.close()
- def on_closed(self, connection):
- pass
+ def on_closed(self, connection, reply_code, reply_text):
+ """This method is invoked by pika when the connection to RabbitMQ is
+ closed unexpectedly. Since it is unexpected, we will reconnect to
+ RabbitMQ if it disconnects.
+
+ :param pika.connection.Connection connection: The closed connection obj
+ :param int reply_code: The server provided reply_code if given
+ :param str reply_text: The server provided reply_text if given
+
+ """
+ self._channel = None
+ logging.warning('Connection closed, reopening in 5 seconds: (%s) %s',
+ reply_code, reply_text)
+ connection.add_timeout(5, self.connect)
def publish(self, body, exchange=None, routing_key=None,
mandatory=False, immediate=False, content_type=None,
@@ -62,10 +84,9 @@ def publish(self, body, exchange=None, routing_key=None,
properties = pika.BasicProperties(content_type=content_type)
- self.channel.basic_publish(
- exchange=exchange, routing_key=routing_key, body=body,
- properties=properties, mandatory=mandatory,
- immediate=immediate)
+ self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body,
+ properties=properties, mandatory=mandatory,
+ immediate=immediate)
def consume(self, queue, callback, x_expires=None):
assert self.channel

0 comments on commit 468fa11

Please sign in to comment.
Something went wrong with that request. Please try again.