Skip to content

Commit

Permalink
Merge branch 'hairyhum-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed May 4, 2018
2 parents 62de151 + 71e8b8d commit c7fec31
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 0 deletions.
64 changes: 64 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,70 @@ Pika provides the following adapters
- TornadoConnection - adapter for use with the Tornado IO Loop http://tornadoweb.org
- TwistedConnection - adapter for use with the Twisted asynchronous package http://twistedmatrix.com/

Connection recovery
-------------------

Some RabbitMQ clients (Bunny, Java, .NET, Objective-C/Swift) provide a way to automatically recover connection, its channels
and topology (e.g. queues, bindings and consumers) after a network failure.
Others require connection recovery to be performed by the application code and strive to make
it a straightforward process. Pika falls into the second category.

Pika supports multiple connection adapters. They take different approaches
to connection recovery.

For BlockingConnection adapter exception handling can be used to check for
connection errors. Here's a very basic example:

.. code :: python
import pika
while(True):
try:
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_consume('queue-name', on_message_callback)
channel.start_consuming()
# Do not recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
break
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continue
This example can be found in `examples/consume_recover.py`.

Generic operation retry libraries such as `retry <https://github.com/invl/retry>`_
can be used:

.. code :: python
from retry import retry
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_consume('queue-name', on_message_callback)
try:
channel.start_consuming()
# Do not recover connections closed by server
except pika.exceptions.ConnectionClosedByBroker:
pass
consume()
Decorators make it possible to configure some additional recovery behaviours,
like delays between retries and limiting the number of retries.

The above example can be found in `examples/consume_recover_retry.py`.

For asynchronous adapters, use `on_close_callback` to react to connection failure events.
This callback can be used to clean up and recover the connection.

An example of recovery using `on_close_callback` can be found
in `examples/asynchronous_consumer_example.py`

Contributing
------------
To contribute to pika, please make sure that any new features or changes
Expand Down
1 change: 1 addition & 0 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Pika has various methods of use, between the synchronous BlockingConnection adap
examples/connecting_async
examples/blocking_basic_get
examples/blocking_consume
examples/blocking_consume_recover_multiple_hosts
examples/blocking_consumer_generator
examples/comparing_publishing_sync_async
examples/blocking_delivery_confirmations
Expand Down
115 changes: 115 additions & 0 deletions docs/examples/blocking_consume_recover_multiple_hosts.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
Using the Blocking Connection with connection recovery with multiple hosts
==========================================================================

.. _example_blocking_basic_consume_recover_multiple_hosts:

RabbitMQ nodes can be `clustered <http://www.rabbitmq.com/clustering.html>`_.
In the absence of failure clients can connect to any node and perform any operation.
In case a node fails, stops, or becomes unavailable, clients should be able to
connect to another node and continue.

To simplify reconnection to a different node, connection recovery mechanism
should be combined with connection configuration that specifies multiple hosts.

The BlockingConnection adapter relies on exception handling to check for
connection errors::

import pika
import random

def on_message(channel, method_frame, header_frame, body):
print(method_frame.delivery_tag)
print(body)
print()
channel.basic_ack(delivery_tag=method_frame.delivery_tag)

## Assuming there are three hosts: host1, host2, and host3
node1 = pika.URLParameters('amqp://node1')
node2 = pika.URLParameters('amqp://node2')
node3 = pika.URLParameters('amqp://node3')
all_endpoints = [node1, node2, node3]

while(True):
try:
print("Connecting...")
## Shuffle the hosts list before reconnecting.
## This can help balance connections.
random.shuffle(all_endpoints)
connection = pika.BlockingConnection(all_endpoints)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
## This queue is intentionally non-durable. See http://www.rabbitmq.com/ha.html#non-mirrored-queue-behavior-on-node-failure
## to learn more.
channel.queue_declare('recovery-example', durable = False, auto_delete = True)
channel.basic_consume('recovery-example', on_message)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
break
except pika.exceptions.ConnectionClosedByBroker:
# Uncomment this to make the example not attempt recovery
# from server-initiated connection closure, including
# when the node is stopped cleanly
#
# break
continue
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
print("Connection was closed, retrying...")
continue

Generic operation retry libraries such as `retry <https://github.com/invl/retry>`_
can prove useful.

To run the following example, install the library first with `pip install retry`.

In this example the `retry` decorator is used to set up recovery with delay::

import pika
import random
from retry import retry

def on_message(channel, method_frame, header_frame, body):
print(method_frame.delivery_tag)
print(body)
print()
channel.basic_ack(delivery_tag=method_frame.delivery_tag)

## Assuming there are three hosts: host1, host2, and host3
node1 = pika.URLParameters('amqp://node1')
node2 = pika.URLParameters('amqp://node2')
node3 = pika.URLParameters('amqp://node3')
all_endpoints = [node1, node2, node3]

@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
random.shuffle(all_endpoints)
connection = pika.BlockingConnection(all_endpoints)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)

## This queue is intentionally non-durable. See http://www.rabbitmq.com/ha.html#non-mirrored-queue-behavior-on-node-failure
## to learn more.
channel.queue_declare('recovery-example', durable = False, auto_delete = True)
channel.basic_consume('recovery-example', on_message)

try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
except pika.exceptions.ConnectionClosedByBroker:
# Uncomment this to make the example not attempt recovery
# from server-initiated connection closure, including
# when the node is stopped cleanly
# except pika.exceptions.ConnectionClosedByBroker:
# pass
continue

consume()
47 changes: 47 additions & 0 deletions examples/blocking_consume_recover_multiple_hosts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import functools
import pika
import random

def on_message(channel, method_frame, header_frame, body, userdata=None):
print('Userdata: {} Message body: {}'.format(userdata, body))
channel.basic_ack(delivery_tag=method_frame.delivery_tag)

credentials = pika.PlainCredentials('guest', 'guest')

params1 = pika.ConnectionParameters('localhost', port=5672, credentials=credentials)
params2 = pika.ConnectionParameters('localhost', port=5673, credentials=credentials)
params3 = pika.ConnectionParameters('localhost', port=5674, credentials=credentials)
params_all = [params1, params2, params3]


# Infinite loop
while(True):
try:
random.shuffle(params_all)
connection = pika.BlockingConnection(params_all)
channel = connection.channel()
channel.exchange_declare(exchange='test_exchange', exchange_type='direct', passive=False, durable=True, auto_delete=False)
channel.queue_declare(queue='standard', auto_delete=True)
channel.queue_bind(queue='standard', exchange='test_exchange', routing_key='standard_key')
channel.basic_qos(prefetch_count=1)

on_message_callback = functools.partial(on_message, userdata='on_message_userdata')
channel.basic_consume('standard', on_message_callback)

try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()

connection.close()
break
# Do not recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
break
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continue

0 comments on commit c7fec31

Please sign in to comment.