Skip to content

Commit

Permalink
Merge fe82f95 into dd70de5
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito authored Apr 2, 2017
2 parents dd70de5 + fe82f95 commit fd5aa90
Show file tree
Hide file tree
Showing 14 changed files with 571 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ instance/

# Sphinx documentation
docs/_build/
docs/source/apidoc

# PyBuilder
target/
Expand Down
19 changes: 7 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
language: python
services:
- rabbitmq
addons:
apt:
sources:
- deadsnakes
packages:
- python3.5
env:
- TOXENV=py34
- TOXENV=py35
language: python
python:
- "3.4"
- "3.5"
- "3.6"
install:
- virtualenv -p python3 /tmp/env
- /tmp/env/bin/pip install tox coveralls
- pip install tox coveralls
script:
- /tmp/env/bin/tox
- tox
19 changes: 19 additions & 0 deletions aio_pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@


class Channel(BaseChannel):
""" Channel abstraction """

__slots__ = ('__connection', '__closing', '__confirmations', '__delivery_tag',
'loop', '_futures', '__channel', 'default_exchange')

def __init__(self, connection,
loop: asyncio.AbstractEventLoop, future_store: FutureStore):
"""
:param connection: :class:`aio_pika.adapter.AsyncioConnection` instance
:param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`)
:param future_store: :class:`aio_pika.common.FutureStore` instance
"""
super().__init__(loop, future_store.get_child())

self.__channel = None # type: pika.channel.Channel
Expand Down Expand Up @@ -143,6 +150,18 @@ def _publish(self, queue_name, routing_key, body, properties, mandatory, immedia
@asyncio.coroutine
def declare_queue(self, name: str = '', *, durable: bool = None, exclusive: bool = False,
auto_delete: bool = False, arguments: dict = None, timeout: int = None) -> Queue:
"""
:param name: queue name
:param durable: Durability (queue survive broker restart)
:param exclusive: Makes this queue exclusive. Exclusive queues may only be \
accessed by the current connection, and are deleted when that connection \
closes. Passive declaration of an exclusive queue by other connections are not allowed.
:param auto_delete: Delete queue when channel will be closed.
:param arguments: pika additional arguments
:param timeout: execution timeout
:return: :class:`aio_pika.queue.Queue` instance
"""

if auto_delete and durable is None:
durable = False
Expand Down
36 changes: 34 additions & 2 deletions aio_pika/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import warnings
from functools import wraps
from logging import getLogger
from typing import Callable

from pika import ConnectionParameters
from pika.credentials import PlainCredentials
from pika.spec import REPLY_SUCCESS
Expand All @@ -27,6 +29,8 @@ def wrap(self, *args, **kwargs):


class Connection:
""" Connection abstraction """

__slots__ = (
'loop', '__closing', '_connection', '_futures', '__sender_lock',
'_io_loop', '__connecting', '__connection_parameters', '__credentials',
Expand Down Expand Up @@ -68,11 +72,18 @@ def __repr__(self):
cls_name = self.__class__.__name__
return '<{0}: "{1}">'.format(cls_name, str(self))

def add_close_callback(self, callback):
def add_close_callback(self, callback: Callable[[], None]):
""" Add callback which will be called after connection will be closed.
:return: None
"""

self.__closing.add_done_callback(callback)

@property
def is_closed(self):
""" Is this connection are closed """

if not (self._connection and self._connection.socket):
return True

Expand All @@ -83,10 +94,13 @@ def is_closed(self):

@property
def closing(self):
""" Return future which will be finished after connection close. """
return copy_future(self.__closing)

@asyncio.coroutine
def connect(self):
""" Perform connect. This method should be called after :func:`aio_pika.connection.Connection.__init__`"""

if self.closing.done():
raise RuntimeError("Invalid connection state")

Expand Down Expand Up @@ -139,7 +153,7 @@ def _on_connection_lost(_, code, reason):
@_ensure_connection
@asyncio.coroutine
def channel(self) -> Channel:

""" Get a channel """
log.debug("Creating AMQP channel for conneciton: %r", self)

channel = Channel(self, self.loop, self._futures)
Expand All @@ -151,6 +165,7 @@ def channel(self) -> Channel:

@asyncio.coroutine
def close(self) -> None:
""" Close AMQP connection """
log.debug("Closing AMQP connection")
self._connection.close()
yield from self.closing
Expand All @@ -161,6 +176,23 @@ def connect(url: str=None, *, host: str='localhost',
port: int=5672, login: str='guest',
password: str='guest', virtualhost: str='/',
ssl: bool=False, loop=None, **kwargs) -> Connection:
""" Make connection to the broker
:param url: `RFC3986`_ formatted broker address. When :class:`None` will be used keyword arguments.
:param host: hostname of the broker
:param port: broker port 5672 by default
:param login: username string. `'guest'` by default.
:param password: password string. `'guest'` by default.
:param virtualhost: virtualhost parameter. `'/'` by default
:param ssl: use SSL for connection. Should be used with addition kwargs. See `pika documentation`_ for more info.
:param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`)
:param kwargs: addition parameters which will be passed to the pika connection.
:return: :class:`aio_pika.connection.Connection`
.. _RFC3986: https://tools.ietf.org/html/rfc3986
.. _pika documentation: https://goo.gl/TdVuZ9
"""

if url:
url = URL(str(url))
host = url.host or host
Expand Down
2 changes: 1 addition & 1 deletion aio_pika/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ class MessageProcessError(AMQPException):
'NoFreeChannels', 'ProbableAccessDeniedError', 'ProtocolSyntaxError', 'ProtocolVersionMismatch',
'RecursionError', 'ShortStringTooLong', 'UnexpectedFrameError', 'UnroutableError',
'UnspportedAMQPFieldException', 'UnsupportedAMQPFieldException',
)
)
13 changes: 13 additions & 0 deletions aio_pika/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class ExchangeType(Enum):


class Exchange(BaseChannel):
""" Exchange abstraction """

__slots__ = 'name', '__type', '__publish_method', 'arguments', 'durable', 'auto_delete', '_channel'

def __init__(self, channel: Channel, publish_method, name: str,
Expand Down Expand Up @@ -45,6 +47,13 @@ def __repr__(self):
@BaseChannel._ensure_channel_is_open
@asyncio.coroutine
def publish(self, message: Message, routing_key, *, mandatory=True, immediate=False):
""" Publish the message to the queue. `aio_pika` use `publisher confirms`_
extension for message delivery.
.. _publisher confirms: https://www.rabbitmq.com/confirms.html
"""

log.debug("Publishing message via exchange %s: %r", self, message)

return (
Expand All @@ -60,6 +69,10 @@ def publish(self, message: Message, routing_key, *, mandatory=True, immediate=Fa

@BaseChannel._ensure_channel_is_open
def delete(self, if_unused=False) -> asyncio.Future:
""" Delete the queue
:param if_unused: perform deletion when queue has no bindings.
"""
log.warning("Deleting %r", self)
self._futures.reject_all(RuntimeError("Exchange was deleted"))
future = create_future(loop=self.loop)
Expand Down
82 changes: 80 additions & 2 deletions aio_pika/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class DeliveryMode(IntEnum):


class Message:
""" AMQP message abstraction """

__slots__ = (
"body", "headers", "content_type", "content_encoding",
"delivery_mode", "priority", "correlation_id", "reply_to",
Expand All @@ -31,6 +33,24 @@ def __init__(self, body: bytes, *, headers: dict = None, content_type: str = Non
type: str = None, user_id: str = None,
app_id: str = None):

""" Creates a new instance of Message
:param body: message body
:param headers: message headers
:param content_type: content type
:param content_encoding: content encoding
:param delivery_mode: delivery mode
:param priority: priority
:param correlation_id: correlation id
:param reply_to: reply to
:param expiration: expiration
:param message_id: message id
:param timestamp: timestamp
:param type: type
:param user_id: user id
:param app_id: app id
"""

self.__lock = False
self.body = body if isinstance(body, bytes) else bytes(body)
self.headers = headers
Expand Down Expand Up @@ -67,6 +87,10 @@ def info(self):

@property
def locked(self):
""" is message locked
:return: :class:`bool`
"""
return self.__lock

@property
Expand Down Expand Up @@ -118,13 +142,41 @@ def __copy__(self):


class IncomingMessage(Message):
""" Incoming message it's seems like Message but has additional methods for message acknowledgement.
Depending on the acknowledgement mode used, RabbitMQ can consider a
message to be successfully delivered either immediately after it is sent
out (written to a TCP socket) or when an explicit ("manual") client
acknowledgement is received. Manually sent acknowledgements can be
positive or negative and use one of the following protocol methods:
* basic.ack is used for positive acknowledgements
* basic.nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1)
* basic.reject is used for negative acknowledgements but has one limitations compared to basic.nack
Positive acknowledgements simply instruct RabbitMQ to record a message as delivered.
Negative acknowledgements with basic.reject have the same effect.
The difference is primarily in the semantics: positive acknowledgements assume a message was
successfully processed while their negative counterpart suggests that a delivery wasn't
processed but still should be deleted.
"""
__slots__ = (
'_loop', '__channel', 'cluster_id', 'consumer_tag',
'delivery_tag', 'exchange', 'routing_key', 'synchronous',
'redelivered', '__no_ack', '__processed'
)

def __init__(self, channel: Channel, envelope, properties, body, no_ack: bool = False):
""" Create an instance of :class:`IncomingMessage`
:param channel: :class:`aio_pika.channel.Channel`
:param envelope: pika envelope
:param properties: properties
:param body: message body
:param no_ack: no ack needed
"""
self.__channel = channel
self.__no_ack = no_ack
self.__processed = False
Expand Down Expand Up @@ -155,15 +207,35 @@ def __init__(self, channel: Channel, envelope, properties, body, no_ack: bool =
self.synchronous = envelope.synchronous

@contextmanager
def proccess(self, requeue=False):
def proccess(self, requeue=False, reject_on_redelivered=False):
""" Context manager for processing the message
>>> def on_message_received():
... with message.process():
... # When exception will be raised
... # the message will be rejected
... print(message.body)
:param requeue: Requeue message when exception.
:param reject_on_redelivered: When True message will be rejected only when message was redelivered.
"""
try:
yield self
self.ack()
except:
if reject_on_redelivered and self.redelivered:
log.info("Message %r was redelivered and will be rejected.", self)
self.reject(requeue=False)

self.reject(requeue=requeue)
raise

def ack(self):
""" Send basic.ack is used for positive acknowledgements
:return: None
"""
if self.__no_ack:
log.warning("Can't ack message with \"no_ack\" flag")
return False
Expand All @@ -177,6 +249,10 @@ def ack(self):
self.lock()

def reject(self, requeue=False):
""" When `requeue=True` the message will be returned to queue. Otherwise message will be dropped.
:param requeue: bool
"""
if self.__no_ack:
raise TypeError('This message has "no_ack" flag.')

Expand All @@ -189,6 +265,8 @@ def reject(self, requeue=False):
self.lock()

def info(self):
""" Method returns dict representation of the message """

info = super(IncomingMessage, self).info()
info['cluster_id'] = self.cluster_id
info['consumer_tag'] = self.consumer_tag
Expand All @@ -200,4 +278,4 @@ def info(self):
return info


__all__ = ('Message', 'IncomingMessage')
__all__ = 'Message', 'IncomingMessage',
Loading

0 comments on commit fd5aa90

Please sign in to comment.