Skip to content

Commit

Permalink
[fix] Raise channel close exception when channel closed
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Feb 17, 2020
1 parent 56bf42c commit c7f0799
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
33 changes: 29 additions & 4 deletions aio_pika/channel.py
Expand Up @@ -18,6 +18,7 @@
from .queue import Queue
from .transaction import Transaction
from .types import ReturnCallbackType, CloseCallbackType, TimeoutType
from .exceptions import ChannelClosed

log = getLogger(__name__)

Expand Down Expand Up @@ -66,6 +67,7 @@ def __init__(self, connection, channel_number: int = None,

# noinspection PyTypeChecker
self.default_exchange = None # type: Exchange
self._close_exc = None

@property
def done_callbacks(self) -> CallbackCollection:
Expand All @@ -89,10 +91,7 @@ async def close(self, exc=None):
if self.channel.is_closed:
return

# noinspection PyTypeChecker
channel = self._channel # type: aiormq.Channel
self._channel = ()
await channel.close()
await self._channel.close()

self._done_callbacks(exc)

Expand Down Expand Up @@ -182,6 +181,19 @@ async def initialize(self, timeout: TimeoutType = None) -> None:
)

self.channel.on_return_callbacks.add(self._on_return)
self.channel.closing.add_done_callback(self.__on_channel_close)

def __on_channel_close(self, closing):
self._channel = ()
self._close_exc = closing.exception()

def __ensure_channel_open(self):
if not self.is_closed:
return

raise ChannelClosed(
"Trying to write to already closed channel"
) from self._close_exc

def _on_return(self, message: aiormq.types.DeliveredMessage):
self._return_callbacks(IncomingMessage(message, no_ack=True))
Expand Down Expand Up @@ -209,6 +221,8 @@ async def declare_exchange(
:return: :class:`aio_pika.exchange.Exchange` instance
"""

self.__ensure_channel_open()

if auto_delete and durable is None:
durable = False

Expand Down Expand Up @@ -246,6 +260,7 @@ async def declare_queue(
:param timeout: execution timeout
:return: :class:`aio_pika.queue.Queue` instance
"""
self.__ensure_channel_open()

queue = self.QUEUE_CLASS(
connection=self,
Expand All @@ -267,6 +282,8 @@ async def set_qos(
global_: bool = False, timeout: TimeoutType = None,
all_channels: bool = None
) -> aiormq.spec.Basic.QosOk:
self.__ensure_channel_open()

if all_channels is not None:
warn('Use "global_" instead of "all_channels"', DeprecationWarning)
global_ = all_channels
Expand All @@ -285,6 +302,8 @@ async def queue_delete(
if_unused: bool = False, if_empty: bool = False, nowait: bool = False
) -> aiormq.spec.Queue.DeleteOk:

self.__ensure_channel_open()

return await asyncio.wait_for(
self.channel.queue_delete(
queue=queue_name,
Expand All @@ -300,6 +319,8 @@ async def exchange_delete(
if_unused: bool = False, nowait: bool = False
) -> aiormq.spec.Exchange.DeleteOk:

self.__ensure_channel_open()

return await asyncio.wait_for(
self.channel.exchange_delete(
exchange=exchange_name,
Expand All @@ -314,9 +335,13 @@ def transaction(self) -> Transaction:
raise RuntimeError("Cannot create transaction when publisher "
"confirms are enabled")

self.__ensure_channel_open()

return Transaction(self._channel)

async def flow(self, active: bool = True) -> aiormq.spec.Channel.FlowOk:
self.__ensure_channel_open()

return await self.channel.flow(active=active)


Expand Down
3 changes: 3 additions & 0 deletions tests/test_amqp.py
Expand Up @@ -1104,6 +1104,9 @@ async def test_unexpected_channel_close(self):
await channel.declare_queue("amq.restricted_queue_name",
auto_delete=True)

with pytest.raises(aio_pika.exceptions.ChannelClosed):
await channel.set_qos(100)

await asyncio.wait((client.close(), client.closing))

async def test_declaration_result(self):
Expand Down

0 comments on commit c7f0799

Please sign in to comment.