Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Skip libev tests if pyev is not installed or if they are being run in…

… pypy

Minor change to how tests are structured and skipped
  • Loading branch information...
commit 7918e1ab981a1619af6792cac116880133beb186 1 parent bb583bf
@gmr gmr authored
Showing with 282 additions and 247 deletions.
  1. +282 −247 tests/acceptance/libev_adapter_tests.py
View
529 tests/acceptance/libev_adapter_tests.py
@@ -1,347 +1,382 @@
-import sys
-if sys.version_info[:2] == (2,7) and not getattr(sys, 'pypy_version_info', None):
-
- import time
- import async_test_base
+import platform
+import time
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
- from pika import adapters
- from pika import spec
+import async_test_base
- class AsyncTestCase(async_test_base.AsyncTestCase):
- ADAPTER = adapters.LibevConnection
+from pika import adapters
+from pika import spec
+target = platform.python_implementation()
- class BoundQueueTestCase(async_test_base.BoundQueueTestCase):
- ADAPTER = adapters.LibevConnection
+class AsyncTestCase(async_test_base.AsyncTestCase):
+ ADAPTER = adapters.LibevConnection
+class BoundQueueTestCase(async_test_base.BoundQueueTestCase):
+ ADAPTER = adapters.LibevConnection
- class TestConfirmSelect(AsyncTestCase):
- def begin(self, channel):
- channel._on_selectok = self.on_complete
- channel.confirm_delivery()
- def on_complete(self, frame):
- self.assertIsInstance(frame.method, spec.Confirm.SelectOk)
- self.stop()
- def start_test(self):
- """LibevConnection should receive confirmation of Confirm.Select"""
- self.start()
+class TestConfirmSelect(AsyncTestCase):
+ def begin(self, channel):
+ channel._on_selectok = self.on_complete
+ channel.confirm_delivery()
- class TestExchangeDeclareAndDelete(AsyncTestCase):
+ def on_complete(self, frame):
+ self.assertIsInstance(frame.method, spec.Confirm.SelectOk)
+ self.stop()
- X_TYPE = 'direct'
- def begin(self, channel):
- self.name = self.__class__.__name__ + ':' + str(id(self))
- channel.exchange_declare(self.on_exchange_declared,
- self.name,
- exchange_type=self.X_TYPE,
- passive=False,
- durable=False,
- auto_delete=True)
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should receive confirmation of Confirm.Select"""
+ self.start()
- def on_exchange_declared(self, frame):
- self.assertIsInstance(frame.method, spec.Exchange.DeclareOk)
- self.channel.exchange_delete(self.on_exchange_delete, self.name)
- def on_exchange_delete(self, frame):
- self.assertIsInstance(frame.method, spec.Exchange.DeleteOk)
- self.stop()
+class TestExchangeDeclareAndDelete(AsyncTestCase):
- def start_test(self):
- """LibevConnection should create and delete an exchange"""
- self.start()
+ X_TYPE = 'direct'
+ def begin(self, channel):
+ self.name = self.__class__.__name__ + ':' + str(id(self))
+ channel.exchange_declare(self.on_exchange_declared,
+ self.name,
+ exchange_type=self.X_TYPE,
+ passive=False,
+ durable=False,
+ auto_delete=True)
- class TestExchangeRedeclareWithDifferentValues(AsyncTestCase):
+ def on_exchange_declared(self, frame):
+ self.assertIsInstance(frame.method, spec.Exchange.DeclareOk)
+ self.channel.exchange_delete(self.on_exchange_delete, self.name)
- X_TYPE1 = 'direct'
- X_TYPE2 = 'topic'
+ def on_exchange_delete(self, frame):
+ self.assertIsInstance(frame.method, spec.Exchange.DeleteOk)
+ self.stop()
- def begin(self, channel):
- self.name = self.__class__.__name__ + ':' + str(id(self))
- self.channel.add_on_close_callback(self.on_channel_closed)
- channel.exchange_declare(self.on_exchange_declared,
- self.name,
- exchange_type=self.X_TYPE1,
- passive=False,
- durable=False,
- auto_delete=True)
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should create and delete an exchange"""
+ self.start()
- def on_cleanup_channel(self, channel):
- channel.exchange_delete(None, self.name, nowait=True)
- self.stop()
+class TestExchangeRedeclareWithDifferentValues(AsyncTestCase):
- def on_channel_closed(self, channel, reply_code, reply_text):
- self.connection.channel(self.on_cleanup_channel)
+ X_TYPE1 = 'direct'
+ X_TYPE2 = 'topic'
- def on_exchange_declared(self, frame):
- self.channel.exchange_declare(self.on_exchange_declared,
- self.name,
- exchange_type=self.X_TYPE2,
- passive=False,
- durable=False,
- auto_delete=True)
+ def begin(self, channel):
+ self.name = self.__class__.__name__ + ':' + str(id(self))
+ self.channel.add_on_close_callback(self.on_channel_closed)
+ channel.exchange_declare(self.on_exchange_declared,
+ self.name,
+ exchange_type=self.X_TYPE1,
+ passive=False,
+ durable=False,
+ auto_delete=True)
- def on_bad_result(self, frame):
- self.channel.exchange_delete(None, self.name, nowait=True)
- raise AssertionError("Should not have received a Queue.DeclareOk")
- def start_test(self):
- """LibevConnection should close chan: re-declared exchange w/ diff params
+ def on_cleanup_channel(self, channel):
+ channel.exchange_delete(None, self.name, nowait=True)
+ self.stop()
- """
- self.start()
+ def on_channel_closed(self, channel, reply_code, reply_text):
+ self.connection.channel(self.on_cleanup_channel)
+ def on_exchange_declared(self, frame):
+ self.channel.exchange_declare(self.on_exchange_declared,
+ self.name,
+ exchange_type=self.X_TYPE2,
+ passive=False,
+ durable=False,
+ auto_delete=True)
- class TestQueueDeclareAndDelete(AsyncTestCase):
+ def on_bad_result(self, frame):
+ self.channel.exchange_delete(None, self.name, nowait=True)
+ raise AssertionError("Should not have received a Queue.DeclareOk")
- def begin(self, channel):
- channel.queue_declare(self.on_queue_declared,
- passive=False,
- durable=False,
- exclusive=True,
- auto_delete=False,
- nowait=False,
- arguments={'x-expires': self.TIMEOUT})
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should close chan: re-declared exchange w/ diff params
- def on_queue_declared(self, frame):
- self.assertIsInstance(frame.method, spec.Queue.DeclareOk)
- self.channel.queue_delete(self.on_queue_delete, frame.method.queue)
+ """
+ self.start()
- def on_queue_delete(self, frame):
- self.assertIsInstance(frame.method, spec.Queue.DeleteOk)
- self.stop()
- def start_test(self):
- """LibevConnection should create and delete a queue"""
- self.start()
+class TestQueueDeclareAndDelete(AsyncTestCase):
+ def begin(self, channel):
+ channel.queue_declare(self.on_queue_declared,
+ passive=False,
+ durable=False,
+ exclusive=True,
+ auto_delete=False,
+ nowait=False,
+ arguments={'x-expires': self.TIMEOUT})
- class TestQueueNameDeclareAndDelete(AsyncTestCase):
+ def on_queue_declared(self, frame):
+ self.assertIsInstance(frame.method, spec.Queue.DeclareOk)
+ self.channel.queue_delete(self.on_queue_delete, frame.method.queue)
- def begin(self, channel):
- channel.queue_declare(self.on_queue_declared, str(id(self)),
- passive=False,
- durable=False,
- exclusive=True,
- auto_delete=True,
- nowait=False,
- arguments={'x-expires': self.TIMEOUT})
+ def on_queue_delete(self, frame):
+ self.assertIsInstance(frame.method, spec.Queue.DeleteOk)
+ self.stop()
- def on_queue_declared(self, frame):
- self.assertIsInstance(frame.method, spec.Queue.DeclareOk)
- self.assertEqual(frame.method.queue, str(id(self)))
- self.channel.queue_delete(self.on_queue_delete, frame.method.queue)
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should create and delete a queue"""
+ self.start()
+
+
+class TestQueueNameDeclareAndDelete(AsyncTestCase):
- def on_queue_delete(self, frame):
- self.assertIsInstance(frame.method, spec.Queue.DeleteOk)
- self.stop()
+ def begin(self, channel):
+ channel.queue_declare(self.on_queue_declared, str(id(self)),
+ passive=False,
+ durable=False,
+ exclusive=True,
+ auto_delete=True,
+ nowait=False,
+ arguments={'x-expires': self.TIMEOUT})
- def start_test(self):
- """LibevConnection should create and delete a named queue"""
- self.start()
+ def on_queue_declared(self, frame):
+ self.assertIsInstance(frame.method, spec.Queue.DeclareOk)
+ self.assertEqual(frame.method.queue, str(id(self)))
+ self.channel.queue_delete(self.on_queue_delete, frame.method.queue)
+ def on_queue_delete(self, frame):
+ self.assertIsInstance(frame.method, spec.Queue.DeleteOk)
+ self.stop()
- class TestQueueRedeclareWithDifferentValues(AsyncTestCase):
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should create and delete a named queue"""
+ self.start()
- def begin(self, channel):
- self.channel.add_on_close_callback(self.on_channel_closed)
- channel.queue_declare(self.on_queue_declared,
- str(id(self)),
- passive=False,
- durable=False,
- exclusive=True,
- auto_delete=True,
- nowait=False,
- arguments={'x-expires': self.TIMEOUT})
- def on_channel_closed(self, channel, reply_code, reply_text):
- self.stop()
+class TestQueueRedeclareWithDifferentValues(AsyncTestCase):
- def on_queue_declared(self, frame):
- self.channel.queue_declare(self.on_bad_result,
- str(id(self)),
- passive=False,
- durable=True,
- exclusive=False,
- auto_delete=True,
- nowait=False,
- arguments={'x-expires': self.TIMEOUT})
+ def begin(self, channel):
+ self.channel.add_on_close_callback(self.on_channel_closed)
+ channel.queue_declare(self.on_queue_declared,
+ str(id(self)),
+ passive=False,
+ durable=False,
+ exclusive=True,
+ auto_delete=True,
+ nowait=False,
+ arguments={'x-expires': self.TIMEOUT})
- def on_bad_result(self, frame):
- self.channel.queue_delete(None, str(id(self)), nowait=True)
- raise AssertionError("Should not have received a Queue.DeclareOk")
+ def on_channel_closed(self, channel, reply_code, reply_text):
+ self.stop()
- def start_test(self):
- """LibevConnection should close chan: re-declared queue w/ diff params
+ def on_queue_declared(self, frame):
+ self.channel.queue_declare(self.on_bad_result,
+ str(id(self)),
+ passive=False,
+ durable=True,
+ exclusive=False,
+ auto_delete=True,
+ nowait=False,
+ arguments={'x-expires': self.TIMEOUT})
- """
- self.start()
+ def on_bad_result(self, frame):
+ self.channel.queue_delete(None, str(id(self)), nowait=True)
+ raise AssertionError("Should not have received a Queue.DeclareOk")
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should close chan: re-declared queue w/ diff params
- class TestTX1_Select(AsyncTestCase):
+ """
+ self.start()
- def begin(self, channel):
- channel.tx_select(self.on_complete)
- def on_complete(self, frame):
- self.assertIsInstance(frame.method, spec.Tx.SelectOk)
- self.stop()
+class TestTX1_Select(AsyncTestCase):
- def test_confirm_select(self):
- """LibevConnection should receive confirmation of Tx.Select"""
- self.start()
+ def begin(self, channel):
+ channel.tx_select(self.on_complete)
+ def on_complete(self, frame):
+ self.assertIsInstance(frame.method, spec.Tx.SelectOk)
+ self.stop()
- class TestTX2_Commit(AsyncTestCase):
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def test_confirm_select(self):
+ """LibevConnection should receive confirmation of Tx.Select"""
+ self.start()
- def begin(self, channel):
- channel.tx_select(self.on_selectok)
- def on_selectok(self, frame):
- self.assertIsInstance(frame.method, spec.Tx.SelectOk)
- self.channel.tx_commit(self.on_commitok)
+class TestTX2_Commit(AsyncTestCase):
- def on_commitok(self, frame):
- self.assertIsInstance(frame.method, spec.Tx.CommitOk)
- self.stop()
+ def begin(self, channel):
+ channel.tx_select(self.on_selectok)
- def start_test(self):
- """LibevConnection should start a transaction, then commit it back"""
- self.start()
+ def on_selectok(self, frame):
+ self.assertIsInstance(frame.method, spec.Tx.SelectOk)
+ self.channel.tx_commit(self.on_commitok)
+ def on_commitok(self, frame):
+ self.assertIsInstance(frame.method, spec.Tx.CommitOk)
+ self.stop()
- class TestTX2_CommitFailure(AsyncTestCase):
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should start a transaction, then commit it back"""
+ self.start()
- def begin(self, channel):
- self.channel.add_on_close_callback(self.on_channel_closed)
- self.channel.tx_commit(self.on_commitok)
- def on_channel_closed(self, channel, reply_code, reply_text):
- self.stop()
+class TestTX2_CommitFailure(AsyncTestCase):
- def on_selectok(self, frame):
- self.assertIsInstance(frame.method, spec.Tx.SelectOk)
+ def begin(self, channel):
+ self.channel.add_on_close_callback(self.on_channel_closed)
+ self.channel.tx_commit(self.on_commitok)
- def on_commitok(self, frame):
- raise AssertionError("Should not have received a Tx.CommitOk")
+ def on_channel_closed(self, channel, reply_code, reply_text):
+ self.stop()
- def start_test(self):
- """LibevConnection should close the channel: commit without a TX"""
- self.start()
+ def on_selectok(self, frame):
+ self.assertIsInstance(frame.method, spec.Tx.SelectOk)
+ def on_commitok(self, frame):
+ raise AssertionError("Should not have received a Tx.CommitOk")
- class TestTX3_Rollback(AsyncTestCase):
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should close the channel: commit without a TX"""
+ self.start()
- def begin(self, channel):
- channel.tx_select(self.on_selectok)
- def on_selectok(self, frame):
- self.assertIsInstance(frame.method, spec.Tx.SelectOk)
- self.channel.tx_rollback(self.on_rollbackok)
+class TestTX3_Rollback(AsyncTestCase):
- def on_rollbackok(self, frame):
- self.assertIsInstance(frame.method, spec.Tx.RollbackOk)
- self.stop()
+ def begin(self, channel):
+ channel.tx_select(self.on_selectok)
- def start_test(self):
- """LibevConnection should start a transaction, then roll it back"""
- self.start()
+ def on_selectok(self, frame):
+ self.assertIsInstance(frame.method, spec.Tx.SelectOk)
+ self.channel.tx_rollback(self.on_rollbackok)
+ def on_rollbackok(self, frame):
+ self.assertIsInstance(frame.method, spec.Tx.RollbackOk)
+ self.stop()
- class TestTX3_RollbackFailure(AsyncTestCase):
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should start a transaction, then roll it back"""
+ self.start()
- def begin(self, channel):
- self.channel.add_on_close_callback(self.on_channel_closed)
- self.channel.tx_rollback(self.on_commitok)
- def on_channel_closed(self, channel, reply_code, reply_text):
- self.stop()
+class TestTX3_RollbackFailure(AsyncTestCase):
- def on_commitok(self, frame):
- raise AssertionError("Should not have received a Tx.RollbackOk")
+ def begin(self, channel):
+ self.channel.add_on_close_callback(self.on_channel_closed)
+ self.channel.tx_rollback(self.on_commitok)
- def start_test(self):
- """LibevConnection should close the channel: rollback without a TX"""
- self.start()
+ def on_channel_closed(self, channel, reply_code, reply_text):
+ self.stop()
+ def on_commitok(self, frame):
+ raise AssertionError("Should not have received a Tx.RollbackOk")
- class TestZ_PublishAndConsume(BoundQueueTestCase):
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should close the channel: rollback without a TX"""
+ self.start()
- def on_ready(self, frame):
- self.ctag = self.channel.basic_consume(self.on_message, self.queue)
- self.msg_body = "%s: %i" % (self.__class__.__name__, time.time())
- self.channel.basic_publish(self.exchange,
- self.routing_key,
- self.msg_body)
- def on_cancelled(self, frame):
- self.assertIsInstance(frame.method, spec.Basic.CancelOk)
- self.stop()
+class TestZ_PublishAndConsume(BoundQueueTestCase):
- def on_message(self, channel, method, header, body):
- self.assertIsInstance(method, spec.Basic.Deliver)
- self.assertEqual(body, self.msg_body)
- self.channel.basic_ack(method.delivery_tag)
- self.channel.basic_cancel(self.on_cancelled, self.ctag)
+ def on_ready(self, frame):
+ self.ctag = self.channel.basic_consume(self.on_message, self.queue)
+ self.msg_body = "%s: %i" % (self.__class__.__name__, time.time())
+ self.channel.basic_publish(self.exchange,
+ self.routing_key,
+ self.msg_body)
- def start_test(self):
- """LibevConnection should publish a message and consume it"""
- self.start()
+ def on_cancelled(self, frame):
+ self.assertIsInstance(frame.method, spec.Basic.CancelOk)
+ self.stop()
+ def on_message(self, channel, method, header, body):
+ self.assertIsInstance(method, spec.Basic.Deliver)
+ self.assertEqual(body, self.msg_body)
+ self.channel.basic_ack(method.delivery_tag)
+ self.channel.basic_cancel(self.on_cancelled, self.ctag)
- class TestZ_PublishAndConsumeBig(BoundQueueTestCase):
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should publish a message and consume it"""
+ self.start()
- def _get_msg_body(self):
- return '\n'.join(["%s" % i for i in range(0, 2097152)])
- def on_ready(self, frame):
- self.ctag = self.channel.basic_consume(self.on_message, self.queue)
- self.msg_body = self._get_msg_body()
- self.channel.basic_publish(self.exchange,
- self.routing_key,
- self.msg_body)
+class TestZ_PublishAndConsumeBig(BoundQueueTestCase):
- def on_cancelled(self, frame):
- self.assertIsInstance(frame.method, spec.Basic.CancelOk)
- self.stop()
+ def _get_msg_body(self):
+ return '\n'.join(["%s" % i for i in range(0, 2097152)])
- def on_message(self, channel, method, header, body):
- self.assertIsInstance(method, spec.Basic.Deliver)
- self.assertEqual(body, self.msg_body)
- self.channel.basic_ack(method.delivery_tag)
- self.channel.basic_cancel(self.on_cancelled, self.ctag)
+ def on_ready(self, frame):
+ self.ctag = self.channel.basic_consume(self.on_message, self.queue)
+ self.msg_body = self._get_msg_body()
+ self.channel.basic_publish(self.exchange,
+ self.routing_key,
+ self.msg_body)
- def start_test(self):
- """LibevConnection should publish a big message and consume it"""
- self.start()
+ def on_cancelled(self, frame):
+ self.assertIsInstance(frame.method, spec.Basic.CancelOk)
+ self.stop()
+ def on_message(self, channel, method, header, body):
+ self.assertIsInstance(method, spec.Basic.Deliver)
+ self.assertEqual(body, self.msg_body)
+ self.channel.basic_ack(method.delivery_tag)
+ self.channel.basic_cancel(self.on_cancelled, self.ctag)
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should publish a big message and consume it"""
+ self.start()
- class TestZ_PublishAndGet(BoundQueueTestCase):
- def on_ready(self, frame):
- self.msg_body = "%s: %i" % (self.__class__.__name__, time.time())
- self.channel.basic_publish(self.exchange,
- self.routing_key,
- self.msg_body)
- self.channel.basic_get(self.on_get, self.queue)
- def on_get(self, channel, method, header, body):
- self.assertIsInstance(method, spec.Basic.GetOk)
- self.assertEqual(body, self.msg_body)
- self.channel.basic_ack(method.delivery_tag)
- self.stop()
+class TestZ_PublishAndGet(BoundQueueTestCase):
- def start_test(self):
- """LibevConnection should publish a message and get it"""
- self.start()
+ def on_ready(self, frame):
+ self.msg_body = "%s: %i" % (self.__class__.__name__, time.time())
+ self.channel.basic_publish(self.exchange,
+ self.routing_key,
+ self.msg_body)
+ self.channel.basic_get(self.on_get, self.queue)
+
+ def on_get(self, channel, method, header, body):
+ self.assertIsInstance(method, spec.Basic.GetOk)
+ self.assertEqual(body, self.msg_body)
+ self.channel.basic_ack(method.delivery_tag)
+ self.stop()
+
+ @unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
+ @unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
+ def start_test(self):
+ """LibevConnection should publish a message and get it"""
+ self.start()
Please sign in to comment.
Something went wrong with that request. Please try again.