Skip to content

Commit

Permalink
merge upstream master
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Laing committed Jan 17, 2014
2 parents 03d5da3 + 24c3f1d commit c121243
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 12 deletions.
6 changes: 3 additions & 3 deletions docs/intro.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ Introduction to Pika

IO and Event Looping
--------------------
As AMQP is a two-way RPC protocol where the client can send requests to the server and the server can send requests to a client, Pika implements or extends IO loops in each of its asynchronous connection adapters. These IO loops are blocking methods which loop and listen for events. Each asynchronous adapters follows the same standard for invoking the IO loop. The IO loop is created when the connection adapter is created. To start an IO loop for any given adapter, call the connection.ioloop.start() method.
As AMQP is a two-way RPC protocol where the client can send requests to the server and the server can send requests to a client, Pika implements or extends IO loops in each of its asynchronous connection adapters. These IO loops are blocking methods which loop and listen for events. Each asynchronous adapters follows the same standard for invoking the IO loop. The IO loop is created when the connection adapter is created. To start an IO loop for any given adapter, call the ``connection.ioloop.start()`` method.

If you are using an external IO loop such as Tornado's :class:`tornado.ioloop.IOLoop`, you invoke it as you normally would and then add the adapter to it.
If you are using an external IO loop such as Tornado's :class:`~tornado.ioloop.IOLoop`, you invoke it as you normally would and then add the adapter to it.

Example::

Expand Down Expand Up @@ -94,7 +94,7 @@ Example::

Credentials
-----------
The :module:`pika.credentials` module provides the mechanism by which you pass the username and password to the :py:class:`ConnectionParameters <pika.connection.ConnectionParameters>` class when it is created.
The :mod:`pika.credentials` module provides the mechanism by which you pass the username and password to the :py:class:`ConnectionParameters <pika.connection.ConnectionParameters>` class when it is created.

Example::

Expand Down
10 changes: 8 additions & 2 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ def close(self, reply_code=0, reply_text="Normal Shutdown"):
self._set_state(self.CLOSED)
self._cleanup()

def consume(self, queue):
def consume(self, queue, no_ack=False, exclusive=False):
"""Blocking consumption of a queue instead of via a callback. This
method is a generator that returns messages a tuple of method,
properties, and body.
Expand All @@ -696,14 +696,20 @@ def consume(self, queue):
:param queue: The queue name to consume
:type queue: str or unicode
:param no_ack: Tell the broker to not expect a response
:type no_ack: bool
:param exclusive: Don't allow other consumers on the queue
:type exclusive: bool
:rtype: tuple(spec.Basic.Deliver, spec.BasicProperties, str or unicode)
"""
LOGGER.debug('Forcing data events on')
if not self._generator:
LOGGER.debug('Issuing Basic.Consume')
self._generator = self.basic_consume(self._generator_callback,
queue)
queue,
no_ack,
exclusive)
while True:
if self._generator_messages:
yield self._generator_messages.pop(0)
Expand Down
6 changes: 4 additions & 2 deletions pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def basic_cancel(self, callback=None, consumer_tag='', nowait=False):
{'consumer_tag': consumer_tag})] if nowait is False else [])

def basic_consume(self, consumer_callback, queue='', no_ack=False,
exclusive=False, consumer_tag=None):
exclusive=False, consumer_tag=None, arguments=None):
"""Sends the AMQP command Basic.Consume to the broker and binds messages
for the consumer_tag to the consumer callback. If you do not pass in
a consumer_tag, one will be automatically generated for you. Returns
Expand All @@ -197,6 +197,7 @@ def basic_consume(self, consumer_callback, queue='', no_ack=False,
:param bool exclusive: Don't allow other consumers on the queue
:param consumer_tag: Specify your own consumer tag
:type consumer_tag: str or unicode
:param dict arguments: Custom key/value pair arguments for the consume
:rtype: str
"""
Expand All @@ -214,7 +215,8 @@ def basic_consume(self, consumer_callback, queue='', no_ack=False,
self._rpc(spec.Basic.Consume(queue=queue,
consumer_tag=consumer_tag,
no_ack=no_ack,
exclusive=exclusive),
exclusive=exclusive,
arguments=arguments or dict()),
self._on_eventok,
[(spec.Basic.ConsumeOk,
{'consumer_tag': consumer_tag})])
Expand Down
4 changes: 2 additions & 2 deletions pika/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ class should not be invoked directly but rather through the use of an
:param method on_open_callback: Called when the connection is opened
:param method on_open_error_callback: Called if the connection cant
be opened
:param method on_open_callback: Called when the connection is closed
:param method on_close_callback: Called when the connection is closed
"""
ON_CONNECTION_BACKPRESSURE = '_on_connection_backpressure'
Expand Down Expand Up @@ -566,7 +566,7 @@ def __init__(self,
:param method on_open_callback: Called when the connection is opened
:param method on_open_error_callback: Called if the connection cant
be opened
:param method on_open_callback: Called when the connection is closed
:param method on_close_callback: Called when the connection is closed
"""
# Define our callback dictionary
Expand Down
2 changes: 1 addition & 1 deletion pika/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def encode_value(pieces, value):
pieces.append(struct.pack('>c', 'V'))
return 1
else:
raise exceptions.UnspportedAMQPFieldException(pieces, value)
raise exceptions.UnsupportedAMQPFieldException(pieces, value)


def decode_table(encoded, offset):
Expand Down
6 changes: 5 additions & 1 deletion pika/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,15 @@ def __repr__(self):
return 'Unsupported field kind %s' % self.args[0]


class UnspportedAMQPFieldException(ProtocolSyntaxError):
class UnsupportedAMQPFieldException(ProtocolSyntaxError):
def __repr__(self):
return 'Unsupported field kind %s' % type(self.args[1])


class UnspportedAMQPFieldException(UnsupportedAMQPFieldException):
"""Deprecated version of UnsupportedAMQPFieldException"""


class MethodNotImplemented(AMQPError):
pass

Expand Down
2 changes: 1 addition & 1 deletion tests/data_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_decode_table_bytes(self):
self.assertEqual(byte_count, 191)

def test_encode_raises(self):
self.assertRaises(exceptions.UnspportedAMQPFieldException,
self.assertRaises(exceptions.UnsupportedAMQPFieldException,
data.encode_table,
[], {'foo': set([1, 2, 3])})

Expand Down

0 comments on commit c121243

Please sign in to comment.