Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
Delete call RPC queue after use
Browse files Browse the repository at this point in the history
  • Loading branch information
labisso committed Feb 28, 2013
1 parent 6f42c9a commit ed1d303
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 5 deletions.
10 changes: 8 additions & 2 deletions dashi/__init__.py
Expand Up @@ -168,7 +168,6 @@ def call(self, name, operation, timeout=10, args=None, **kwargs):
def _callback(body, message): def _callback(body, message):
messages.append(body) messages.append(body)
message.ack() message.ack()
log.debug("setting event")
event.set() event.set()


d = dict(op=operation, args=args) d = dict(op=operation, args=args)
Expand All @@ -187,8 +186,15 @@ def _declare_and_send(channel):
consumer, channel = self.ensure(conn, _declare_and_send) consumer, channel = self.ensure(conn, _declare_and_send)
try: try:
self._consume(conn, consumer, timeout=timeout, until_event=event) self._consume(conn, consumer, timeout=timeout, until_event=event)

# try to delete queue, but don't worry if it fails (will expire)
try:
queue = queue.bind(channel)
queue.delete(nowait=True)
except Exception:
log.exception("error deleting queue")

finally: finally:
pass
conn.maybe_close_channel(channel) conn.maybe_close_channel(channel)


msg_body = messages[0] msg_body = messages[0]
Expand Down
44 changes: 41 additions & 3 deletions dashi/tests/test_dashi.py
Expand Up @@ -9,10 +9,12 @@


from kombu.pools import connections from kombu.pools import connections
import kombu.pools import kombu.pools
from mock import patch


import dashi import dashi
import dashi.util import dashi.util
from dashi.tests.util import who_is_calling, SocatProxy from dashi.tests.util import who_is_calling, SocatProxy, get_queue_info
from dashi.exceptions import NotFoundError


log = logging.getLogger(__name__) log = logging.getLogger(__name__)


Expand Down Expand Up @@ -51,7 +53,7 @@ def assert_kombu_pools_empty():


class TestReceiver(object): class TestReceiver(object):


consume_timeout = 30 consume_timeout = 300


def __init__(self, **kwargs): def __init__(self, **kwargs):


Expand Down Expand Up @@ -135,6 +137,10 @@ def cancel(self):
def disconnect(self): def disconnect(self):
self.conn.disconnect() self.conn.disconnect()


@property
def queue(self):
return self.conn._consumer._queue



class DashiConnectionTests(unittest.TestCase): class DashiConnectionTests(unittest.TestCase):


Expand Down Expand Up @@ -434,6 +440,36 @@ def test_call_timeout(self):
delta = countdown.delta_seconds delta = countdown.delta_seconds
assert 0 < delta < 1, "delta: %s" % delta assert 0 < delta < 1, "delta: %s" % delta


def test_call_queue_deleted(self):
receiver = TestReceiver(uri=self.uri, exchange="x1",
transport_options=self.transport_options)
receiver.handle("test", "hats")
receiver.consume_in_thread(1)

conn = dashi.DashiConnection("s1", self.uri, "x1",
transport_options=self.transport_options)
args1 = dict(a=1, b="sandwich")

rpc_consumers = []

# patch in this fake consumer so we can save a copy
class _Consumer(dashi.Consumer):
def __init__(self, *args, **kwargs):
super(_Consumer, self).__init__(*args, **kwargs)
rpc_consumers.append(self)

with patch.object(dashi, "Consumer", new=_Consumer):
self.assertEqual(conn.call(receiver.name, "test", **args1), "hats")

self.assertEqual(len(rpc_consumers), 1)
self.assertEqual(len(rpc_consumers[0].queues), 1)
queue = rpc_consumers[0].queues[0]

self.assertRaises(NotFoundError, get_queue_info, conn, queue)

receiver.join_consumer_thread()
assert_kombu_pools_empty()



class RabbitDashiConnectionTests(DashiConnectionTests): class RabbitDashiConnectionTests(DashiConnectionTests):
"""The base dashi tests run on rabbit, plus some extras which are """The base dashi tests run on rabbit, plus some extras which are
Expand Down Expand Up @@ -644,7 +680,9 @@ def errback():


self.proxy.restart() self.proxy.restart()


self.assertEqual(conn.call(receiver.name, "test"), "hats") log.debug("Queue consumer count: %s", get_queue_info(conn, receiver.queue)[2])

self.assertEqual(conn.call(receiver.name, "test", timeout=60), "hats")
self.assertEqual(conn.call(receiver.name, "test"), "hats") self.assertEqual(conn.call(receiver.name, "test"), "hats")


receiver.cancel() receiver.cancel()
Expand Down
1 change: 1 addition & 0 deletions dashi/tests/test_util.py
Expand Up @@ -3,6 +3,7 @@


from dashi.util import LoopingCall from dashi.util import LoopingCall



class LoopingCallTests(unittest.TestCase): class LoopingCallTests(unittest.TestCase):


def setUp(self): def setUp(self):
Expand Down
22 changes: 22 additions & 0 deletions dashi/tests/util.py
Expand Up @@ -7,6 +7,11 @@
import socket import socket
import logging import logging


from kombu.messaging import Queue
from kombu.pools import connections

from dashi.exceptions import NotFoundError

log = logging.getLogger(__name__) log = logging.getLogger(__name__)




Expand Down Expand Up @@ -76,3 +81,20 @@ def free_port(host="localhost"):
return sock.getsockname()[1] return sock.getsockname()[1]
finally: finally:
sock.close() sock.close()


def get_queue_info(connection, queue):
"""Returns queue name, message count, consumer count
"""
with connections[connection._pool_conn].acquire(block=True) as conn:
q = Queue(queue.name, channel=conn, exchange=queue.exchange,
durable=queue.durable, auto_delete=queue.auto_delete)
# doesn't actually declare queue, just checks if it exists
try:
return q.queue_declare(passive=True)
except Exception as e:
# better way to check this?
if "NOT_FOUND" in str(e):
raise NotFoundError()
raise

0 comments on commit ed1d303

Please sign in to comment.