Permalink
Browse files

Fix in-memory transport to specify poll interval

Speeds up tests dramatically.
  • Loading branch information...
1 parent ab10148 commit 7a9f4409f42aa82e31beffb1e832aaf48da924a2 @labisso labisso committed Feb 17, 2012
Showing with 49 additions and 20 deletions.
  1. +4 −2 dashi/__init__.py
  2. +10 −1 dashi/bootstrap/__init__.py
  3. +35 −17 dashi/tests/test_dashi.py
View
@@ -20,7 +20,8 @@ class DashiConnection(object):
#TODO support connection info instead of uri
- def __init__(self, name, uri, exchange, durable=False, auto_delete=True, serializer=None):
+ def __init__(self, name, uri, exchange, durable=False, auto_delete=True,
+ serializer=None, transport_options=None):
"""Set up a Dashi connection
@param name: name of destination service queue used by consumers
@@ -31,9 +32,10 @@ def __init__(self, name, uri, exchange, durable=False, auto_delete=True, seriali
@param auto_delete: if True, destination service queue and exchange
will be deleted when all consumers are gone
@param serializer: specify a serializer for message encoding
+ @param transport_options: custom parameter dict for the transport backend
"""
- self._conn = BrokerConnection(uri)
+ self._conn = BrokerConnection(uri, transport_options=transport_options)
self._name = name
self._exchange_name = exchange
self._exchange = Exchange(name=exchange, type='direct',
@@ -78,6 +78,14 @@ def dashi_connect(topic, CFG=None, amqp_uri=None):
CFG.server.amqp.host,
CFG.server.amqp.vhost,
)
+
+ # force small polling interval for in memory transport. This is only
+ # used in tests.
+ if amqp_uri.startswith('memory://'):
+ transport_options = dict(polling_interval=0.01)
+ else:
+ transport_options = None
+
try:
dashi_exchange = CFG.server.amqp.exchange
except AttributeError:
@@ -88,7 +96,8 @@ def dashi_connect(topic, CFG=None, amqp_uri=None):
except AttributeError:
serializer = None
- return DashiConnection(topic, amqp_uri, dashi_exchange, serializer=serializer)
+ return DashiConnection(topic, amqp_uri, dashi_exchange,
+ serializer=serializer, transport_options=transport_options)
def enable_gevent():
@@ -85,13 +85,16 @@ def cancel(self):
class DashiConnectionTests(unittest.TestCase):
uri = 'memory://hello'
+ transport_options = dict(polling_interval=0.01)
def test_fire(self):
- receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options)
receiver.handle("test")
receiver.handle("test2")
- conn = dashi.DashiConnection("s1", self.uri, "x1")
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
args1 = dict(a=1, b="sandwich")
conn.fire(receiver.name, "test", **args1)
@@ -120,12 +123,14 @@ def test_fire(self):
self.assertEqual(gotargs, args3)
def test_call(self):
- receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options)
replies = [5,4,3,2,1]
receiver.handle("test", replies.pop)
receiver.consume_in_thread(1)
- conn = dashi.DashiConnection("s1", self.uri, "x1")
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
args1 = dict(a=1, b="sandwich")
ret = conn.call(receiver.name, "test", **args1)
@@ -141,11 +146,13 @@ def test_call(self):
receiver.join_consumer_thread()
def test_call_unknown_op(self):
- receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options)
receiver.handle("test", True)
receiver.consume_in_thread(1)
- conn = dashi.DashiConnection("s1", self.uri, "x1")
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
try:
conn.call(receiver.name, "notarealop")
@@ -160,11 +167,13 @@ def test_call_handler_error(self):
def raise_hell():
raise Exception("hell")
- receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options)
receiver.handle("raiser", raise_hell)
receiver.consume_in_thread(1)
- conn = dashi.DashiConnection("s1", self.uri, "x1")
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
try:
conn.call(receiver.name, "raiser")
@@ -182,14 +191,16 @@ def test_fire_many_receivers(self):
receiver_name = None
for i in range(3):
- receiver = TestReceiver(uri=self.uri, exchange="x1", **extras)
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options, **extras)
if not receiver_name:
receiver_name = receiver.name
extras['name'] = receiver.name
receiver.handle("test")
receivers.append(receiver)
- conn = dashi.DashiConnection("s1", self.uri, "x1")
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
for i in range(10):
conn.fire(receiver_name, "test", n=i)
@@ -204,7 +215,8 @@ def test_fire_many_receivers(self):
def test_cancel(self):
- receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options)
receiver.handle("nothing", 1)
receiver.consume_in_thread(1)
@@ -214,11 +226,13 @@ def test_cancel(self):
receiver.join_consumer_thread()
def test_cancel_resume_cancel(self):
- receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options)
receiver.handle("test", 1)
receiver.consume_in_thread()
- conn = dashi.DashiConnection("s1", self.uri, "x1")
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
self.assertEqual(1, conn.call(receiver.name, "test"))
receiver.cancel()
@@ -238,13 +252,15 @@ def test_cancel_resume_cancel(self):
receiver.join_consumer_thread()
def test_handle_sender_kwarg(self):
- receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options)
receiver.handle("test1", "hello", sender_kwarg="sender")
receiver.handle("test2", "hi", sender_kwarg="spender")
receiver.consume_in_thread()
sender_name = uuid.uuid4().hex
- conn = dashi.DashiConnection(sender_name, self.uri, "x1")
+ conn = dashi.DashiConnection(sender_name, self.uri, "x1",
+ transport_options=self.transport_options)
args = dict(a=1, b="sandwich")
expected_args1 = args.copy()
@@ -285,11 +301,13 @@ def test_call_channel_free(self):
# hackily ensure that call() releases its channel
- receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver = TestReceiver(uri=self.uri, exchange="x1",
+ transport_options=self.transport_options)
receiver.handle("test", "myreply")
receiver.consume_in_thread(1)
- conn = dashi.DashiConnection("s1", self.uri, "x1")
+ conn = dashi.DashiConnection("s1", self.uri, "x1",
+ transport_options=self.transport_options)
# peek into connection to grab a channel and note its id
with connections[conn._conn].acquire(block=True) as kombuconn:

0 comments on commit 7a9f440

Please sign in to comment.