Skip to content

Commit

Permalink
Merge pull request #1002 from vitaly-krugl/make-connect-non-blocking-…
Browse files Browse the repository at this point in the history
…and-refactor-interfaces

Refactor interfaces and make DNS/TCP/SSL connection setup non-blocking
  • Loading branch information
lukebakken committed Apr 26, 2018
2 parents 08080a8 + 8f90e34 commit 98e0e56
Show file tree
Hide file tree
Showing 52 changed files with 9,340 additions and 2,210 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
.tox
.DS_Store
.python-version
.pytest_cache/
pika.iml
codegen
pika.egg-info
Expand Down
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
language: python

sudo: false
# Turn on sudo mode to facilitate the IPv6 workaround per
# github.com/travis-ci/travis-ci/issues/8711. See also related reference in
# before_script section.
sudo: true

addons:
apt:
Expand Down Expand Up @@ -41,6 +44,8 @@ install:
- sed -e "s#PIKA_DIR#$TRAVIS_BUILD_DIR#g" "$TRAVIS_BUILD_DIR/testdata/rabbitmq.conf.in" > "$TRAVIS_BUILD_DIR/testdata/rabbitmq.conf"

before_script:
# Enable IPv6 for our tests - see github.com/travis-ci/travis-ci/issues/8711
- echo 0 | sudo tee /proc/sys/net/ipv6/conf/all/disable_ipv6
- pip freeze
- /bin/sh -c "RABBITMQ_PID_FILE=$TRAVIS_BUILD_DIR/rabbitmq.pid RABBITMQ_CONFIG_FILE=$TRAVIS_BUILD_DIR/testdata/rabbitmq $TRAVIS_BUILD_DIR/rabbitmq_server-$RABBITMQ_VERSION/sbin/rabbitmq-server &"
- /bin/sh "$TRAVIS_BUILD_DIR/rabbitmq_server-$RABBITMQ_VERSION/sbin/rabbitmqctl" wait "$TRAVIS_BUILD_DIR/rabbitmq.pid"
Expand Down
44 changes: 40 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Here is the most simple example of use, sending a message with the BlockingConne
channel = connection.channel()
channel.basic_publish(exchange='example',
routing_key='test',
body='Test Message')
body=b'Test Message')
connection.close()
And an example of writing a blocking consumer:
Expand All @@ -62,12 +62,34 @@ And an example of writing a blocking consumer:
print('Requeued %i messages' % requeued_messages)
connection.close()
Multiple Connection Parameters
------------------------------
You can also pass multiple connection parameter instances for
fault-tolerance as in the code snippet below (host names are just examples, of
course). To enable retries, set `connection_attempts` and `retry_delay` as
needed in the last `pika.ConnectionParameters` element of the sequence. Retries
occur after connection attempts using all of the given connection parameters
fail.

.. code :: python
import pika
configs = (
pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(configs)
With non-blocking adapters, you can request a connection using multiple
connection parameter instances via the connection adapter's
`create_connection()` class method.

Pika provides the following adapters
------------------------------------

- AsyncioConnection - adapter for the Python3 AsyncIO event loop
- BlockingConnection - enables blocking, synchronous operation on top of library for simple uses
- SelectConnection - fast asynchronous adapter
- BlockingConnection - enables blocking, synchronous operation on top of library for simple usage
- SelectConnection - fast asynchronous adapter without 3rd-party dependencies
- TornadoConnection - adapter for use with the Tornado IO Loop http://tornadoweb.org
- TwistedConnection - adapter for use with the Twisted asynchronous package http://twistedmatrix.com/

Expand All @@ -76,11 +98,25 @@ Contributing
To contribute to pika, please make sure that any new features or changes
to existing functionality **include test coverage**.

*Pull requests that add or change code without coverage will most likely be rejected.*
*Pull requests that add or change code without coverage will be rejected.*

Additionally, please format your code using `yapf <http://pypi.python.org/pypi/yapf>`_
with ``google`` style prior to issuing your pull request.

Extending to support additional I/O frameworks
----------------------------------------------
New non-blocking adapters may be implemented in either of the following ways:
- By subclassing :py:class:`pika.adapters.base_connection.BaseConnection` and
implementing its abstract method(s) and passing BaseConnection's constructor
an implementation of
:py.class:`pika.adapters.utils.nbio_interface.AbstractIOServices`. For
examples, refer to the implementations of
:py:class:`pika.AsyncioConnection` and :py:class:`pika.TornadoConnection`.
- By subclassing :py:class:`pika.connection.connection.Connection` and
implementing its abstract method(s). For an example, refer to the
implementation of
:py:class:`pika.adapters.twisted_connection.TwistedProtocolConnection`.

.. |Version| image:: https://img.shields.io/pypi/v/pika.svg?
:target: http://badge.fury.io/py/pika

Expand Down
10 changes: 5 additions & 5 deletions docs/examples/asynchronous_consumer_example.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,22 @@ consumer.py::
LOGGER.info('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)

def on_connection_closed(self, connection, reply_code, reply_text):
def on_connection_closed(self, connection, reason):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.

:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
:param Exception reason: exception representing reason for loss of
connection.

"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
reason)
self._connection.add_timeout(5, self.reconnect)

def reconnect(self):
Expand Down
22 changes: 17 additions & 5 deletions docs/examples/asynchronous_publisher_example.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ publisher.py::
LOGGER.info('Connecting to %s', self._url)
return pika.SelectConnection(pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed)

def on_connection_open(self, unused_connection):
Expand All @@ -76,22 +77,33 @@ publisher.py::
LOGGER.info('Connection opened')
self.open_channel()

def on_connection_closed(self, connection, reply_code, reply_text):
def on_connection_open_error(self, unused_connection, err):
"""This method is called by pika if the connection to RabbitMQ
can't be established.

:type unused_connection: pika.SelectConnection
:type err: Exception

"""
LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
self._connection.add_timeout(5, self._connection.ioloop.stop)

def on_connection_closed(self, connection, reason):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.

:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
:param Exception reason: exception representing reason for loss of
connection.

"""
self._channel = None
if self._stopping:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
reason)
self._connection.add_timeout(5, self._connection.ioloop.stop)

def open_channel(self):
Expand Down
10 changes: 5 additions & 5 deletions docs/examples/asyncio_consumer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,22 @@ consumer.py::
LOGGER.info('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)

def on_connection_closed(self, connection, reply_code, reply_text):
def on_connection_closed(self, connection, reason):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.

:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
:param Exception reason: exception representing reason for loss of
connection.

"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
reason)
self._connection.add_timeout(5, self.reconnect)

def on_connection_open(self, unused_connection):
Expand Down
10 changes: 5 additions & 5 deletions docs/examples/tornado_consumer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,22 @@ consumer.py::
LOGGER.info('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)

def on_connection_closed(self, connection, reply_code, reply_text):
def on_connection_closed(self, connection, reason):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.

:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
:param Exception reason: exception representing reason for loss of
connection.

"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
reason)
self._connection.add_timeout(5, self.reconnect)

def on_connection_open(self, unused_connection):
Expand Down
13 changes: 7 additions & 6 deletions examples/asynchronous_consumer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,28 @@ def on_connection_open_error(self, unused_connection, err):
can't be established.
:type unused_connection: pika.SelectConnection
:type err: str
:type err: Exception
"""
LOGGER.error('Connection open failed: %s', err)
self._connection.add_timeout(5, self.reconnect)

def on_connection_closed(self, connection, reply_code, reply_text):
def on_connection_closed(self, connection, reason):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
:param Exception reason: exception representing reason for loss of
connection.
"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
reason)
self._connection.add_timeout(5, self.reconnect)

def reconnect(self):
Expand Down
15 changes: 8 additions & 7 deletions examples/asynchronous_publisher_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,28 @@ def on_connection_open_error(self, unused_connection, err):
can't be established.
:type unused_connection: pika.SelectConnection
:type err: str
:type err: Exception
"""
LOGGER.error('Connection open failed: %s', err)
LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
self._connection.add_timeout(5, self._connection.ioloop.stop)

def on_connection_closed(self, connection, reply_code, reply_text):
def on_connection_closed(self, connection, reason):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
:param Exception reason: exception representing reason for loss of
connection.
"""
self._channel = None
if self._stopping:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
reason)
self._connection.add_timeout(5, self._connection.ioloop.stop)

def open_channel(self):
Expand Down
9 changes: 6 additions & 3 deletions pika/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
__version__ = '1.0.0b1'

import logging
from logging import NullHandler

# Add NullHandler to prevent logging warnings
logging.getLogger(__name__).addHandler(NullHandler())
# Add NullHandler before importing Pika modules to prevent logging warnings
logging.getLogger(__name__).addHandler(logging.NullHandler())

# pylint: disable=C0413

from pika.connection import ConnectionParameters
from pika.connection import URLParameters
Expand All @@ -18,3 +19,5 @@
from pika.adapters import SelectConnection
from pika.adapters import TornadoConnection
from pika.adapters import TwistedConnection

from pika.adapters.utils.connection_workflow import AMQPConnectionWorkflow

0 comments on commit 98e0e56

Please sign in to comment.