Skip to content

Commit

Permalink
Merge pull request #1026 from vitaly-krugl/channel-closed-cleanup
Browse files Browse the repository at this point in the history
Cleaned up "on channel closed" callback and ChannelClosed exception scenarios
  • Loading branch information
vitaly-krugl committed May 4, 2018
2 parents c7fec31 + 761ef5c commit c0a66d2
Show file tree
Hide file tree
Showing 15 changed files with 283 additions and 291 deletions.
8 changes: 3 additions & 5 deletions docs/examples/asynchronous_consumer_example.rst
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,18 @@ consumer.py::
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)

def on_channel_closed(self, channel, reply_code, reply_text):
def on_channel_closed(self, channel, reason):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.

:param pika.channel.Channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
:param Exception reason: why the channel was closed

"""
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self._connection.close()

def setup_exchange(self, exchange_name):
Expand Down
7 changes: 3 additions & 4 deletions docs/examples/asynchronous_publisher_example.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,18 @@ publisher.py::
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)

def on_channel_closed(self, channel, reply_code, reply_text):
def on_channel_closed(self, channel, reason):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.

:param pika.channel.Channel channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
:param Exception reason: why the channel was closed

"""
LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self._channel = None
if not self._stopping:
self._connection.close()
Expand Down
8 changes: 3 additions & 5 deletions docs/examples/asyncio_consumer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,18 @@ consumer.py::
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)

def on_channel_closed(self, channel, reply_code, reply_text):
def on_channel_closed(self, channel, reason):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.

:param pika.channel.Channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
:param Exception reason: why the channel was closed

"""
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self._connection.close()

def on_channel_open(self, channel):
Expand Down
8 changes: 3 additions & 5 deletions docs/examples/tornado_consumer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,18 @@ consumer.py::
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)

def on_channel_closed(self, channel, reply_code, reply_text):
def on_channel_closed(self, channel, reason):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.

:param pika.channel.Channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
:param Exception reason: why the channel was closed

"""
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self._connection.close()

def on_channel_open(self, channel):
Expand Down
8 changes: 3 additions & 5 deletions examples/asynchronous_consumer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,20 +141,18 @@ def add_on_channel_close_callback(self):
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)

def on_channel_closed(self, channel, reply_code, reply_text):
def on_channel_closed(self, channel, reason):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
:param Exception reason: why the channel was closed
"""
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self._connection.close()

def setup_exchange(self, exchange_name):
Expand Down
7 changes: 3 additions & 4 deletions examples/asynchronous_publisher_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,18 @@ def add_on_channel_close_callback(self):
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)

def on_channel_closed(self, channel, reply_code, reply_text):
def on_channel_closed(self, channel, reason):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
:param Exception reason: why the channel was closed
"""
LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self._channel = None
if not self._stopping:
self._connection.close()
Expand Down
38 changes: 19 additions & 19 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,9 @@ def __init__(self, channel_impl, connection):
# Basic.Return in publisher-acknowledgments mode.
self._puback_return = None

# self._on_channel_closed() saves the reason exception here
self._closing_reason = None # type: None | Exception

# Receives Basic.ConsumeOk reply from server
self._basic_consume_ok_result = _CallbackResult()

Expand Down Expand Up @@ -1261,10 +1264,8 @@ def __enter__(self):
return self

def __exit__(self, exc_type, value, traceback):
try:
if self.is_open:
self.close()
except exceptions.ChannelClosed:
pass

def _cleanup(self):
"""Clean up members that might inhibit garbage collection"""
Expand Down Expand Up @@ -1336,8 +1337,9 @@ def _flush_output(self, *waiters):
lambda: self.is_closed,
*waiters)

if self.is_closed and self._impl.is_closed_by_broker:
self._impl._raise_if_not_open()
if self.is_closed and isinstance(self._closing_reason,
exceptions.ChannelClosedByBroker):
raise self._closing_reason # pylint: disable=E0702

def _on_puback_message_returned(self, channel, method, properties, body):
"""Called as the result of Basic.Return from broker in
Expand Down Expand Up @@ -1377,7 +1379,7 @@ def _add_pending_event(self, evt):
self._pending_events.append(evt)
self.connection._request_channel_dispatch(self.channel_number)

def _on_channel_closed(self, _channel, reply_code, reply_text):
def _on_channel_closed(self, _channel, reason):
"""Callback from impl notifying us that the channel has been closed.
This may be as the result of user-, broker-, or internal connection
clean-up initiated closing or meta-closing of the channel.
Expand All @@ -1397,16 +1399,14 @@ def _on_channel_closed(self, _channel, reply_code, reply_text):
See `pika.Channel.add_on_close_callback()` for additional documentation.
:param pika.Channel _channel: (unused)
:param int reply_code:
:param str reply_text:
:param Exception reason:
"""
LOGGER.debug('_on_channel_closed: by_broker=%s; (%s) %s; %r',
self._impl.is_closed_by_broker,
reply_code,
reply_text,
self)
LOGGER.debug('_on_channel_closed: %r; %r', reason, self)

self._closing_reason = reason

if self._impl.is_closed_by_broker:
if isinstance(reason, exceptions.ChannelClosedByBroker):
self._cleanup()

# Request urgent termination of `process_data_events()`, in case
Expand Down Expand Up @@ -2038,11 +2038,11 @@ def _process_data_events(self, time_limit):
"""
self.connection.process_data_events(time_limit=time_limit)
if self.is_closed and self._impl.is_closed_by_broker:
LOGGER.debug(
'Channel close by broker detected, raising ChannelClose...; %r',
self)
self._impl._raise_if_not_open()
if self.is_closed and isinstance(self._closing_reason,
exceptions.ChannelClosedByBroker):
LOGGER.debug('Channel close by broker detected, raising %r; %r',
self._closing_reason, self)
raise self._closing_reason # pylint: disable=E0702

def get_waiting_message_count(self):
"""Returns the number of messages that may be retrieved from the current
Expand Down
4 changes: 2 additions & 2 deletions pika/adapters/twisted_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ def __init__(self, channel):

channel.add_on_close_callback(self.channel_closed)

def channel_closed(self, _channel, reply_code, reply_text):
def channel_closed(self, _channel, reason):
# enter the closed state
self.__closed = exceptions.ChannelClosed(reply_code, reply_text)
self.__closed = reason
# errback all pending calls
for d in self.__calls:
d.errback(self.__closed)
Expand Down

0 comments on commit c0a66d2

Please sign in to comment.