Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixes issue #279

Also added an integration level test that confirms the fix.
  • Loading branch information...
commit 5ad82582e5a56d032928a4fdef6db4a0bec170b4 1 parent 5a2a785
@atatsu atatsu authored
View
1  .gitignore
@@ -12,3 +12,4 @@ atlassian*xml
build
dist
docs/_build
+*.conf.in
View
2  Makefile
@@ -0,0 +1,2 @@
+integration:
+ nosetests -c integration.cfg
View
3  integration.cfg
@@ -0,0 +1,3 @@
+[nosetests]
+tests=testing/integration
+verbosity=3
View
14 pika/connection.py
@@ -590,7 +590,10 @@ def close(self, reply_code=200, reply_text='Normal shutdown'):
LOGGER.info("Closing connection (%s): %s", reply_code, reply_text)
self.closing = reply_code, reply_text
- self._on_close_ready()
+ if not self._has_open_channels:
+ # if there are open channels then _on_close_ready will finally be
+ # called in _on_channel_closeok once all channels have been closed
+ self._on_close_ready()
def remove_timeout(self, callback_method):
"""Adapters should override to call the callback after the
@@ -781,8 +784,10 @@ def _close_channels(self, reply_code, reply_text):
self._channels[channel_number].close(reply_code, reply_text)
else:
del self._channels[channel_number]
- # Force any lingering callbacks to be removed
- self.callbacks.cleanup(channel_number)
+ # Force any lingering callbacks to be removed
+ # moved inside else block since _on_channel_closeok removes
+ # callbacks
+ self.callbacks.cleanup(channel_number)
else:
self._channels = dict()
@@ -1017,6 +1022,9 @@ def _on_channel_closeok(self, method_frame):
# Force any callbacks to be removed for this channel
self.callbacks.cleanup(method_frame.channel_number)
+ if self.is_closing and not self._has_open_channels:
+ self._on_close_ready()
+
def _on_close_ready(self):
"""Called when the Connection is in a state that it can close after
a close has been requested. This happens, for example, when all of the
View
27 testing/integration/README.md
@@ -0,0 +1,27 @@
+## Pika Integration Tests
+
+Install the necessary packages with `pip install -r testing/integration/requirements.txt`.
+Next copy `testing/integration/broker.conf.in` to `testing/integration/broker.conf` and
+tweak settings to suit. The tests can be run with `make integration` or with
+`nosetests -w testing/integration`.
+
+### Dependencies
+* nose
+* PyYAML
+
+### Default config settings
+
+#### username
+admin
+
+#### password
+secret
+
+#### host
+localhost
+
+#### port
+5672
+
+#### virtual_host
+testing
View
5 testing/integration/broker.conf
@@ -0,0 +1,5 @@
+username: admin
+password: secret
+host: localhost
+port: 5672
+virtual_host: testing
View
16 testing/integration/connection_tests.py
@@ -0,0 +1,16 @@
+from test_base import TestCase
+
+
+class TestConnection(TestCase):
+
+ def test_close_with_channels(self):
+ '''regression test ensuring connection.close doesn't raise an AttributeError (see #279)'''
+ def on_channel_open(channel):
+ # no real good way to handle the exception that's raised, so this
+ # test will either pass or blow up
+ self.stop()
+
+ def on_connected(conn):
+ self.connection.channel(on_channel_open)
+
+ self.start(on_connected)
View
2  testing/integration/requirements.txt
@@ -0,0 +1,2 @@
+nose
+PyYAML
View
107 testing/integration/test_base.py
@@ -0,0 +1,107 @@
+import os
+import sys
+try:
+ from unittest2 import TestCase as BaseTestCase
+except ImportError:
+ from unittest import TestCase as BaseTestCase
+import logging
+
+import pika
+import yaml
+
+config = yaml.load(
+ open(
+ os.path.join(
+ os.path.abspath(os.path.dirname(__file__)),
+ 'broker.conf'
+ )
+ )
+)
+
+
+class TestCase(BaseTestCase):
+
+ timeout = 4
+ pika_log_level = logging.CRITICAL
+
+ def __call__(self, result=None):
+ self._result = result
+ test_method = getattr(self, self._testMethodName)
+ skipped = (
+ getattr(self.__class__, '__unittest_skip__', False) or
+ getattr(test_method, '__unittest_skip__', False)
+ )
+
+ if not skipped:
+ try:
+ self._pre_setup()
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except Exception:
+ result.addError(self, sys.exc_info())
+ return
+ super(TestCase, self).__call__(result)
+
+ if not skipped:
+ try:
+ self._post_teardown()
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except Exception:
+ result.addError(self, sys.exc_info())
+ return
+
+ def _pre_setup(self):
+ self._timed_out = False
+ logging.getLogger('pika').setLevel(self.pika_log_level)
+ self.credentials = pika.PlainCredentials(config['username'], config['password'])
+ self.parameters = pika.ConnectionParameters(
+ host=config['host'],
+ port=config['port'],
+ virtual_host=config['virtual_host'],
+ credentials=self.credentials,
+ )
+ self._timeout_id = None
+ self.connection = None
+ self.config = config
+
+ def _post_teardown(self):
+ del self.credentials
+ del self.parameters
+ del self.connection
+ del self.config
+ del self._timeout_id
+ del self._timed_out
+
+ def start(self, on_connected):
+ '''connect to rabbitmq and start the ioloop'''
+ self.connection = pika.SelectConnection(
+ self.parameters,
+ on_connected,
+ stop_ioloop_on_close=False,
+ )
+ self._timeout_id = self.connection.add_timeout(self.timeout, self._on_timeout)
+ self.connection.ioloop.start()
+
+ def stop(self):
+ '''close the connection and stop the ioloop'''
+ self.connection.remove_timeout(self._timeout_id)
+ self.connection.add_timeout(4, self._on_close_timeout)
+ self.connection.add_on_close_callback(self._on_closed)
+ self.connection.close()
+
+ def _on_closed(self, frame):
+ '''called when the connection has finished closing'''
+ self.connection.ioloop.stop()
+ if self._timed_out:
+ raise AssertionError('Timed out. Did you call `stop`?')
+
+ def _on_close_timeout(self):
+ '''called when stuck waiting for connection to close'''
+ # force the ioloop to stop
+ self.connection.ioloop.stop()
+ raise AssertionError('Timed out waiting for connection to close')
+
+ def _on_timeout(self):
+ self._timed_out = True
+ self.stop()
View
32 tests/connection_tests.py
@@ -31,7 +31,33 @@ def test_close_closes_open_channels(self, send_connection_close):
self.connection.close()
self.channel.close.assert_called_once_with(200, 'Normal shutdown')
- @mock.patch('pika.connection.Connection._send_connection_close')
- def test_close_sent(self, send_connection_close):
+ @mock.patch('pika.connection.Connection._on_close_ready')
+ def test_on_close_ready_open_channels(self, on_close_ready):
+ '''if open channels _on_close_ready shouldn't be called'''
+ self.connection.close()
+ self.assertFalse(on_close_ready.called, '_on_close_ready should not have been called')
+
+ @mock.patch('pika.connection.Connection._on_close_ready')
+ def test_on_close_ready_no_open_channels(self, on_close_ready):
+ self.connection._channels = dict()
+ self.connection.close()
+ self.assertTrue(on_close_ready.called, '_on_close_ready should have been called')
+
+ @mock.patch('pika.connection.Connection._on_close_ready')
+ def test_on_channel_closeok_no_open_channels(self, on_close_ready):
+ '''should call _on_close_ready if connection is closing and there are no open channels'''
+ self.connection._channels = dict()
+ self.connection.close()
+ self.assertTrue(on_close_ready.called, '_on_close_ready should been called')
+
+ @mock.patch('pika.connection.Connection._on_close_ready')
+ def test_on_channel_closeok_open_channels(self, on_close_ready):
+ '''if connection is closing but channels remain open do not call _on_close_ready'''
self.connection.close()
- send_connection_close.assert_called_once_with(200, 'Normal shutdown')
+ self.assertFalse(on_close_ready.called, '_on_close_ready should not have been called')
+
+ @mock.patch('pika.connection.Connection._on_close_ready')
+ def test_on_channel_closeok_non_closing_state(self, on_close_ready):
+ '''if connection isn't closing _on_close_ready should not be called'''
+ self.connection._on_channel_closeok(mock.Mock())
+ self.assertFalse(on_close_ready.called, '_on_close_ready should not have been called')
Please sign in to comment.
Something went wrong with that request. Please try again.