From a75f693c70dc70b92e4f3b51c3d042a68510e29f Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Thu, 17 Apr 2014 20:46:34 -0400 Subject: [PATCH] Update test structure - Move unit tests to subdir of tests - Move integration tests to acceptance tests - Create base async acceptance test classes - Add baseline acceptance tests for aysncore, select and tornado adapters - Remove Makefile, integration.cfg and other related items from older integration test suite --- Makefile | 2 - integration.cfg | 3 - tests/acceptance/async_test_base.py | 127 +++++++ tests/acceptance/asyncore_adapter_tests.py | 355 ++++++++++++++++++ tests/acceptance/select_adapter_tests.py | 355 ++++++++++++++++++ tests/acceptance/tornado_adapter_tests.py | 345 +++++++++++++++++ tests/integration/README.md | 27 -- tests/integration/broker.conf | 5 - tests/integration/connection_tests.py | 19 - tests/integration/requirements.txt | 2 - tests/integration/test_base.py | 104 ----- tests/{ => unit}/amqp_object_tests.py | 0 tests/{ => unit}/base_connection_tests.py | 0 tests/{ => unit}/blocking_channel_tests.py | 0 tests/{ => unit}/callback_tests.py | 0 tests/{ => unit}/channel_tests.py | 0 tests/{ => unit}/connection_tests.py | 0 tests/{ => unit}/connection_timeout_tests.py | 0 .../content_frame_dispatcher_tests.py | 0 tests/{ => unit}/credentials_tests.py | 0 tests/{ => unit}/data_tests.py | 0 tests/{ => unit}/exceptions_test.py | 0 tests/{ => unit}/frame_tests.py | 0 tests/{ => unit}/heartbeat_tests.py | 0 tests/{ => unit}/parameter_tests.py | 0 tests/{ => unit}/tornado_tests.py | 0 tests/{ => unit}/utils_tests.py | 0 27 files changed, 1182 insertions(+), 162 deletions(-) delete mode 100644 Makefile delete mode 100644 integration.cfg create mode 100644 tests/acceptance/async_test_base.py create mode 100644 tests/acceptance/asyncore_adapter_tests.py create mode 100644 tests/acceptance/select_adapter_tests.py create mode 100644 tests/acceptance/tornado_adapter_tests.py delete mode 100644 tests/integration/README.md delete mode 100644 tests/integration/broker.conf delete mode 100644 tests/integration/connection_tests.py delete mode 100644 tests/integration/requirements.txt delete mode 100644 tests/integration/test_base.py rename tests/{ => unit}/amqp_object_tests.py (100%) rename tests/{ => unit}/base_connection_tests.py (100%) rename tests/{ => unit}/blocking_channel_tests.py (100%) rename tests/{ => unit}/callback_tests.py (100%) rename tests/{ => unit}/channel_tests.py (100%) rename tests/{ => unit}/connection_tests.py (100%) rename tests/{ => unit}/connection_timeout_tests.py (100%) rename tests/{ => unit}/content_frame_dispatcher_tests.py (100%) rename tests/{ => unit}/credentials_tests.py (100%) rename tests/{ => unit}/data_tests.py (100%) rename tests/{ => unit}/exceptions_test.py (100%) rename tests/{ => unit}/frame_tests.py (100%) rename tests/{ => unit}/heartbeat_tests.py (100%) rename tests/{ => unit}/parameter_tests.py (100%) rename tests/{ => unit}/tornado_tests.py (100%) rename tests/{ => unit}/utils_tests.py (100%) diff --git a/Makefile b/Makefile deleted file mode 100644 index 917b460f1..000000000 --- a/Makefile +++ /dev/null @@ -1,2 +0,0 @@ -integration: - nosetests -c integration.cfg diff --git a/integration.cfg b/integration.cfg deleted file mode 100644 index db31607c3..000000000 --- a/integration.cfg +++ /dev/null @@ -1,3 +0,0 @@ -[nosetests] -tests=tests/integration -verbosity=3 diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py new file mode 100644 index 000000000..4b8d9d044 --- /dev/null +++ b/tests/acceptance/async_test_base.py @@ -0,0 +1,127 @@ +import logging +try: + import unittest2 as unittest +except ImportError: + import unittest + +import pika + +LOGGER = logging.getLogger(__name__) +PARAMETERS = pika.URLParameters('amqp://guest:guest@localhost:5672/%2f') +DEFAULT_TIMEOUT = 5 + + +class AsyncTestCase(unittest.TestCase): + + ADAPTER = None + TIMEOUT = DEFAULT_TIMEOUT + + def begin(self, channel): + """Extend to start the actual tests on the channel""" + raise AssertionError("AsyncTestCase.begin_test not extended") + + def start(self): + self.connection = self.ADAPTER(PARAMETERS, + self.on_open, + self.on_open_error, + self.on_closed) + self.timeout = self.connection.add_timeout(self.TIMEOUT, + self.on_timeout) + self.connection.ioloop.start() + + def stop(self): + """close the connection and stop the ioloop""" + LOGGER.info("Stopping test") + self.connection.remove_timeout(self.timeout) + self.timeout = None + self.connection.close() + + def _stop(self): + if hasattr(self, 'timeout') and self.timeout: + self.connection.remove_timeout(self.timeout) + self.timeout = None + if hasattr(self, 'connection') and self.connection: + self.connection.ioloop.stop() + self.connection = None + + def tearDown(self): + self._stop() + + def on_closed(self, connection, reply_code, reply_text): + """called when the connection has finished closing""" + LOGGER.debug("Connection Closed") + self._stop() + + def on_open(self, connection): + self.channel = connection.channel(self.begin) + + def on_open_error(self, connection): + connection.ioloop.stop() + raise AssertionError('Error connecting to RabbitMQ') + + def on_timeout(self): + """called when stuck waiting for connection to close""" + # force the ioloop to stop + self.connection.ioloop.stop() + raise AssertionError('Test timed out') + + +class BoundQueueTestCase(AsyncTestCase): + + def tearDown(self): + """Cleanup auto-declared queue and exchange""" + print "Cleanup time" + self._cconn = self.ADAPTER(PARAMETERS, + self._on_cconn_open, + self._on_cconn_error, + self._on_cconn_closed) + + def start(self): + self.exchange = 'e' + str(id(self)) + self.queue = 'q' + str(id(self)) + self.routing_key = self.__class__.__name__ + super(BoundQueueTestCase, self).start() + + def begin(self, channel): + self.channel.exchange_declare(self.on_exchange_declared, + self.exchange, + exchange_type='direct', + passive=False, + durable=False, + auto_delete=True) + + def on_exchange_declared(self, frame): + self.channel.queue_declare(self.on_queue_declared, + self.queue, + passive=False, + durable=False, + exclusive=True, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_queue_declared(self, frame): + self.channel.queue_bind(self.on_ready, + self.queue, + self.exchange, + self.routing_key) + + def on_ready(self, frame): + raise NotImplementedError + + def _on_cconn_closed(self, cconn): + cconn.ioloop.stop() + self._cconn = None + + def _on_cconn_error(self, connection): + connection.ioloop.stop() + raise AssertionError('Error connecting to RabbitMQ') + + def _on_cconn_open(self, connection): + connection.channel(self._on_cconn_channel) + + def _on_cconn_channel(self, channel): + channel.exchange_delete(None, self.exchange, nowait=True) + channel.queue_delete(None, self.queue, nowait=True) + self._cconn.close() + print "Cleanup Finished" diff --git a/tests/acceptance/asyncore_adapter_tests.py b/tests/acceptance/asyncore_adapter_tests.py new file mode 100644 index 000000000..28fc22c5c --- /dev/null +++ b/tests/acceptance/asyncore_adapter_tests.py @@ -0,0 +1,355 @@ +import time + +import async_test_base + +from pika import adapters +from pika import spec + + +class AsyncTestCase(async_test_base.AsyncTestCase): + ADAPTER = adapters.AsyncoreConnection + + +class BoundQueueTestCase(async_test_base.BoundQueueTestCase): + ADAPTER = adapters.AsyncoreConnection + + +class TestA_Connect(AsyncTestCase): + + ADAPTER = adapters.AsyncoreConnection + + def begin(self, channel): + self.stop() + + def start_test(self): + """AsyncoreConnection should connect, open channel and disconnect""" + self.start() + + +class TestConfirmSelect(AsyncTestCase): + + def begin(self, channel): + channel._on_selectok = self.on_complete + channel.confirm_delivery() + + def on_complete(self, frame): + self.assertIsInstance(frame.method, spec.Confirm.SelectOk) + self.stop() + + def start_test(self): + """AsyncoreConnection should receive confirmation of Confirm.Select""" + self.start() + + +class TestExchangeDeclareAndDelete(AsyncTestCase): + + X_TYPE = 'direct' + + def begin(self, channel): + self.name = self.__class__.__name__ + ':' + str(id(self)) + channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE, + passive=False, + durable=False, + auto_delete=True) + + def on_exchange_declared(self, frame): + self.assertIsInstance(frame.method, spec.Exchange.DeclareOk) + self.channel.exchange_delete(self.on_exchange_delete, self.name) + + def on_exchange_delete(self, frame): + self.assertIsInstance(frame.method, spec.Exchange.DeleteOk) + self.stop() + + def start_test(self): + """TornadoConnection should create and delete an exchange""" + self.start() + + +class TestExchangeRedeclareWithDifferentValues(AsyncTestCase): + + X_TYPE1 = 'direct' + X_TYPE2 = 'topic' + + def begin(self, channel): + self.name = self.__class__.__name__ + ':' + str(id(self)) + self.channel.add_on_close_callback(self.on_channel_closed) + channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE1, + passive=False, + durable=False, + auto_delete=True) + + + def on_cleanup_channel(self, channel): + channel.exchange_delete(None, self.name, nowait=True) + self.stop() + + def on_channel_closed(self, channel, reply_code, reply_text): + self.connection.channel(self.on_cleanup_channel) + + def on_exchange_declared(self, frame): + self.channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE2, + passive=False, + durable=False, + auto_delete=True) + + def on_bad_result(self, frame): + self.channel.exchange_delete(None, self.name, nowait=True) + raise AssertionError("Should not have received a Queue.DeclareOk") + + def start_test(self): + """TornadoConnection should close chan: re-declared exchange w/ diff params + + """ + self.start() + + +class TestQueueDeclareAndDelete(AsyncTestCase): + + def begin(self, channel): + channel.queue_declare(self.on_queue_declared, + passive=False, + durable=False, + exclusive=True, + auto_delete=False, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_queue_declared(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeclareOk) + self.channel.queue_delete(self.on_queue_delete, frame.method.queue) + + def on_queue_delete(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeleteOk) + self.stop() + + def start_test(self): + """AsyncoreConnection should create and delete a queue""" + self.start() + + +class TestQueueNameDeclareAndDelete(AsyncTestCase): + + def begin(self, channel): + channel.queue_declare(self.on_queue_declared, str(id(self)), + passive=False, + durable=False, + exclusive=True, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_queue_declared(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeclareOk) + self.assertEqual(frame.method.queue, str(id(self))) + self.channel.queue_delete(self.on_queue_delete, frame.method.queue) + + def on_queue_delete(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeleteOk) + self.stop() + + def start_test(self): + """AsyncoreConnection should create and delete a named queue""" + self.start() + + +class TestQueueRedeclareWithDifferentValues(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + channel.queue_declare(self.on_queue_declared, + str(id(self)), + passive=False, + durable=False, + exclusive=True, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_queue_declared(self, frame): + self.channel.queue_declare(self.on_bad_result, + str(id(self)), + passive=False, + durable=True, + exclusive=False, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_bad_result(self, frame): + self.channel.queue_delete(None, str(id(self)), nowait=True) + raise AssertionError("Should not have received a Queue.DeclareOk") + + def start_test(self): + """AsyncoreConnection should close chan: re-declared queue w/ diff params + + """ + self.start() + + +class TestTX1_Select(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_complete) + + def on_complete(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.stop() + + def test_confirm_select(self): + """AsyncoreConnection should receive confirmation of Tx.Select""" + self.start() + + +class TestTX2_Commit(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_selectok) + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.channel.tx_commit(self.on_commitok) + + def on_commitok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.CommitOk) + self.stop() + + def start_test(self): + """AsyncoreConnection should start a transaction, then commit it back""" + self.start() + + +class TestTX2_CommitFailure(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + self.channel.tx_commit(self.on_commitok) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + + def on_commitok(self, frame): + raise AssertionError("Should not have received a Tx.CommitOk") + + def start_test(self): + """AsyncoreConnection should close the channel: commit without a TX""" + self.start() + + +class TestTX3_Rollback(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_selectok) + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.channel.tx_rollback(self.on_rollbackok) + + def on_rollbackok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.RollbackOk) + self.stop() + + def start_test(self): + """AsyncoreConnection should start a transaction, then roll it back""" + self.start() + + +class TestTX3_RollbackFailure(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + self.channel.tx_rollback(self.on_commitok) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_commitok(self, frame): + raise AssertionError("Should not have received a Tx.RollbackOk") + + def start_test(self): + """AsyncoreConnection should close the channel: rollback without a TX""" + self.start() + + +class TestZ_PublishAndConsume(BoundQueueTestCase): + + def on_ready(self, frame): + self.ctag = self.channel.basic_consume(self.on_message, self.queue) + self.msg_body = "%s: %i" % (self.__class__.__name__, time.time()) + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + + def on_cancelled(self, frame): + self.assertIsInstance(frame.method, spec.Basic.CancelOk) + self.stop() + + def on_message(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.Deliver) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.channel.basic_cancel(self.on_cancelled, self.ctag) + + def start_test(self): + """AsyncoreConnection should publish a message and consume it""" + self.start() + + +class TestZ_PublishAndConsumeBig(BoundQueueTestCase): + + def _get_msg_body(self): + return '\n'.join(["%s" % i for i in range(0, 2097152)]) + + def on_ready(self, frame): + self.ctag = self.channel.basic_consume(self.on_message, self.queue) + self.msg_body = self._get_msg_body() + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + + def on_cancelled(self, frame): + self.assertIsInstance(frame.method, spec.Basic.CancelOk) + self.stop() + + def on_message(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.Deliver) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.channel.basic_cancel(self.on_cancelled, self.ctag) + + def start_test(self): + """AsyncoreConnection should publish a message and consume it""" + self.start() + + + +class TestZ_PublishAndGet(BoundQueueTestCase): + + def on_ready(self, frame): + self.msg_body = "%s: %i" % (self.__class__.__name__, time.time()) + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + self.channel.basic_get(self.on_get, self.queue) + + def on_get(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.GetOk) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.stop() + + def start_test(self): + """AsyncoreConnection should publish a message and get it""" + self.start() + diff --git a/tests/acceptance/select_adapter_tests.py b/tests/acceptance/select_adapter_tests.py new file mode 100644 index 000000000..cb4efda26 --- /dev/null +++ b/tests/acceptance/select_adapter_tests.py @@ -0,0 +1,355 @@ +import time + +import async_test_base + +from pika import adapters +from pika import spec + + +class AsyncTestCase(async_test_base.AsyncTestCase): + ADAPTER = adapters.SelectConnection + + +class BoundQueueTestCase(async_test_base.BoundQueueTestCase): + ADAPTER = adapters.SelectConnection + + +class TestA_Connect(AsyncTestCase): + + ADAPTER = adapters.SelectConnection + + def begin(self, channel): + self.stop() + + def start_test(self): + """SelectConnection should connect, open channel and disconnect""" + self.start() + + +class TestConfirmSelect(AsyncTestCase): + + def begin(self, channel): + channel._on_selectok = self.on_complete + channel.confirm_delivery() + + def on_complete(self, frame): + self.assertIsInstance(frame.method, spec.Confirm.SelectOk) + self.stop() + + def start_test(self): + """SelectConnection should receive confirmation of Confirm.Select""" + self.start() + + +class TestExchangeDeclareAndDelete(AsyncTestCase): + + X_TYPE = 'direct' + + def begin(self, channel): + self.name = self.__class__.__name__ + ':' + str(id(self)) + channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE, + passive=False, + durable=False, + auto_delete=True) + + def on_exchange_declared(self, frame): + self.assertIsInstance(frame.method, spec.Exchange.DeclareOk) + self.channel.exchange_delete(self.on_exchange_delete, self.name) + + def on_exchange_delete(self, frame): + self.assertIsInstance(frame.method, spec.Exchange.DeleteOk) + self.stop() + + def start_test(self): + """TornadoConnection should create and delete an exchange""" + self.start() + + +class TestExchangeRedeclareWithDifferentValues(AsyncTestCase): + + X_TYPE1 = 'direct' + X_TYPE2 = 'topic' + + def begin(self, channel): + self.name = self.__class__.__name__ + ':' + str(id(self)) + self.channel.add_on_close_callback(self.on_channel_closed) + channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE1, + passive=False, + durable=False, + auto_delete=True) + + + def on_cleanup_channel(self, channel): + channel.exchange_delete(None, self.name, nowait=True) + self.stop() + + def on_channel_closed(self, channel, reply_code, reply_text): + self.connection.channel(self.on_cleanup_channel) + + def on_exchange_declared(self, frame): + self.channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE2, + passive=False, + durable=False, + auto_delete=True) + + def on_bad_result(self, frame): + self.channel.exchange_delete(None, self.name, nowait=True) + raise AssertionError("Should not have received a Queue.DeclareOk") + + def start_test(self): + """TornadoConnection should close chan: re-declared exchange w/ diff params + + """ + self.start() + + +class TestQueueDeclareAndDelete(AsyncTestCase): + + def begin(self, channel): + channel.queue_declare(self.on_queue_declared, + passive=False, + durable=False, + exclusive=True, + auto_delete=False, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_queue_declared(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeclareOk) + self.channel.queue_delete(self.on_queue_delete, frame.method.queue) + + def on_queue_delete(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeleteOk) + self.stop() + + def start_test(self): + """SelectConnection should create and delete a queue""" + self.start() + + +class TestQueueNameDeclareAndDelete(AsyncTestCase): + + def begin(self, channel): + channel.queue_declare(self.on_queue_declared, str(id(self)), + passive=False, + durable=False, + exclusive=True, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_queue_declared(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeclareOk) + self.assertEqual(frame.method.queue, str(id(self))) + self.channel.queue_delete(self.on_queue_delete, frame.method.queue) + + def on_queue_delete(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeleteOk) + self.stop() + + def start_test(self): + """SelectConnection should create and delete a named queue""" + self.start() + + +class TestQueueRedeclareWithDifferentValues(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + channel.queue_declare(self.on_queue_declared, + str(id(self)), + passive=False, + durable=False, + exclusive=True, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_queue_declared(self, frame): + self.channel.queue_declare(self.on_bad_result, + str(id(self)), + passive=False, + durable=True, + exclusive=False, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_bad_result(self, frame): + self.channel.queue_delete(None, str(id(self)), nowait=True) + raise AssertionError("Should not have received a Queue.DeclareOk") + + def start_test(self): + """SelectConnection should close chan: re-declared queue w/ diff params + + """ + self.start() + + +class TestTX1_Select(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_complete) + + def on_complete(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.stop() + + def test_confirm_select(self): + """SelectConnection should receive confirmation of Tx.Select""" + self.start() + + +class TestTX2_Commit(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_selectok) + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.channel.tx_commit(self.on_commitok) + + def on_commitok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.CommitOk) + self.stop() + + def start_test(self): + """SelectConnection should start a transaction, then commit it back""" + self.start() + + +class TestTX2_CommitFailure(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + self.channel.tx_commit(self.on_commitok) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + + def on_commitok(self, frame): + raise AssertionError("Should not have received a Tx.CommitOk") + + def start_test(self): + """SelectConnection should close the channel: commit without a TX""" + self.start() + + +class TestTX3_Rollback(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_selectok) + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.channel.tx_rollback(self.on_rollbackok) + + def on_rollbackok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.RollbackOk) + self.stop() + + def start_test(self): + """SelectConnection should start a transaction, then roll it back""" + self.start() + + +class TestTX3_RollbackFailure(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + self.channel.tx_rollback(self.on_commitok) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_commitok(self, frame): + raise AssertionError("Should not have received a Tx.RollbackOk") + + def start_test(self): + """SelectConnection should close the channel: rollback without a TX""" + self.start() + + +class TestZ_PublishAndConsume(BoundQueueTestCase): + + def on_ready(self, frame): + self.ctag = self.channel.basic_consume(self.on_message, self.queue) + self.msg_body = "%s: %i" % (self.__class__.__name__, time.time()) + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + + def on_cancelled(self, frame): + self.assertIsInstance(frame.method, spec.Basic.CancelOk) + self.stop() + + def on_message(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.Deliver) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.channel.basic_cancel(self.on_cancelled, self.ctag) + + def start_test(self): + """SelectConnection should publish a message and consume it""" + self.start() + + +class TestZ_PublishAndConsumeBig(BoundQueueTestCase): + + def _get_msg_body(self): + return '\n'.join(["%s" % i for i in range(0, 2097152)]) + + def on_ready(self, frame): + self.ctag = self.channel.basic_consume(self.on_message, self.queue) + self.msg_body = self._get_msg_body() + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + + def on_cancelled(self, frame): + self.assertIsInstance(frame.method, spec.Basic.CancelOk) + self.stop() + + def on_message(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.Deliver) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.channel.basic_cancel(self.on_cancelled, self.ctag) + + def start_test(self): + """SelectConnection should publish a message and consume it""" + self.start() + + + +class TestZ_PublishAndGet(BoundQueueTestCase): + + def on_ready(self, frame): + self.msg_body = "%s: %i" % (self.__class__.__name__, time.time()) + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + self.channel.basic_get(self.on_get, self.queue) + + def on_get(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.GetOk) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.stop() + + def start_test(self): + """SelectConnection should publish a message and get it""" + self.start() + diff --git a/tests/acceptance/tornado_adapter_tests.py b/tests/acceptance/tornado_adapter_tests.py new file mode 100644 index 000000000..1447deae2 --- /dev/null +++ b/tests/acceptance/tornado_adapter_tests.py @@ -0,0 +1,345 @@ +import time + +import async_test_base + +from pika import adapters +from pika import spec + + +class AsyncTestCase(async_test_base.AsyncTestCase): + ADAPTER = adapters.TornadoConnection + + +class BoundQueueTestCase(async_test_base.BoundQueueTestCase): + ADAPTER = adapters.TornadoConnection + + + + +class TestConfirmSelect(AsyncTestCase): + + def begin(self, channel): + channel._on_selectok = self.on_complete + channel.confirm_delivery() + + def on_complete(self, frame): + self.assertIsInstance(frame.method, spec.Confirm.SelectOk) + self.stop() + + def start_test(self): + """TornadoConnection should receive confirmation of Confirm.Select""" + self.start() + + +class TestExchangeDeclareAndDelete(AsyncTestCase): + + X_TYPE = 'direct' + + def begin(self, channel): + self.name = self.__class__.__name__ + ':' + str(id(self)) + channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE, + passive=False, + durable=False, + auto_delete=True) + + def on_exchange_declared(self, frame): + self.assertIsInstance(frame.method, spec.Exchange.DeclareOk) + self.channel.exchange_delete(self.on_exchange_delete, self.name) + + def on_exchange_delete(self, frame): + self.assertIsInstance(frame.method, spec.Exchange.DeleteOk) + self.stop() + + def start_test(self): + """TornadoConnection should create and delete an exchange""" + self.start() + + +class TestExchangeRedeclareWithDifferentValues(AsyncTestCase): + + X_TYPE1 = 'direct' + X_TYPE2 = 'topic' + + def begin(self, channel): + self.name = self.__class__.__name__ + ':' + str(id(self)) + self.channel.add_on_close_callback(self.on_channel_closed) + channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE1, + passive=False, + durable=False, + auto_delete=True) + + + def on_cleanup_channel(self, channel): + channel.exchange_delete(None, self.name, nowait=True) + self.stop() + + def on_channel_closed(self, channel, reply_code, reply_text): + self.connection.channel(self.on_cleanup_channel) + + def on_exchange_declared(self, frame): + self.channel.exchange_declare(self.on_exchange_declared, + self.name, + exchange_type=self.X_TYPE2, + passive=False, + durable=False, + auto_delete=True) + + def on_bad_result(self, frame): + self.channel.exchange_delete(None, self.name, nowait=True) + raise AssertionError("Should not have received a Queue.DeclareOk") + + def start_test(self): + """TornadoConnection should close chan: re-declared exchange w/ diff params + + """ + self.start() + + +class TestQueueDeclareAndDelete(AsyncTestCase): + + def begin(self, channel): + channel.queue_declare(self.on_queue_declared, + passive=False, + durable=False, + exclusive=True, + auto_delete=False, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_queue_declared(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeclareOk) + self.channel.queue_delete(self.on_queue_delete, frame.method.queue) + + def on_queue_delete(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeleteOk) + self.stop() + + def start_test(self): + """TornadoConnection should create and delete a queue""" + self.start() + + +class TestQueueNameDeclareAndDelete(AsyncTestCase): + + def begin(self, channel): + channel.queue_declare(self.on_queue_declared, str(id(self)), + passive=False, + durable=False, + exclusive=True, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_queue_declared(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeclareOk) + self.assertEqual(frame.method.queue, str(id(self))) + self.channel.queue_delete(self.on_queue_delete, frame.method.queue) + + def on_queue_delete(self, frame): + self.assertIsInstance(frame.method, spec.Queue.DeleteOk) + self.stop() + + def start_test(self): + """TornadoConnection should create and delete a named queue""" + self.start() + + +class TestQueueRedeclareWithDifferentValues(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + channel.queue_declare(self.on_queue_declared, + str(id(self)), + passive=False, + durable=False, + exclusive=True, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_queue_declared(self, frame): + self.channel.queue_declare(self.on_bad_result, + str(id(self)), + passive=False, + durable=True, + exclusive=False, + auto_delete=True, + nowait=False, + arguments={'x-expires': self.TIMEOUT}) + + def on_bad_result(self, frame): + self.channel.queue_delete(None, str(id(self)), nowait=True) + raise AssertionError("Should not have received a Queue.DeclareOk") + + def start_test(self): + """TornadoConnection should close chan: re-declared queue w/ diff params + + """ + self.start() + + +class TestTX1_Select(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_complete) + + def on_complete(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.stop() + + def test_confirm_select(self): + """TornadoConnection should receive confirmation of Tx.Select""" + self.start() + + +class TestTX2_Commit(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_selectok) + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.channel.tx_commit(self.on_commitok) + + def on_commitok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.CommitOk) + self.stop() + + def start_test(self): + """TornadoConnection should start a transaction, then commit it back""" + self.start() + + +class TestTX2_CommitFailure(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + self.channel.tx_commit(self.on_commitok) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + + def on_commitok(self, frame): + raise AssertionError("Should not have received a Tx.CommitOk") + + def start_test(self): + """TornadoConnection should close the channel: commit without a TX""" + self.start() + + +class TestTX3_Rollback(AsyncTestCase): + + def begin(self, channel): + channel.tx_select(self.on_selectok) + + def on_selectok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.SelectOk) + self.channel.tx_rollback(self.on_rollbackok) + + def on_rollbackok(self, frame): + self.assertIsInstance(frame.method, spec.Tx.RollbackOk) + self.stop() + + def start_test(self): + """TornadoConnection should start a transaction, then roll it back""" + self.start() + + +class TestTX3_RollbackFailure(AsyncTestCase): + + def begin(self, channel): + self.channel.add_on_close_callback(self.on_channel_closed) + self.channel.tx_rollback(self.on_commitok) + + def on_channel_closed(self, channel, reply_code, reply_text): + self.stop() + + def on_commitok(self, frame): + raise AssertionError("Should not have received a Tx.RollbackOk") + + def start_test(self): + """TornadoConnection should close the channel: rollback without a TX""" + self.start() + + +class TestZ_PublishAndConsume(BoundQueueTestCase): + + def on_ready(self, frame): + self.ctag = self.channel.basic_consume(self.on_message, self.queue) + self.msg_body = "%s: %i" % (self.__class__.__name__, time.time()) + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + + def on_cancelled(self, frame): + self.assertIsInstance(frame.method, spec.Basic.CancelOk) + self.stop() + + def on_message(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.Deliver) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.channel.basic_cancel(self.on_cancelled, self.ctag) + + def start_test(self): + """TornadoConnection should publish a message and consume it""" + self.start() + + +class TestZ_PublishAndConsumeBig(BoundQueueTestCase): + + def _get_msg_body(self): + return '\n'.join(["%s" % i for i in range(0, 2097152)]) + + def on_ready(self, frame): + self.ctag = self.channel.basic_consume(self.on_message, self.queue) + self.msg_body = self._get_msg_body() + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + + def on_cancelled(self, frame): + self.assertIsInstance(frame.method, spec.Basic.CancelOk) + self.stop() + + def on_message(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.Deliver) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.channel.basic_cancel(self.on_cancelled, self.ctag) + + def start_test(self): + """TornadoConnection should publish a message and consume it""" + self.start() + + + +class TestZ_PublishAndGet(BoundQueueTestCase): + + def on_ready(self, frame): + self.msg_body = "%s: %i" % (self.__class__.__name__, time.time()) + self.channel.basic_publish(self.exchange, + self.routing_key, + self.msg_body) + self.channel.basic_get(self.on_get, self.queue) + + def on_get(self, channel, method, header, body): + self.assertIsInstance(method, spec.Basic.GetOk) + self.assertEqual(body, self.msg_body) + self.channel.basic_ack(method.delivery_tag) + self.stop() + + def start_test(self): + """TornadoConnection should publish a message and get it""" + self.start() + diff --git a/tests/integration/README.md b/tests/integration/README.md deleted file mode 100644 index 7b881beb5..000000000 --- a/tests/integration/README.md +++ /dev/null @@ -1,27 +0,0 @@ -## Pika Integration Tests - -Install the necessary packages with `pip install -r tests/integration/requirements.txt`. -Next copy `tests/integration/broker.conf.in` to `tests/integration/broker.conf` and -tweak settings to suit. The tests can be run with `make integration` or with -`nosetests -w tests/integration`. - -### Dependencies -* nose -* PyYAML - -### Default config settings - -#### username -admin - -#### password -secret - -#### host -localhost - -#### port -5672 - -#### virtual_host -testing diff --git a/tests/integration/broker.conf b/tests/integration/broker.conf deleted file mode 100644 index fcb05d5ee..000000000 --- a/tests/integration/broker.conf +++ /dev/null @@ -1,5 +0,0 @@ -username: guest -password: guest -host: localhost -port: 5672 -virtual_host: / diff --git a/tests/integration/connection_tests.py b/tests/integration/connection_tests.py deleted file mode 100644 index 9a8b468be..000000000 --- a/tests/integration/connection_tests.py +++ /dev/null @@ -1,19 +0,0 @@ -import test_base - - -class TestSelectConnection(test_base.SelectConnectionTestCase): - - def test_close_with_channels(self): - """Regression test ensuring connection.close doesn't raise an - AttributeError (see #279) - - """ - def on_channel_open(channel): - # no real good way to handle the exception that's raised, so this - # test will either pass or blow up - self.stop() - - def on_connected(conn): - conn.channel(on_channel_open) - - self.start(on_connected) diff --git a/tests/integration/requirements.txt b/tests/integration/requirements.txt deleted file mode 100644 index 9c7c63794..000000000 --- a/tests/integration/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -nose -PyYAML diff --git a/tests/integration/test_base.py b/tests/integration/test_base.py deleted file mode 100644 index 5e2b4a8ec..000000000 --- a/tests/integration/test_base.py +++ /dev/null @@ -1,104 +0,0 @@ -import os -import sys -try: - import unittest2 as unittest -except ImportError: - import unittest -import logging - -import pika -import yaml - -CONFIG = yaml.load(open(os.path.join(os.path.abspath(os.path.dirname(__file__)), - 'broker.conf'))) - -LOGGING_LEVEL = logging.CRITICAL -TIMEOUT = 4 - - -class TestCase(unittest.TestCase): - - def __call__(self, result=None): - self._result = result - test_method = getattr(self, self._testMethodName) - skipped = (getattr(self.__class__, '__unittest_skip__', False) or - getattr(test_method, '__unittest_skip__', False)) - if not skipped: - try: - self._pre_setup() - except (KeyboardInterrupt, SystemExit): - raise - except Exception: - result.addError(self, sys.exc_info()) - return - super(TestCase, self).__call__(result) - - if not skipped: - try: - self._post_teardown() - except (KeyboardInterrupt, SystemExit): - raise - except Exception: - result.addError(self, sys.exc_info()) - return - - def _get_parameters(self): - self.credentials = pika.PlainCredentials(CONFIG['username'], - CONFIG['password']) - return pika.ConnectionParameters(host=CONFIG['host'], - port=CONFIG['port'], - virtual_host=CONFIG['virtual_host'], - credentials=self.credentials) - - def _pre_setup(self): - self._timed_out = False - logging.getLogger('pika').setLevel(LOGGING_LEVEL) - self.parameters = self._get_parameters() - self._timeout_id = None - self.connection = None - self.config = CONFIG - - def _post_teardown(self): - del self.credentials - del self.parameters - del self.connection - del self.config - del self._timeout_id - del self._timed_out - - def start(self): - pass - - def stop(self): - """close the connection and stop the ioloop""" - self.connection.remove_timeout(self._timeout_id) - self.connection.add_timeout(4, self._on_close_timeout) - self.connection.add_on_close_callback(self._on_closed) - self.connection.close() - - def _on_closed(self, connection, reply_code, reply_text): - """called when the connection has finished closing""" - self.connection.ioloop.stop() - if self._timed_out: - raise AssertionError('Timed out. Did you call `stop`?') - - def _on_close_timeout(self): - """called when stuck waiting for connection to close""" - # force the ioloop to stop - self.connection.ioloop.stop() - raise AssertionError('Timed out waiting for connection to close') - - def _on_timeout(self): - self._timed_out = True - self.stop() - - -class SelectConnectionTestCase(TestCase): - - def start(self, on_connected): - """Connect to rabbitmq and start the ioloop - - """ - self.connection = pika.SelectConnection(self.parameters, - on_connected) - self.connection.ioloop.start() diff --git a/tests/amqp_object_tests.py b/tests/unit/amqp_object_tests.py similarity index 100% rename from tests/amqp_object_tests.py rename to tests/unit/amqp_object_tests.py diff --git a/tests/base_connection_tests.py b/tests/unit/base_connection_tests.py similarity index 100% rename from tests/base_connection_tests.py rename to tests/unit/base_connection_tests.py diff --git a/tests/blocking_channel_tests.py b/tests/unit/blocking_channel_tests.py similarity index 100% rename from tests/blocking_channel_tests.py rename to tests/unit/blocking_channel_tests.py diff --git a/tests/callback_tests.py b/tests/unit/callback_tests.py similarity index 100% rename from tests/callback_tests.py rename to tests/unit/callback_tests.py diff --git a/tests/channel_tests.py b/tests/unit/channel_tests.py similarity index 100% rename from tests/channel_tests.py rename to tests/unit/channel_tests.py diff --git a/tests/connection_tests.py b/tests/unit/connection_tests.py similarity index 100% rename from tests/connection_tests.py rename to tests/unit/connection_tests.py diff --git a/tests/connection_timeout_tests.py b/tests/unit/connection_timeout_tests.py similarity index 100% rename from tests/connection_timeout_tests.py rename to tests/unit/connection_timeout_tests.py diff --git a/tests/content_frame_dispatcher_tests.py b/tests/unit/content_frame_dispatcher_tests.py similarity index 100% rename from tests/content_frame_dispatcher_tests.py rename to tests/unit/content_frame_dispatcher_tests.py diff --git a/tests/credentials_tests.py b/tests/unit/credentials_tests.py similarity index 100% rename from tests/credentials_tests.py rename to tests/unit/credentials_tests.py diff --git a/tests/data_tests.py b/tests/unit/data_tests.py similarity index 100% rename from tests/data_tests.py rename to tests/unit/data_tests.py diff --git a/tests/exceptions_test.py b/tests/unit/exceptions_test.py similarity index 100% rename from tests/exceptions_test.py rename to tests/unit/exceptions_test.py diff --git a/tests/frame_tests.py b/tests/unit/frame_tests.py similarity index 100% rename from tests/frame_tests.py rename to tests/unit/frame_tests.py diff --git a/tests/heartbeat_tests.py b/tests/unit/heartbeat_tests.py similarity index 100% rename from tests/heartbeat_tests.py rename to tests/unit/heartbeat_tests.py diff --git a/tests/parameter_tests.py b/tests/unit/parameter_tests.py similarity index 100% rename from tests/parameter_tests.py rename to tests/unit/parameter_tests.py diff --git a/tests/tornado_tests.py b/tests/unit/tornado_tests.py similarity index 100% rename from tests/tornado_tests.py rename to tests/unit/tornado_tests.py diff --git a/tests/utils_tests.py b/tests/unit/utils_tests.py similarity index 100% rename from tests/utils_tests.py rename to tests/unit/utils_tests.py