Skip to content

Commit

Permalink
Merge pull request #563 from wjps/adapter-test-rationalisation
Browse files Browse the repository at this point in the history
Rationalise adapter acceptance tests
  • Loading branch information
gmr committed May 18, 2015
2 parents bcb60c4 + 7e989f9 commit 5e19543
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 1,139 deletions.
Original file line number Diff line number Diff line change
@@ -1,33 +1,19 @@
import time
import uuid

import async_test_base

from pika import adapters
from pika import spec

from async_test_base import (AsyncTestCase, BoundQueueTestCase, AsyncAdapters)

class AsyncTestCase(async_test_base.AsyncTestCase):
ADAPTER = adapters.SelectConnection


class BoundQueueTestCase(async_test_base.BoundQueueTestCase):
ADAPTER = adapters.SelectConnection


class TestA_Connect(AsyncTestCase):

ADAPTER = adapters.SelectConnection

class TestA_Connect(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Connect, open channel and disconnect"

def begin(self, channel):
self.stop()

def start_test(self):
"""SelectConnection should connect, open channel and disconnect"""
self.start()


class TestConfirmSelect(AsyncTestCase):
class TestConfirmSelect(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Receive confirmation of Confirm.Select"

def begin(self, channel):
channel._on_selectok = self.on_complete
Expand All @@ -37,12 +23,9 @@ def on_complete(self, frame):
self.assertIsInstance(frame.method, spec.Confirm.SelectOk)
self.stop()

def start_test(self):
"""SelectConnection should receive confirmation of Confirm.Select"""
self.start()


class TestConsumeCancel(AsyncTestCase):
class TestConsumeCancel(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Consume and cancel"

def begin(self, channel):
self.queue_name = str(uuid.uuid4())
Expand All @@ -66,12 +49,9 @@ def on_cancel(self, _frame):
def on_deleted(self, _frame):
self.stop()

def start_test(self):
"""SelectConnection should receive confirmation of Confirm.Select"""
self.start()


class TestExchangeDeclareAndDelete(AsyncTestCase):
class TestExchangeDeclareAndDelete(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Create and delete and exchange"

X_TYPE = 'direct'

Expand All @@ -91,12 +71,9 @@ def on_exchange_delete(self, frame):
self.assertIsInstance(frame.method, spec.Exchange.DeleteOk)
self.stop()

def start_test(self):
"""TornadoConnection should create and delete an exchange"""
self.start()


class TestExchangeRedeclareWithDifferentValues(AsyncTestCase):
class TestExchangeRedeclareWithDifferentValues(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "should close chan: re-declared queue w/ diff params"

X_TYPE1 = 'direct'
X_TYPE2 = 'topic'
Expand Down Expand Up @@ -128,14 +105,9 @@ def on_bad_result(self, frame):
self.channel.exchange_delete(None, self.name, nowait=True)
raise AssertionError("Should not have received a Queue.DeclareOk")

def start_test(self):
"""TornadoConnection should close chan: re-declared exchange w/ diff params
"""
self.start()


class TestQueueDeclareAndDelete(AsyncTestCase):
class TestQueueDeclareAndDelete(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Create and delete a queue"

def begin(self, channel):
channel.queue_declare(self.on_queue_declared,
Expand All @@ -154,12 +126,10 @@ def on_queue_delete(self, frame):
self.assertIsInstance(frame.method, spec.Queue.DeleteOk)
self.stop()

def start_test(self):
"""SelectConnection should create and delete a queue"""
self.start()


class TestQueueNameDeclareAndDelete(AsyncTestCase):
class TestQueueNameDeclareAndDelete(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Create and delete a named queue"

def begin(self, channel):
channel.queue_declare(self.on_queue_declared, str(id(self)),
Expand All @@ -179,12 +149,10 @@ def on_queue_delete(self, frame):
self.assertIsInstance(frame.method, spec.Queue.DeleteOk)
self.stop()

def start_test(self):
"""SelectConnection should create and delete a named queue"""
self.start()


class TestQueueRedeclareWithDifferentValues(AsyncTestCase):
class TestQueueRedeclareWithDifferentValues(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Should close chan: re-declared queue w/ diff params"

def begin(self, channel):
self.channel.add_on_close_callback(self.on_channel_closed)
Expand Down Expand Up @@ -212,14 +180,10 @@ def on_bad_result(self, frame):
self.channel.queue_delete(None, str(id(self)), nowait=True)
raise AssertionError("Should not have received a Queue.DeclareOk")

def start_test(self):
"""SelectConnection should close chan: re-declared queue w/ diff params

"""
self.start()


class TestTX1_Select(AsyncTestCase):
class TestTX1_Select(AsyncTestCase, AsyncAdapters):
DESCRIPTION="Receive confirmation of Tx.Select"

def begin(self, channel):
channel.tx_select(self.on_complete)
Expand All @@ -228,12 +192,10 @@ def on_complete(self, frame):
self.assertIsInstance(frame.method, spec.Tx.SelectOk)
self.stop()

def test_confirm_select(self):
"""SelectConnection should receive confirmation of Tx.Select"""
self.start()


class TestTX2_Commit(AsyncTestCase):
class TestTX2_Commit(AsyncTestCase, AsyncAdapters):
DESCRIPTION="Start a transaction, and commit it"

def begin(self, channel):
channel.tx_select(self.on_selectok)
Expand All @@ -246,12 +208,9 @@ def on_commitok(self, frame):
self.assertIsInstance(frame.method, spec.Tx.CommitOk)
self.stop()

def start_test(self):
"""SelectConnection should start a transaction, then commit it back"""
self.start()


class TestTX2_CommitFailure(AsyncTestCase):
class TestTX2_CommitFailure(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Close the channel: commit without a TX"

def begin(self, channel):
self.channel.add_on_close_callback(self.on_channel_closed)
Expand All @@ -266,12 +225,9 @@ def on_selectok(self, frame):
def on_commitok(self, frame):
raise AssertionError("Should not have received a Tx.CommitOk")

def start_test(self):
"""SelectConnection should close the channel: commit without a TX"""
self.start()


class TestTX3_Rollback(AsyncTestCase):
class TestTX3_Rollback(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Start a transaction, then rollback"

def begin(self, channel):
channel.tx_select(self.on_selectok)
Expand All @@ -284,12 +240,10 @@ def on_rollbackok(self, frame):
self.assertIsInstance(frame.method, spec.Tx.RollbackOk)
self.stop()

def start_test(self):
"""SelectConnection should start a transaction, then roll it back"""
self.start()


class TestTX3_RollbackFailure(AsyncTestCase):
class TestTX3_RollbackFailure(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Close the channel: rollback without a TX"

def begin(self, channel):
self.channel.add_on_close_callback(self.on_channel_closed)
Expand All @@ -301,12 +255,10 @@ def on_channel_closed(self, channel, reply_code, reply_text):
def on_commitok(self, frame):
raise AssertionError("Should not have received a Tx.RollbackOk")

def start_test(self):
"""SelectConnection should close the channel: rollback without a TX"""
self.start()


class TestZ_PublishAndConsume(BoundQueueTestCase):
class TestZ_PublishAndConsume(BoundQueueTestCase, AsyncAdapters):
DESCRIPTION = "Publish a message and consume it"

def on_ready(self, frame):
self.ctag = self.channel.basic_consume(self.on_message, self.queue)
Expand All @@ -324,12 +276,10 @@ def on_message(self, channel, method, header, body):
self.channel.basic_ack(method.delivery_tag)
self.channel.basic_cancel(self.on_cancelled, self.ctag)

def start_test(self):
"""SelectConnection should publish a message and consume it"""
self.start()


class TestZ_PublishAndConsumeBig(BoundQueueTestCase):
class TestZ_PublishAndConsumeBig(BoundQueueTestCase, AsyncAdapters):
DESCRIPTION = "Publish a big message and consume it"

def _get_msg_body(self):
return '\n'.join(["%s" % i for i in range(0, 2097152)])
Expand All @@ -350,12 +300,10 @@ def on_message(self, channel, method, header, body):
self.channel.basic_ack(method.delivery_tag)
self.channel.basic_cancel(self.on_cancelled, self.ctag)

def start_test(self):
"""SelectConnection should publish a big message and consume it"""
self.start()


class TestZ_PublishAndGet(BoundQueueTestCase):
class TestZ_PublishAndGet(BoundQueueTestCase, AsyncAdapters):
DESCRIPTION = "Publish a message and get it"

def on_ready(self, frame):
self.msg_body = "%s: %i" % (self.__class__.__name__, time.time())
Expand All @@ -368,7 +316,3 @@ def on_get(self, channel, method, header, body):
self.assertEqual(body, self.msg_body)
self.channel.basic_ack(method.delivery_tag)
self.stop()

def start_test(self):
"""SelectConnection should publish a message and get it"""
self.start()
82 changes: 76 additions & 6 deletions tests/acceptance/async_test_base.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,43 @@
import select
import logging
try:
import unittest2 as unittest
except ImportError:
import unittest

import platform
target = platform.python_implementation()

import pika
from pika import adapters
from pika.adapters import select_connection

LOGGER = logging.getLogger(__name__)
PARAMETERS = pika.URLParameters('amqp://guest:guest@localhost:5672/%2f')
DEFAULT_TIMEOUT = 15


class AsyncTestCase(unittest.TestCase):

DESCRIPTION = ""
ADAPTER = None
TIMEOUT = DEFAULT_TIMEOUT


def shortDescription(self):
method_desc = super(AsyncTestCase, self).shortDescription()
if self.DESCRIPTION:
return "%s (%s)" % (self.DESCRIPTION, method_desc)
else:
return method_desc

def begin(self, channel):
"""Extend to start the actual tests on the channel"""
raise AssertionError("AsyncTestCase.begin_test not extended")

def start(self):
self.connection = self.ADAPTER(PARAMETERS, self.on_open,
def start(self, adapter=None):
self.adapter = adapter or self.ADAPTER

self.connection = self.adapter(PARAMETERS, self.on_open,
self.on_open_error, self.on_closed)
self.timeout = self.connection.add_timeout(self.TIMEOUT,
self.on_timeout)
Expand Down Expand Up @@ -68,14 +84,14 @@ class BoundQueueTestCase(AsyncTestCase):

def tearDown(self):
"""Cleanup auto-declared queue and exchange"""
self._cconn = self.ADAPTER(PARAMETERS, self._on_cconn_open,
self._cconn = self.adapter(PARAMETERS, self._on_cconn_open,
self._on_cconn_error, self._on_cconn_closed)

def start(self):
def start(self, adapter=None):
self.exchange = 'e' + str(id(self))
self.queue = 'q' + str(id(self))
self.routing_key = self.__class__.__name__
super(BoundQueueTestCase, self).start()
super(BoundQueueTestCase, self).start(adapter)

def begin(self, channel):
self.channel.exchange_declare(self.on_exchange_declared, self.exchange,
Expand Down Expand Up @@ -116,3 +132,57 @@ def _on_cconn_channel(self, channel):
channel.exchange_delete(None, self.exchange, nowait=True)
channel.queue_delete(None, self.queue, nowait=True)
self._cconn.close()

#
# In order to write test cases that will tested using all the Async Adapters
# write a class that inherits both from one of TestCase classes above and
# from the AsyncAdapters class below. This allows you to avoid duplicating the
# test methods for each adapter in each test class.
#

class AsyncAdapters(object):

def select_default_test(self):
"SelectConnection:DefaultPoller"
select_connection.POLLER_TYPE=None
self.start(adapters.SelectConnection)

def select_select_test(self):
"SelectConnection:select"
select_connection.POLLER_TYPE='select'
self.start(adapters.SelectConnection)

@unittest.skipIf(not hasattr(select, 'poll')
or not hasattr(select.poll(), 'modify'), "poll not supported")
def select_poll_test(self):
"SelectConnection:poll"
select_connection.POLLER_TYPE='poll'
self.start(adapters.SelectConnection)

@unittest.skipIf(not hasattr(select, 'epoll'), "epoll not supported")
def select_epoll_test(self):
"SelectConnection:epoll"
select_connection.POLLER_TYPE='epoll'
self.start(adapters.SelectConnection)

@unittest.skipIf(not hasattr(select, 'kqueue'), "kqueue not supported")
def select_kqueue_test(self):
"SelectConnection:kqueue"
select_connection.POLLER_TYPE='kqueue'
self.start(adapters.SelectConnection)

def tornado_test(self):
"TornadoConnection"
self.start(adapters.TornadoConnection)

def asyncore_test(self):
"AsyncoreConnection"
self.start(adapters.AsyncoreConnection)

@unittest.skipIf(target == 'PyPy', 'PyPy is not supported')
@unittest.skipIf(adapters.LibevConnection is None, 'pyev is not installed')
def libev_test(self):
"LibevConnection"
self.start(adapters.LibevConnection)


0 comments on commit 5e19543

Please sign in to comment.