Skip to content

Commit

Permalink
[fix] add colient info and some internal api changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Jul 5, 2017
1 parent 9ccfb11 commit 94522cb
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 29 deletions.
21 changes: 21 additions & 0 deletions aio_pika/adapter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
import logging
import platform
from functools import partial

from pika.adapters import base_connection
from .version import __version__


LOGGER = logging.getLogger(__name__)

PRODUCT = 'aio-pika'


class IOLoopAdapter:
__slots__ = 'loop', 'handlers', 'readers', 'writers'
Expand Down Expand Up @@ -149,3 +153,20 @@ def _handle_disconnect(self):
super()._handle_write()
except Exception as e:
self._on_disconnect(-1, e)

@property
def _client_properties(self) -> dict:
""" Return the client properties dictionary. """
return {
'product': PRODUCT,
'platform': 'Python %s' % platform.python_version(),
'capabilities': {
'authentication_failure_close': True,
'basic.nack': True,
'connection.blocked': True,
'consumer_cancel_notify': True,
'publisher_confirms': True
},
'information': 'See https://aio-pika.readthedocs.io/',
'version': __version__
}
39 changes: 25 additions & 14 deletions aio_pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ def __init__(self, connection,
future_store=self._futures.get_child(),
)

@property
def _channel_maker(self):
return self.__connection._connection.channel

@property
def number(self):
return self.__channel.channel_number

def __str__(self):
return "{0}".format(self.__channel.channel_number if self.__channel else "Not initialized chanel")
return "{0}".format(self.number if self.__channel else "Not initialized chanel")

def __repr__(self):
return '<Channel "%s#%s">' % (self.__connection, self)
Expand Down Expand Up @@ -84,30 +92,33 @@ def add_close_callback(self, callback: FunctionType) -> None:
def add_on_return_callback(self, callback: FunctionOrCoroutine) -> None:
self.__on_return_callbacks.add(callback)

@asyncio.coroutine
def _create_channel(self, timeout=None):
future = self._create_future(timeout=timeout)

self._channel_maker(future.set_result)

channel = yield from future # type: pika.channel.Channel
channel.confirm_delivery(self._on_delivery_confirmation)
channel.add_on_close_callback(self._on_channel_close)
channel.add_on_return_callback(self._on_return)

return channel

@asyncio.coroutine
def initialize(self, timeout=None) -> None:
with (yield from self.__write_lock):
if self._closing.done():
raise RuntimeError("Can't initialize closed channel")

future = self._create_future(timeout=timeout)

self.__connection._connection.channel(future.set_result)

channel = yield from future # type: pika.channel.Channel
channel.confirm_delivery(self._on_delivery_confirmation)
channel.add_on_close_callback(self._on_channel_close)
channel.add_on_return_callback(self._on_return)

self.__channel = channel
self.__channel = yield from self._create_channel(timeout)

def _on_delivery_confirmation(self, method_frame):
future = self.__confirmations.pop(method_frame.method.delivery_tag, None)

if not future:
log.info(
"Unknown delivery tag %d for message confirmation \"%s\"",
method_frame.method.delivery_tag, method_frame.method.NAME)
log.info("Unknown delivery tag %d for message confirmation \"%s\"",
method_frame.method.delivery_tag, method_frame.method.NAME)
return

try:
Expand Down
31 changes: 17 additions & 14 deletions aio_pika/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,31 @@

def _ensure_connection(func):
@wraps(func)
@asyncio.coroutine
def wrap(self, *args, **kwargs):
if self.is_closed:
raise RuntimeError("Connection closed")

return (yield from func(self, *args, **kwargs))
return func(self, *args, **kwargs)
return wrap


class Connection:
""" Connection abstraction """

__slots__ = (
'loop', '__closing', '_connection', '_futures', '__sender_lock',
'loop', '__closing', '_connection', 'future_store', '__sender_lock',
'_io_loop', '__connection_parameters', '__credentials',
'__write_lock'
)

CHANNEL_CLASS = Channel

def __init__(self, host: str = 'localhost', port: int = 5672, login: str = 'guest',
password: str = 'guest', virtual_host: str = '/',
ssl: bool = False, *, loop=None, **kwargs):

self.loop = loop if loop else asyncio.get_event_loop()
self._futures = FutureStore(loop=self.loop)
self.future_store = FutureStore(loop=self.loop)

self.__credentials = PlainCredentials(login, password) if login else None

Expand All @@ -52,7 +53,7 @@ def __init__(self, host: str = 'localhost', port: int = 5672, login: str = 'gues
credentials=self.__credentials,
virtual_host=virtual_host,
ssl=ssl,
**kwargs
**kwargs,
)

self._connection = None
Expand Down Expand Up @@ -93,11 +94,13 @@ def is_closed(self):

@property
def _closing(self):
if self.__closing is None:
self.__closing = self._futures.create_future()

self._create_closing_future()
return self.__closing

def _create_closing_future(self, force=False):
if self.__closing is None or force:
self.__closing = self.future_store.create_future()

@property
def closing(self):
""" Return future which will be finished after connection close. """
Expand Down Expand Up @@ -134,7 +137,7 @@ def _on_connection_lost(_, code, reason):
else:
exc = ConnectionError(reason, code)

self._futures.reject_all(exc)
self.future_store.reject_all(exc)

if f.done():
return
Expand Down Expand Up @@ -162,8 +165,7 @@ def channel(self) -> Channel:
with (yield from self.__write_lock):
log.debug("Creating AMQP channel for conneciton: %r", self)

channel = Channel(self, self.loop, self._futures)

channel = self.CHANNEL_CLASS(self, self.loop, self.future_store)
yield from channel.initialize()

log.debug("Channel created: %r", channel)
Expand All @@ -181,7 +183,7 @@ def close(self) -> None:
def connect(url: str=None, *, host: str='localhost',
port: int=5672, login: str='guest',
password: str='guest', virtualhost: str='/',
ssl: bool=False, loop=None, **kwargs) -> Connection:
ssl: bool=False, loop=None, connection_class=Connection, **kwargs) -> Connection:
""" Make connection to the broker
:param url: `RFC3986`_ formatted broker address. When :class:`None` will be used keyword arguments.
Expand All @@ -192,6 +194,7 @@ def connect(url: str=None, *, host: str='localhost',
:param virtualhost: virtualhost parameter. `'/'` by default
:param ssl: use SSL for connection. Should be used with addition kwargs. See `pika documentation`_ for more info.
:param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`)
:param connection_class: Factory of a new connection
:param kwargs: addition parameters which will be passed to the pika connection.
:return: :class:`aio_pika.connection.Connection`
Expand All @@ -207,7 +210,7 @@ def connect(url: str=None, *, host: str='localhost',
password = url.password or password
virtualhost = url.path[1:] if url.path else virtualhost

connection = Connection(
connection = connection_class(
host=host, port=port, login=login, password=password,
virtual_host=virtualhost, ssl=ssl, loop=loop, **kwargs
)
Expand All @@ -219,7 +222,7 @@ def connect(url: str=None, *, host: str='localhost',
@asyncio.coroutine
def connect_url(url: str, loop=None) -> Connection:
warnings.warn(
'Please use reconnect(url) instead connect_url(url)', DeprecationWarning
'Please use connect(url) instead connect_url(url)', DeprecationWarning
)

return (yield from connect(url, loop=loop))
Expand Down
2 changes: 1 addition & 1 deletion aio_pika/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

team_email = 'me@mosquito.su'

version_info = (0, 12, 1)
version_info = (0, 12, 2)

__author__ = ", ".join("{} <{}>".format(*info) for info in author_info)
__version__ = ".".join(map(str, version_info))

0 comments on commit 94522cb

Please sign in to comment.