Skip to content

Commit

Permalink
Merge pull request #533 from mosquito/featire/wait-reopen-channel-state
Browse files Browse the repository at this point in the history
Feature/wait reopen channel state
  • Loading branch information
mosquito committed Jun 1, 2023
2 parents 6d4c8a9 + 87617bf commit a19671d
Show file tree
Hide file tree
Showing 31 changed files with 634 additions and 397 deletions.
20 changes: 17 additions & 3 deletions CHANGELOG.md
@@ -1,3 +1,17 @@
9.1.0
-----

The bulk of the changes are related to how the library entities are now
interconnected. In previous versions of `aio_pika.Channel` instances not
contains a link to the `aio_pika.Connection` instances for now is contains it.

While I don't want custom code to work directly with the `aiormq.Channel`
instance, this was a public API and I should warn you about the change here.
The `aio_pika.Channel.channel` property is deprecated. Use
`aio_pika.Channel.get_underlay_chanel()` instead.
Now all library entities already use this method.


9.0.7
-----

Expand All @@ -17,9 +31,9 @@
-----

* Prevent 'Task exception was never retrieved' #524
If future.exception() is not called (even on cancelled futures), it seems Python
will then log 'Task exception was never retrieved'. Rewriting this logic
slightly should hopefully achieve the same functionality while
If future.exception() is not called (even on cancelled futures), it seems Python
will then log 'Task exception was never retrieved'. Rewriting this logic
slightly should hopefully achieve the same functionality while
preventing the Python errors.
* Avoid implicitly depending on setuptools #526

Expand Down
5 changes: 2 additions & 3 deletions Makefile
@@ -1,17 +1,16 @@
all: test

RABBITMQ_CONTAINER_NAME:=aio_pika_rabbitmq
RABBITMQ_IMAGE:=mosquito/aiormq-rabbitmq

test:
find . -name "*.pyc" -type f -delete
tox

rabbitmq:
docker kill $(docker ps -f label=aio-pika.rabbitmq -q) || true
docker pull $(RABBITMQ_IMAGE)
docker kill $(RABBITMQ_CONTAINER_NAME) || true
docker run --rm -d \
--name $(RABBITMQ_CONTAINER_NAME) \
-l aio-pika.rabbitmq \
-p 5671:5671 \
-p 5672:5672 \
-p 15671:15671 \
Expand Down
21 changes: 8 additions & 13 deletions aio_pika/abc.py
Expand Up @@ -11,6 +11,7 @@
Generator, Iterator, Optional, Type, TypeVar, Union, overload,
)


if sys.version_info >= (3, 8):
from typing import Literal, TypedDict
else:
Expand Down Expand Up @@ -81,11 +82,6 @@ class DeclarationResult:
class AbstractTransaction:
state: TransactionState

@property
@abstractmethod
def channel(self) -> "AbstractChannel":
raise NotImplementedError

@abstractmethod
async def select(
self, timeout: TimeoutType = None,
Expand Down Expand Up @@ -244,7 +240,7 @@ async def __aexit__(


class AbstractQueue:
channel: aiormq.abc.AbstractChannel
channel: "AbstractChannel"
name: str
durable: bool
exclusive: bool
Expand Down Expand Up @@ -307,7 +303,7 @@ async def unbind(
@abstractmethod
async def consume(
self,
callback: Callable[[AbstractIncomingMessage], Any],
callback: Callable[[AbstractIncomingMessage], Awaitable[Any]],
no_ack: bool = False,
exclusive: bool = False,
arguments: Arguments = None,
Expand Down Expand Up @@ -409,7 +405,7 @@ class AbstractExchange(ABC):
@abstractmethod
def __init__(
self,
channel: aiormq.abc.AbstractChannel,
channel: "AbstractChannel",
name: str,
type: Union[ExchangeType, str] = ExchangeType.DIRECT,
*,
Expand Down Expand Up @@ -528,9 +524,8 @@ def is_closed(self) -> bool:
def close(self, exc: Optional[ExceptionType] = None) -> Awaitable[None]:
raise NotImplementedError

@property
@abstractmethod
def channel(self) -> aiormq.abc.AbstractChannel:
async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel:
raise NotImplementedError

@property
Expand Down Expand Up @@ -760,7 +755,7 @@ async def update_secret(

class AbstractRobustQueue(AbstractQueue):
@abstractmethod
def restore(self, channel: aiormq.abc.AbstractChannel) -> Awaitable[None]:
def restore(self) -> Awaitable[None]:
raise NotImplementedError

@abstractmethod
Expand Down Expand Up @@ -791,7 +786,7 @@ async def consume(

class AbstractRobustExchange(AbstractExchange):
@abstractmethod
def restore(self, channel: aiormq.abc.AbstractChannel) -> Awaitable[None]:
def restore(self) -> Awaitable[None]:
raise NotImplementedError

@abstractmethod
Expand All @@ -815,7 +810,7 @@ def reopen(self) -> Awaitable[None]:
raise NotImplementedError

@abstractmethod
async def restore(self, connection: aiormq.abc.AbstractConnection) -> None:
async def restore(self) -> None:
raise NotImplementedError

@abstractmethod
Expand Down
84 changes: 53 additions & 31 deletions aio_pika/channel.py
@@ -1,4 +1,5 @@
import asyncio
import warnings
from abc import ABC
from types import TracebackType
from typing import Any, AsyncContextManager, Generator, Optional, Type, Union
Expand All @@ -9,9 +10,10 @@
from pamqp.common import Arguments

from .abc import (
AbstractChannel, AbstractExchange, AbstractQueue, TimeoutType,
UnderlayChannel,
AbstractChannel, AbstractConnection, AbstractExchange, AbstractQueue,
TimeoutType, UnderlayChannel,
)
from .exceptions import ChannelInvalidStateError
from .exchange import Exchange, ExchangeType
from .log import get_logger
from .message import IncomingMessage
Expand Down Expand Up @@ -52,7 +54,7 @@ class Channel(ChannelContext):

def __init__(
self,
connection: aiormq.abc.AbstractConnection,
connection: AbstractConnection,
channel_number: Optional[int] = None,
publisher_confirms: bool = True,
on_return_raises: bool = False,
Expand All @@ -73,12 +75,12 @@ def __init__(
'without "publisher_confirms"',
)

self._connection: aiormq.abc.AbstractConnection = connection
self._connection: AbstractConnection = connection

# That's means user closed channel instance explicitly
self._closed: bool = False

self._channel = None
self._channel: Optional[UnderlayChannel] = None
self._channel_number = channel_number

self.close_callbacks = CallbackCollection(self)
Expand All @@ -99,9 +101,10 @@ def is_closed(self) -> bool:
side or after the close() method has been called."""
if not self.is_initialized or self._closed:
return True
if not self._channel:
channel = self._channel
if channel is None:
return True
return self._channel.channel.is_closed
return channel.channel.is_closed

async def close(
self,
Expand All @@ -119,45 +122,58 @@ async def close(
self._closed = True
await self._channel.close()

@property
def channel(self) -> aiormq.abc.AbstractChannel:
async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel:

if not self.is_initialized or not self._channel:
raise aiormq.exceptions.ChannelInvalidStateError(
"Channel was not opened",
)

if self.is_closed:
raise aiormq.exceptions.ChannelInvalidStateError(
"Channel has been closed",
)
return self._channel.channel

@property
def channel(self) -> aiormq.abc.AbstractChannel:
warnings.warn(
"This property is deprecated, do not use this anymore.",
DeprecationWarning,
)
if self._channel is None:
raise aiormq.exceptions.ChannelInvalidStateError
return self._channel.channel

@property
def number(self) -> Optional[int]:
return (
self.channel.number
if self.is_initialized
else self._channel_number
)
if self._channel is None:
return self._channel_number

underlay_channel: UnderlayChannel = self._channel
return underlay_channel.channel.number

def __str__(self) -> str:
return "{}".format(self.number or "Not initialized channel")

async def _open(self) -> None:
await self._connection.ready()

transport = self._connection.transport
if transport is None:
raise ChannelInvalidStateError("No active transport in channel")

channel = await UnderlayChannel.create(
self._connection,
transport.connection,
self._on_close,
publisher_confirms=self.publisher_confirms,
on_return_raises=self.on_return_raises,
channel_number=self._channel_number,
)

await self._on_open(channel.channel)
self._channel = channel
try:
await self._on_open()
except BaseException as e:
await channel.close(e)
self._channel = None
raise
self._closed = False

async def initialize(self, timeout: TimeoutType = None) -> None:
Expand All @@ -169,9 +185,9 @@ async def initialize(self, timeout: TimeoutType = None) -> None:
await self._open()
await self._on_initialized()

async def _on_open(self, channel: aiormq.abc.AbstractChannel) -> None:
async def _on_open(self) -> None:
self.default_exchange: Exchange = self.EXCHANGE_CLASS(
channel=channel,
channel=self,
arguments=None,
auto_delete=False,
durable=False,
Expand All @@ -192,7 +208,8 @@ async def _on_close(self, closing: asyncio.Future) -> None:
self._channel.channel.on_return_callbacks.discard(self._on_return)

async def _on_initialized(self) -> None:
self.channel.on_return_callbacks.add(self._on_return)
channel = await self.get_underlay_channel()
channel.on_return_callbacks.add(self._on_return)

def _on_return(self, message: aiormq.abc.DeliveredMessage) -> None:
self.return_callbacks(IncomingMessage(message, no_ack=True))
Expand Down Expand Up @@ -241,7 +258,7 @@ async def declare_exchange(
durable = False

exchange = self.EXCHANGE_CLASS(
channel=self.channel,
channel=self,
name=name,
type=type,
durable=durable,
Expand Down Expand Up @@ -281,7 +298,7 @@ async def get_exchange(
return await self.declare_exchange(name=name, passive=True)
else:
return self.EXCHANGE_CLASS(
channel=self.channel,
channel=self,
name=name,
durable=False,
auto_delete=False,
Expand Down Expand Up @@ -321,7 +338,7 @@ async def declare_queue(
"""

queue: AbstractQueue = self.QUEUE_CLASS(
channel=self.channel,
channel=self,
name=name,
durable=durable,
exclusive=exclusive,
Expand Down Expand Up @@ -358,7 +375,7 @@ async def get_queue(
return await self.declare_queue(name=name, passive=True)
else:
return self.QUEUE_CLASS(
channel=self.channel,
channel=self,
name=name,
durable=False,
exclusive=False,
Expand All @@ -379,7 +396,9 @@ async def set_qos(
warn('Use "global_" instead of "all_channels"', DeprecationWarning)
global_ = all_channels

return await self.channel.basic_qos(
channel = await self.get_underlay_channel()

return await channel.basic_qos(
prefetch_count=prefetch_count,
prefetch_size=prefetch_size,
global_=global_,
Expand All @@ -394,7 +413,8 @@ async def queue_delete(
if_empty: bool = False,
nowait: bool = False,
) -> aiormq.spec.Queue.DeleteOk:
return await self.channel.queue_delete(
channel = await self.get_underlay_channel()
return await channel.queue_delete(
queue=queue_name,
if_unused=if_unused,
if_empty=if_empty,
Expand All @@ -409,7 +429,8 @@ async def exchange_delete(
if_unused: bool = False,
nowait: bool = False,
) -> aiormq.spec.Exchange.DeleteOk:
return await self.channel.exchange_delete(
channel = await self.get_underlay_channel()
return await channel.exchange_delete(
exchange=exchange_name,
if_unused=if_unused,
nowait=nowait,
Expand All @@ -426,7 +447,8 @@ def transaction(self) -> Transaction:
return Transaction(self)

async def flow(self, active: bool = True) -> aiormq.spec.Channel.FlowOk:
return await self.channel.flow(active=active)
channel = await self.get_underlay_channel()
return await channel.flow(active=active)


__all__ = ("Channel",)

0 comments on commit a19671d

Please sign in to comment.