Permalink
Browse files

Merge branch 'master' of github.com:nimbusproject/dashi

  • Loading branch information...
2 parents 752ffa9 + 9b2377b commit 5fc58b5d0e4d93cc6406b51ad3b20525382bb926 @buzztroll buzztroll committed Dec 15, 2011
Showing with 37 additions and 11 deletions.
  1. +4 −7 dashi/__init__.py
  2. +33 −4 dashi/tests/test_dashi.py
View
@@ -72,11 +72,8 @@ def call(self, name, operation, timeout=5, args=None, **kwargs):
# check out a connection from the pool
with connections[self._conn].acquire(block=True) as conn:
- channel = conn.channel()
- queue = Queue(channel=channel, name=msg_id, exchange=exchange,
- routing_key=msg_id, exclusive=True, durable=False,
- auto_delete=True)
- queue.declare()
+ queue = Queue(name=msg_id, exchange=exchange, routing_key=msg_id,
+ exclusive=True, durable=False, auto_delete=True)
log.debug("declared call() reply queue %s", msg_id)
messages = []
@@ -85,8 +82,8 @@ def _callback(body, message):
messages.append(body)
message.ack()
- consumer = Consumer(channel=channel, queues=[queue],
- callbacks=[_callback])
+ consumer = Consumer(conn, queues=(queue,), callbacks=(_callback,))
+ consumer.declare()
d = dict(op=operation, args=args)
headers = {'reply-to' : msg_id}
View
@@ -3,12 +3,11 @@
from functools import partial
import itertools
import uuid
-import time
-import dashi
+from kombu.pools import connections
+import dashi
import dashi.util
-
from dashi.tests.util import who_is_calling
log = dashi.util.get_logger()
@@ -204,4 +203,34 @@ def test_cancel(self):
class RabbitDashiConnectionTests(DashiConnectionTests):
- uri = "amqp://guest:guest@127.0.0.1//"
+ """The base dashi tests run on rabbit, plus some extras which are
+ rabbit specific
+ """
+ uri = "amqp://guest:guest@127.0.0.1//"
+
+ def test_call_channel_free(self):
+
+ # hackily ensure that call() releases its channel
+
+ receiver = TestReceiver(uri=self.uri, exchange="x1")
+ receiver.handle("test", "myreply")
+ receiver.consume_in_thread(1)
+
+ conn = dashi.DashiConnection("s1", self.uri, "x1")
+
+ # peek into connection to grab a channel and note its id
+ with connections[conn._conn].acquire(block=True) as kombuconn:
+ with kombuconn.channel() as channel:
+ channel_id = channel.channel_id
+ log.debug("got channel ID %s", channel.channel_id)
+
+ ret = conn.call(receiver.name, "test")
+ self.assertEqual(ret, "myreply")
+ receiver.join_consumer_thread()
+
+ # peek into connection to grab a channel and note its id
+ with connections[conn._conn].acquire(block=True) as kombuconn:
+ with kombuconn.channel() as channel:
+ log.debug("got channel ID %s", channel.channel_id)
+ self.assertEqual(channel_id, channel.channel_id)
+

0 comments on commit 5fc58b5

Please sign in to comment.