Skip to content

Commit

Permalink
Merge pull request #12 from dave-shawley/fix-amqp-close
Browse files Browse the repository at this point in the history
Fix amqp close
  • Loading branch information
gmr committed May 2, 2017
2 parents e6dad15 + 8547586 commit b4c02aa
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Expand Up @@ -8,9 +8,9 @@ python:
services:
- rabbitmq
install:
- pip install -r requires/testing.txt -r docs/requirements.txt
- pip install -r requires/testing.txt -r docs/requirements.txt codecov
script:
- nosetests
- nosetests --with-coverage
- ./setup.py build_sphinx
after_success:
- codecov
Expand All @@ -22,4 +22,5 @@ deploy:
on:
python: 3.4
tags: true
all_branches: true
repo: sprockets/sprockets.mixins.amqp
4 changes: 4 additions & 0 deletions docs/history.rst
@@ -1,6 +1,10 @@
Version History
===============

`Next Release`_
---------------
- Fix intentional closing of an AMQP connection.

`2.0.0`_ Apr 24, 2017
---------------------
- Move Mixin and AMQP client to separate files
Expand Down
26 changes: 16 additions & 10 deletions sprockets/mixins/amqp/amqp.py
Expand Up @@ -270,8 +270,6 @@ def close(self):
LOGGER.error('Closed called while not connected (%s)', self._state)
return
self._state = self.STATE_CLOSING
if self._on_unavailable:
self._on_unavailable(self)
LOGGER.info('Closing RabbitMQ connection')
self._connection.close()

Expand Down Expand Up @@ -357,14 +355,18 @@ def _on_connection_closed(self, _connection, reply_code, reply_text):
:param str reply_text: The server provided reply_text if given
"""
should_reconnect = self._state != self.STATE_CLOSING
start_state = self._state
self._connection = None
self._channel = None
self._state = self.STATE_CLOSED
if self._on_unavailable:
self._on_unavailable(self)
if self._state != self.STATE_CLOSING:
LOGGER.warning('Connection to RabbitMQ closed (%s): %s',
reply_code, reply_text)
LOGGER.info('Connection to RabbitMQ closed (%s): %s',
reply_code, reply_text)
if should_reconnect:
LOGGER.info('Reconnecting in state %d, started in %d',
self._state, start_state)
self._reconnect()

"""
Expand Down Expand Up @@ -413,11 +415,15 @@ def _on_channel_closed(self, channel, reply_code, reply_text):
:param str reply_text: The text reason the channel was closed
"""
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
if self._on_unavailable:
self._on_unavailable(self)
self._channel = self._open_channel()
if self._state == self.STATE_CLOSING:
LOGGER.info('Channel %i was intentionally closed (%s) %s',
channel, reply_code, reply_text)
else:
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
if self._on_unavailable:
self._on_unavailable(self)
self._channel = self._open_channel()

def _on_channel_flow(self, method):
"""When RabbitMQ indicates the connection is unblocked, set the state
Expand Down
25 changes: 25 additions & 0 deletions tests.py
Expand Up @@ -542,3 +542,28 @@ def should_publish_message_via_handler_test(self):
result['properties'].correlation_id)
self.assertEqual(properties['content_type'],
result['properties'].content_type)


class AMQPConnectionTests(testing.AsyncTestCase):

def setUp(self):
super(AMQPConnectionTests, self).setUp()
self.event = locks.Event()
self.amqp = amqp.AMQP(AMQP_URL, io_loop=self.io_loop,
on_ready_callback=self.set_event,
on_unavailable_callback=self.set_event)
self.io_loop.add_future(self.event.wait(), lambda _: self.io_loop.stop())
self.io_loop.start()

def set_event(self, *args):
self.event.set()

@testing.gen_test
def test_that_connection_can_be_closed(self):
self.assertTrue(self.amqp.connected)
self.event.clear()
self.amqp.close()
yield self.event.wait()
self.assertFalse(self.amqp.connected)
self.assertEqual(self.amqp._state, self.amqp.STATE_CLOSED)
self.assertTrue(self.amqp.closed)

0 comments on commit b4c02aa

Please sign in to comment.