Skip to content

Commit

Permalink
Merge d8f861e into dd70de5
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Apr 2, 2017
2 parents dd70de5 + d8f861e commit ba7bffd
Show file tree
Hide file tree
Showing 20 changed files with 1,446 additions and 51 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ instance/

# Sphinx documentation
docs/_build/
docs/source/apidoc

# PyBuilder
target/
Expand Down
26 changes: 14 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
language: python

services:
- rabbitmq
addons:
apt:
sources:
- deadsnakes
packages:
- python3.5
env:
- TOXENV=py34
- TOXENV=py35

language: python

matrix:
include:
- python: 3.4
env: TOXENV=py34
- python: 3.5
env: TOXENV=py35
- python: 3.6
env: TOXENV=py36
install:
- virtualenv -p python3 /tmp/env
- /tmp/env/bin/pip install tox coveralls
- pip install tox coveralls
script:
- /tmp/env/bin/tox
- tox
4 changes: 2 additions & 2 deletions aio_pika/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from .connection import Connection, connect, connect_url
from .channel import Channel
from .exchange import Exchange, ExchangeType
from .message import Message, IncomingMessage
from .message import Message, IncomingMessage, DeliveryMode
from .queue import Queue
from .exceptions import AMQPException, MessageProcessError


__all__ = (
'connect', 'connect_url', 'Connection',
'Channel', 'Exchange', 'Message', 'IncomingMessage', 'Queue',
'AMQPException', 'MessageProcessError', 'ExchangeType'
'AMQPException', 'MessageProcessError', 'ExchangeType', 'DeliveryMode'
)
19 changes: 19 additions & 0 deletions aio_pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@


class Channel(BaseChannel):
""" Channel abstraction """

__slots__ = ('__connection', '__closing', '__confirmations', '__delivery_tag',
'loop', '_futures', '__channel', 'default_exchange')

def __init__(self, connection,
loop: asyncio.AbstractEventLoop, future_store: FutureStore):
"""
:param connection: :class:`aio_pika.adapter.AsyncioConnection` instance
:param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`)
:param future_store: :class:`aio_pika.common.FutureStore` instance
"""
super().__init__(loop, future_store.get_child())

self.__channel = None # type: pika.channel.Channel
Expand Down Expand Up @@ -143,6 +150,18 @@ def _publish(self, queue_name, routing_key, body, properties, mandatory, immedia
@asyncio.coroutine
def declare_queue(self, name: str = '', *, durable: bool = None, exclusive: bool = False,
auto_delete: bool = False, arguments: dict = None, timeout: int = None) -> Queue:
"""
:param name: queue name
:param durable: Durability (queue survive broker restart)
:param exclusive: Makes this queue exclusive. Exclusive queues may only be \
accessed by the current connection, and are deleted when that connection \
closes. Passive declaration of an exclusive queue by other connections are not allowed.
:param auto_delete: Delete queue when channel will be closed.
:param arguments: pika additional arguments
:param timeout: execution timeout
:return: :class:`aio_pika.queue.Queue` instance
"""

if auto_delete and durable is None:
durable = False
Expand Down
44 changes: 37 additions & 7 deletions aio_pika/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import warnings
from functools import wraps
from logging import getLogger
from typing import Callable

from pika import ConnectionParameters
from pika.credentials import PlainCredentials
from pika.spec import REPLY_SUCCESS
Expand All @@ -27,6 +29,8 @@ def wrap(self, *args, **kwargs):


class Connection:
""" Connection abstraction """

__slots__ = (
'loop', '__closing', '_connection', '_futures', '__sender_lock',
'_io_loop', '__connecting', '__connection_parameters', '__credentials',
Expand Down Expand Up @@ -68,11 +72,18 @@ def __repr__(self):
cls_name = self.__class__.__name__
return '<{0}: "{1}">'.format(cls_name, str(self))

def add_close_callback(self, callback):
def add_close_callback(self, callback: Callable[[], None]):
""" Add callback which will be called after connection will be closed.
:return: None
"""

self.__closing.add_done_callback(callback)

@property
def is_closed(self):
""" Is this connection are closed """

if not (self._connection and self._connection.socket):
return True

Expand All @@ -83,10 +94,13 @@ def is_closed(self):

@property
def closing(self):
""" Return future which will be finished after connection close. """
return copy_future(self.__closing)

@asyncio.coroutine
def connect(self):
""" Perform connect. This method should be called after :func:`aio_pika.connection.Connection.__init__`"""

if self.closing.done():
raise RuntimeError("Invalid connection state")

Expand Down Expand Up @@ -139,7 +153,7 @@ def _on_connection_lost(_, code, reason):
@_ensure_connection
@asyncio.coroutine
def channel(self) -> Channel:

""" Get a channel """
log.debug("Creating AMQP channel for conneciton: %r", self)

channel = Channel(self, self.loop, self._futures)
Expand All @@ -151,16 +165,34 @@ def channel(self) -> Channel:

@asyncio.coroutine
def close(self) -> None:
""" Close AMQP connection """
log.debug("Closing AMQP connection")
self._connection.close()
yield from self.closing


@asyncio.coroutine
def connect(url: str=None, *, host: str='localhost',
port: int=5672, login: str='guest',
password: str='guest', virtualhost: str='/',
ssl: bool=False, loop=None, **kwargs) -> Connection:
port: int=5672, login: str='guest',
password: str='guest', virtualhost: str='/',
ssl: bool=False, loop=None, **kwargs) -> Connection:
""" Make connection to the broker
:param url: `RFC3986`_ formatted broker address. When :class:`None` will be used keyword arguments.
:param host: hostname of the broker
:param port: broker port 5672 by default
:param login: username string. `'guest'` by default.
:param password: password string. `'guest'` by default.
:param virtualhost: virtualhost parameter. `'/'` by default
:param ssl: use SSL for connection. Should be used with addition kwargs. See `pika documentation`_ for more info.
:param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`)
:param kwargs: addition parameters which will be passed to the pika connection.
:return: :class:`aio_pika.connection.Connection`
.. _RFC3986: https://tools.ietf.org/html/rfc3986
.. _pika documentation: https://goo.gl/TdVuZ9
"""

if url:
url = URL(str(url))
host = url.host or host
Expand Down Expand Up @@ -188,5 +220,3 @@ def connect_url(url: str, loop=None) -> Connection:


__all__ = ('connect', 'connect_url', 'Connection')


2 changes: 1 addition & 1 deletion aio_pika/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ class MessageProcessError(AMQPException):
'NoFreeChannels', 'ProbableAccessDeniedError', 'ProtocolSyntaxError', 'ProtocolVersionMismatch',
'RecursionError', 'ShortStringTooLong', 'UnexpectedFrameError', 'UnroutableError',
'UnspportedAMQPFieldException', 'UnsupportedAMQPFieldException',
)
)
13 changes: 13 additions & 0 deletions aio_pika/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class ExchangeType(Enum):


class Exchange(BaseChannel):
""" Exchange abstraction """

__slots__ = 'name', '__type', '__publish_method', 'arguments', 'durable', 'auto_delete', '_channel'

def __init__(self, channel: Channel, publish_method, name: str,
Expand Down Expand Up @@ -45,6 +47,13 @@ def __repr__(self):
@BaseChannel._ensure_channel_is_open
@asyncio.coroutine
def publish(self, message: Message, routing_key, *, mandatory=True, immediate=False):
""" Publish the message to the queue. `aio_pika` use `publisher confirms`_
extension for message delivery.
.. _publisher confirms: https://www.rabbitmq.com/confirms.html
"""

log.debug("Publishing message via exchange %s: %r", self, message)

return (
Expand All @@ -60,6 +69,10 @@ def publish(self, message: Message, routing_key, *, mandatory=True, immediate=Fa

@BaseChannel._ensure_channel_is_open
def delete(self, if_unused=False) -> asyncio.Future:
""" Delete the queue
:param if_unused: perform deletion when queue has no bindings.
"""
log.warning("Deleting %r", self)
self._futures.reject_all(RuntimeError("Exchange was deleted"))
future = create_future(loop=self.loop)
Expand Down
Loading

0 comments on commit ba7bffd

Please sign in to comment.