From c7f07998f460dfa5b0a03f1f25cd73085350942f Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Mon, 17 Feb 2020 13:29:34 +0300 Subject: [PATCH] [fix] Raise channel close exception when channel closed --- aio_pika/channel.py | 33 +++++++++++++++++++++++++++++---- tests/test_amqp.py | 3 +++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/aio_pika/channel.py b/aio_pika/channel.py index c7175786..0e12e9c5 100644 --- a/aio_pika/channel.py +++ b/aio_pika/channel.py @@ -18,6 +18,7 @@ from .queue import Queue from .transaction import Transaction from .types import ReturnCallbackType, CloseCallbackType, TimeoutType +from .exceptions import ChannelClosed log = getLogger(__name__) @@ -66,6 +67,7 @@ def __init__(self, connection, channel_number: int = None, # noinspection PyTypeChecker self.default_exchange = None # type: Exchange + self._close_exc = None @property def done_callbacks(self) -> CallbackCollection: @@ -89,10 +91,7 @@ async def close(self, exc=None): if self.channel.is_closed: return - # noinspection PyTypeChecker - channel = self._channel # type: aiormq.Channel - self._channel = () - await channel.close() + await self._channel.close() self._done_callbacks(exc) @@ -182,6 +181,19 @@ async def initialize(self, timeout: TimeoutType = None) -> None: ) self.channel.on_return_callbacks.add(self._on_return) + self.channel.closing.add_done_callback(self.__on_channel_close) + + def __on_channel_close(self, closing): + self._channel = () + self._close_exc = closing.exception() + + def __ensure_channel_open(self): + if not self.is_closed: + return + + raise ChannelClosed( + "Trying to write to already closed channel" + ) from self._close_exc def _on_return(self, message: aiormq.types.DeliveredMessage): self._return_callbacks(IncomingMessage(message, no_ack=True)) @@ -209,6 +221,8 @@ async def declare_exchange( :return: :class:`aio_pika.exchange.Exchange` instance """ + self.__ensure_channel_open() + if auto_delete and durable is None: durable = False @@ -246,6 +260,7 @@ async def declare_queue( :param timeout: execution timeout :return: :class:`aio_pika.queue.Queue` instance """ + self.__ensure_channel_open() queue = self.QUEUE_CLASS( connection=self, @@ -267,6 +282,8 @@ async def set_qos( global_: bool = False, timeout: TimeoutType = None, all_channels: bool = None ) -> aiormq.spec.Basic.QosOk: + self.__ensure_channel_open() + if all_channels is not None: warn('Use "global_" instead of "all_channels"', DeprecationWarning) global_ = all_channels @@ -285,6 +302,8 @@ async def queue_delete( if_unused: bool = False, if_empty: bool = False, nowait: bool = False ) -> aiormq.spec.Queue.DeleteOk: + self.__ensure_channel_open() + return await asyncio.wait_for( self.channel.queue_delete( queue=queue_name, @@ -300,6 +319,8 @@ async def exchange_delete( if_unused: bool = False, nowait: bool = False ) -> aiormq.spec.Exchange.DeleteOk: + self.__ensure_channel_open() + return await asyncio.wait_for( self.channel.exchange_delete( exchange=exchange_name, @@ -314,9 +335,13 @@ def transaction(self) -> Transaction: raise RuntimeError("Cannot create transaction when publisher " "confirms are enabled") + self.__ensure_channel_open() + return Transaction(self._channel) async def flow(self, active: bool = True) -> aiormq.spec.Channel.FlowOk: + self.__ensure_channel_open() + return await self.channel.flow(active=active) diff --git a/tests/test_amqp.py b/tests/test_amqp.py index e4f44828..6a35d124 100644 --- a/tests/test_amqp.py +++ b/tests/test_amqp.py @@ -1104,6 +1104,9 @@ async def test_unexpected_channel_close(self): await channel.declare_queue("amq.restricted_queue_name", auto_delete=True) + with pytest.raises(aio_pika.exceptions.ChannelClosed): + await channel.set_qos(100) + await asyncio.wait((client.close(), client.closing)) async def test_declaration_result(self):