Permalink
Browse files

Minor, work in progress cleanups of these two connection adapters

  • Loading branch information...
1 parent 4e990c9 commit 3575fc92d005bd9ffb393f0bea8a8c3bce84af05 Gavin M. Roy committed Oct 2, 2012
Showing with 85 additions and 76 deletions.
  1. +5 −5 pika/adapters/asyncore_connection.py
  2. +80 −71 pika/adapters/twisted_connection.py
@@ -17,7 +17,7 @@
except ImportError:
SSL = False
-from pika.adapters.base_connection import BaseConnection
+from pika.adapters import base_connection
from pika.exceptions import AMQPConnectionError
LOGGER = logging.getLogger(__name__)
@@ -53,7 +53,8 @@ def __init__(self, parameters):
# Loop while we have remaining attempts
while remaining_attempts:
try:
- return self._socket_connect()
+ self._socket_connect()
+ return
except socket.error, err:
remaining_attempts -= 1
if not remaining_attempts:
@@ -214,7 +215,7 @@ def stop(self):
self.close()
-class AsyncoreConnection(BaseConnection):
+class AsyncoreConnection(base_connection.BaseConnection):
def _adapter_connect(self):
"""
@@ -223,10 +224,9 @@ def _adapter_connect(self):
the handle to self so that the AsyncoreDispatcher object can call back
into our various state methods.
"""
- self.ioloop = AsyncoreDispatcher(self.parameters)
+ self.ioloop = AsyncoreDispatcher(self.params)
# Map some core values for compatibility
self.ioloop._handle_error = self._handle_error
self.ioloop.connection = self
- self.ioloop.suggested_buffer_size = self._suggested_buffer_size
self.socket = self.ioloop.socket
@@ -3,10 +3,9 @@
# For copyright and licensing please refer to COPYING.
#
# ***** END LICENSE BLOCK *****
-"""
-Using Pika with a Twisted reactor.
+"""Using Pika with a Twisted reactor.
-Supports two methods of estabilishing the connection, using TwistedConnection
+Supports two methods of establishing the connection, using TwistedConnection
or TwistedProtocolConnection. For details about each method, see the docstrings
of the corresponding classes.
@@ -16,14 +15,15 @@
returns a Twisted DeferredQueue where messages from the server will be
stored. Refer to the docstrings for TwistedConnection.channel() and the
TwistedChannel class for details.
+
"""
import functools
import time
from twisted.internet import defer, error, reactor
from twisted.python import log
from pika import exceptions
-from pika.adapters.base_connection import BaseConnection, READ, WRITE
+from pika.adapters import base_connection
class ClosableDeferredQueue(defer.DeferredQueue):
@@ -34,7 +34,7 @@ class ClosableDeferredQueue(defer.DeferredQueue):
"""
def __init__(self, size=None, backlog=None):
self.closed = None
- defer.DeferredQueue.__init__(self, size, backlog)
+ super(ClosableDeferredQueue, self).__init__(size, backlog)
def put(self, obj):
if self.closed:
@@ -54,8 +54,7 @@ def close(self, reason):
class TwistedChannel(object):
- """
- A wrapper wround Pika's Channel.
+ """A wrapper wround Pika's Channel.
Channel methods that are normally take a callback argument are wrapped to
return a Deferred that fires with whatever would be passed to the callback.
@@ -96,8 +95,7 @@ def channel_closed(self, code, text):
self.__consumers = {}
def basic_consume(self, *args, **kwargs):
- """
- Consume from a server queue. Returns a Deferred that fires with a
+ """Consume from a server queue. Returns a Deferred that fires with a
tuple: (queue_object, consumer_tag). The queue object is an instance of
ClosableDeferredQueue, where data received from the queue will be
stored. Clients should use its get() method to fetch individual
@@ -119,9 +117,9 @@ def basic_consume(self, *args, **kwargs):
return defer.succeed((queue, consumer_tag))
def queue_delete(self, *args, **kwargs):
- """
- Wraps the method the same way all the others are wrapped, but removes
+ """Wraps the method the same way all the others are wrapped, but removes
the reference to the queue object after it gets deleted on the server.
+
"""
wrapped = self.__wrap_channel_method('queue_delete')
queue_name = kwargs['queue']
@@ -130,20 +128,20 @@ def queue_delete(self, *args, **kwargs):
return d.addCallback(self.__clear_consumer, queue_name)
def basic_publish(self, *args, **kwargs):
- """
- Make sure the channel is not closed and then publish. Return a Deferred
- that fires with the result of the channel's basic_publish.
+ """Make sure the channel is not closed and then publish. Return a
+ Deferred that fires with the result of the channel's basic_publish.
+
"""
if self.__closed:
return defer.fail(self.__closed)
return defer.succeed(self.__channel.basic_publish(*args, **kwargs))
def __wrap_channel_method(self, name):
- """
- Wrap Pika's Channel method to make it return a Deferred that fires when
- the method completes and errbacks if the channel gets closed. If the
- original method's callback would receive more than one argument, the
+ """Wrap Pika's Channel method to make it return a Deferred that fires
+ when the method completes and errbacks if the channel gets closed. If
+ the original method's callback would receive more than one argument, the
Deferred fires with a tuple of argument values.
+
"""
method = getattr(self.__channel, name)
@@ -182,10 +180,10 @@ def __getattr__(self, name):
class IOLoopReactorAdapter(object):
- """
- An adapter providing Pika's IOLoop interface using a Twisted reactor.
+ """An adapter providing Pika's IOLoop interface using a Twisted reactor.
Accepts a TwistedConnection object and a Twisted reactor object.
+
"""
def __init__(self, connection, reactor):
self.connection = connection
@@ -239,20 +237,19 @@ def update_handler(self, _, event_state):
self.reactor.removeReader(self.connection)
self.reactor.removeWriter(self.connection)
- if event_state & READ:
+ if event_state & base_connection.READ:
self.reactor.addReader(self.connection)
- if event_state & WRITE:
+ if event_state & base_connection.WRITE:
self.reactor.addWriter(self.connection)
-class TwistedConnection(BaseConnection):
- """
- A standard Pika connection adapter. You instantiate the class passing the
+class TwistedConnection(base_connection.BaseConnection):
+ """A standard Pika connection adapter. You instantiate the class passing the
connection parameters and the connected callback and when it gets called
you can start using it.
- The problem is that connection estabilishing is done using the blocking
+ The problem is that connection establishing is done using the blocking
socket module. For instance, if the host you are connecting to is behind a
misconfigured firewall that just drops packets, the whole process will
freeze until the connection timeout passes. To work around that problem,
@@ -262,15 +259,16 @@ class TwistedConnection(BaseConnection):
when the socket connection becomes readable or writable, so apart from
implementing the BaseConnection interface, they also provide Twisted's
IReadWriteDescriptor interface.
- """
- # BaseConnection methods
-
- def _adapter_connect(self, host, port):
+ """
+ def _adapter_connect(self):
+ """Connect to the RabbitMQ broker"""
# Connect (blockignly!) to the server
- BaseConnection._adapter_connect(self, host, port)
- # Pnce that's done, create an I/O loop by adapting the Twisted reactor
+ super(base_connection.BaseConnection, self)._adapter_connect()
+
+ # Create an I/O loop by adapting the Twisted reactor
self.ioloop = IOLoopReactorAdapter(self, reactor)
+
# Set the I/O events we're waiting for (see IOLoopReactorAdapter
# docstrings for why it's OK to pass None as the file descriptor)
self.ioloop.update_handler(None, self.event_state)
@@ -279,32 +277,34 @@ def _adapter_connect(self, host, port):
self._on_connected()
def _adapter_disconnect(self):
- # Remove from the IOLoop
+ """Called when the adapter should disconnect"""
self.ioloop.remove_handler(None)
-
- # Close our socket since the Connection class told us to do so
self.socket.close()
- def _on_connected(self):
- # Call superclass and then update the event state to flush the outgoing
- # frame out. Commit 50d842526d9f12d32ad9f3c4910ef60b8c301f59 removed a
- # self._flush_outbound call that was in _send_frame which previously
- # made this step unnecessary.
- BaseConnection._on_connected(self)
- self._manage_event_state()
-
def _handle_disconnect(self):
- # Do not stop the reactor, this would cause the entire process to exit,
- # just fire the disconnect callbacks
+ """Do not stop the reactor, this would cause the entire process to exit,
+ just fire the disconnect callbacks
+
+ """
self._on_connection_closed(None, True)
- def channel(self, channel_number=None):
+ def _on_connected(self):
+ """Call superclass and then update the event state to flush the outgoing
+ frame out. Commit 50d842526d9f12d32ad9f3c4910ef60b8c301f59 removed a
+ self._flush_outbound call that was in _send_frame which previously
+ made this step unnecessary.
+
"""
- Return a Deferred that fires with an instance of a wrapper aroud the
+ super(TwistedConnection, self)._on_connected()
+ self._manage_event_state()
+
+ def channel(self, channel_number=None):
+ """Return a Deferred that fires with an instance of a wrapper around the
Pika Channel class.
+
"""
d = defer.Deferred()
- BaseConnection.channel(self, d.callback, channel_number)
+ base_connection.BaseConnection.channel(self, d.callback, channel_number)
return d.addCallback(TwistedChannel)
# IReadWriteDescriptor methods
@@ -330,9 +330,8 @@ def doWrite(self):
self._manage_event_state()
-class TwistedProtocolConnection(BaseConnection):
- """
- Ahybrid between a Pika Connection and a Twisted Protocol. Allows using
+class TwistedProtocolConnection(base_connection.BaseConnection):
+ """A hybrid between a Pika Connection and a Twisted Protocol. Allows using
Twisted's non-blocking connectTCP/connectSSL methods for connecting to the
server.
@@ -341,18 +340,16 @@ class TwistedProtocolConnection(BaseConnection):
ready to be used (the initial AMQP handshaking has been done). You *have*
to wait for this Deferred to fire before requesting a channel.
- Since it's Twisted handling connection estabilishing, it does not accept
- connect callbacks or reconnection strategy objects, you have to implement
- that within Twisted. Also remember that the host, port and ssl values of
- the connection parameters are ignored because, yet again, it's Twisted who
- manages the connection.
- """
-
- # BaseConnection methods
+ Since it's Twisted handling connection establishing it does not accept
+ connect callbacks, you have to implement that within Twisted. Also remember
+ that the host, port and ssl values of the connection parameters are ignored
+ because, yet again, it's Twisted who manages the connection.
+ """
def __init__(self, parameters):
self.ready = defer.Deferred()
- BaseConnection.__init__(self, parameters, self.connectionReady)
+ super(TwistedProtocolConnection, self).__init__(parameters,
+ self.connectionReady)
self.ioloop = IOLoopReactorAdapter(self, reactor)
def _adapter_connect(self):
@@ -363,24 +360,36 @@ def _adapter_disconnect(self):
# Disconnect from the server
self.transport.loseConnection()
- def _send_frame(self, frame):
- marshalled_frame = frame.marshal()
- self.bytes_sent += len(marshalled_frame)
- self.frames_sent += 1
+ def _send_frame(self, frame_value):
+ """Send data the Twisted way, by writing to the transport. No need for
+ buffering, Twisted handles that by itself.
- # XXX: no backpressure support yet
+ :param frame_value: The frame to write
+ :type frame_value: pika.frame.Frame|pika.frame.ProtocolHeader
- # Send data the Twisted way, by writing to the transport. No need for
- # buffering, Twisted handles that by itself.
- self.transport.write(marshalled_frame)
+ """
+ if self.is_closed:
+ raise exceptions.ConnectionClosed
+ marshaled_frame = frame_value.marshal()
+ self.bytes_sent += len(marshaled_frame)
+ self.frames_sent += 1
+ self.transport.write(marshaled_frame)
def channel(self, channel_number=None):
- """
- Return a Deferred that fires with an instance of a wrapper aroud the
+ """Create a new channel with the next available channel number or pass
+ in a channel number to use. Must be non-zero if you would like to
+ specify but it is recommended that you let Pika manage the channel
+ numbers.
+
+ Return a Deferred that fires with an instance of a wrapper around the
Pika Channel class.
+
+ :param int channel_number: The channel number to use, defaults to the
+ next available.
+
"""
d = defer.Deferred()
- BaseConnection.channel(self, d.callback, channel_number)
+ base_connection.BaseConnection.channel(self, d.callback, channel_number)
return d.addCallback(TwistedChannel)
# IProtocol methods

0 comments on commit 3575fc9

Please sign in to comment.