Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

98% coverage for kombu.compat

  • Loading branch information...
commit caf0069cda6ad6a07ba43a9d4530c187390f684c 1 parent f58b857
Ask Solem authored
View
32 kombu/compat.py
@@ -26,15 +26,27 @@ def _iterconsume(connection, consumer, no_ack=False, limit=None):
def entry_to_queue(queue, **options):
binding_key = options.get("binding_key") or options.get("routing_key")
- e_durable = options.get("exchange_durable") or options.get("durable")
- e_auto_delete = options.get("exchange_auto_delete") or \
- options.get("auto_delete")
- q_durable = options.get("queue_durable") or options.get("durable")
- q_auto_delete = options.get("queue_auto_delete") or \
- options.get("auto_delete")
+
+ e_durable = options.get("exchange_durable")
+ if e_durable is None:
+ e_durable = options.get("durable")
+
+ e_auto_delete = options.get("exchange_auto_delete")
+ if e_auto_delete is None:
+ e_auto_delete = options.get("auto_delete")
+
+ q_durable = options.get("queue_durable")
+ if q_durable is None:
+ q_durable = options.get("durable")
+
+ q_auto_delete = options.get("queue_auto_delete")
+ if q_auto_delete is None:
+ q_auto_delete = options.get("auto_delete")
+
e_arguments = options.get("exchange_arguments")
q_arguments = options.get("queue_arguments")
b_arguments = options.get("binding_arguments")
+
exchange = entity.Exchange(options.get("exchange"),
type=options.get("exchange_type"),
delivery_mode=options.get("delivery_mode"),
@@ -42,6 +54,7 @@ def entry_to_queue(queue, **options):
durable=e_durable,
auto_delete=e_auto_delete,
arguments=e_arguments)
+
return entity.Queue(queue,
exchange=exchange,
routing_key=binding_key,
@@ -198,7 +211,7 @@ def __init__(self, connection, from_dict=None, consumers=None,
queues = []
if consumers:
for consumer in consumers:
- map(queues.extend, consumer.queues)
+ queues.extend(consumer.queues)
if from_dict:
for queue_name, queue_options in from_dict.items():
queues.append(entry_to_queue(queue_name, **queue_options))
@@ -213,11 +226,12 @@ def discard_all(self):
def add_consumer_from_dict(self, queue, **options):
queue = entry_to_queue(queue, **options)
- self.queues.append(queue)
+ self.queues.append(queue(self.channel))
return queue
def add_consumer(self, consumer):
- self.queues.extend(consumer.queues)
+ for queue in consumer.queues:
+ self.queues.append(queue(self.channel))
def close(self):
self.cancel()
View
9 kombu/tests/mocks.py
@@ -24,6 +24,7 @@ class Channel(object):
def __init__(self):
self.called = []
self.deliveries = count(1).next
+ self.to_deliver = []
def _called(self, name):
self.called.append(name)
@@ -66,8 +67,13 @@ def queue_delete(self, queue, if_unused=False, if_empty=False, **kwargs):
def basic_get(self, *args, **kwargs):
self._called("basic_get")
+ try:
+ return self.to_deliver.pop()
+ except IndexError:
+ pass
def queue_purge(self, *args, **kwargs):
+ print("PURGE!")
self._called("queue_purge")
def basic_consume(self, *args, **kwargs):
@@ -82,6 +88,9 @@ def basic_ack(self, *args, **kwargs):
def basic_recover(self, requeue=False):
self._called("basic_recover")
+ def close(self):
+ self._called("close")
+
def message_to_python(self, message, *args, **kwargs):
self._called("message_to_python")
return Message(self, body=simplejson.dumps(message),
View
283 kombu/tests/test_compat.py
@@ -0,0 +1,283 @@
+import unittest2 as unittest
+
+from kombu import BrokerConnection, Exchange, Queue
+from kombu import compat
+
+from kombu.tests.mocks import Transport, Channel
+
+class test_misc(unittest.TestCase):
+
+ def test_iterconsume(self):
+ class Connection(object):
+ drained = 0
+ def drain_events(self, *args, **kwargs):
+ self.drained += 1
+ return self.drained
+
+ class Consumer(object):
+ active = False
+
+ def consume(self, *args, **kwargs):
+ self.active = True
+
+ conn = Connection()
+ consumer = Consumer()
+ it = compat._iterconsume(conn, consumer)
+ self.assertEqual(it.next(), 1)
+ self.assertTrue(consumer.active)
+
+ it2 = compat._iterconsume(conn, consumer, limit=10)
+ self.assertEqual(list(it2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
+
+ def test_entry_to_queue(self):
+ defs = {"binding_key": "foo.#",
+ "exchange": "fooex",
+ "exchange_type": "topic",
+ "durable": True,
+ "auto_delete": False}
+
+ q1 = compat.entry_to_queue("foo", **dict(defs))
+ self.assertEqual(q1.name, "foo")
+ self.assertEqual(q1.routing_key, "foo.#")
+ self.assertEqual(q1.exchange.name, "fooex")
+ self.assertEqual(q1.exchange.type, "topic")
+ self.assertTrue(q1.durable)
+ self.assertTrue(q1.exchange.durable)
+ self.assertFalse(q1.auto_delete)
+ self.assertFalse(q1.exchange.auto_delete)
+
+ q2 = compat.entry_to_queue("foo", **dict(defs,
+ exchange_durable=False))
+ self.assertTrue(q2.durable)
+ self.assertFalse(q2.exchange.durable)
+
+ q3 = compat.entry_to_queue("foo", **dict(defs,
+ exchange_auto_delete=True))
+ self.assertFalse(q3.auto_delete)
+ self.assertTrue(q3.exchange.auto_delete)
+
+ q4 = compat.entry_to_queue("foo", **dict(defs,
+ queue_durable=False))
+ self.assertFalse(q4.durable)
+ self.assertTrue(q4.exchange.durable)
+
+ q5 = compat.entry_to_queue("foo", **dict(defs,
+ queue_auto_delete=True))
+ self.assertTrue(q5.auto_delete)
+ self.assertFalse(q5.exchange.auto_delete)
+
+ self.assertEqual(compat.entry_to_queue("foo", **dict(defs)),
+ compat.entry_to_queue("foo", **dict(defs)))
+
+
+class test_Publisher(unittest.TestCase):
+
+ def setUp(self):
+ self.connection = BrokerConnection(transport=Transport)
+
+ def test_constructor(self):
+ pub = compat.Publisher(self.connection,
+ exchange="test_Publisher_constructor",
+ routing_key="rkey")
+ self.assertIsInstance(pub.backend, Channel)
+ self.assertEqual(pub.exchange.name, "test_Publisher_constructor")
+ self.assertTrue(pub.exchange.durable)
+ self.assertFalse(pub.exchange.auto_delete)
+ self.assertEqual(pub.exchange.type, "direct")
+
+ pub2 = compat.Publisher(self.connection,
+ exchange="test_Publisher_constructor2",
+ routing_key="rkey",
+ auto_delete=True,
+ durable=False)
+ self.assertTrue(pub2.exchange.auto_delete)
+ self.assertFalse(pub2.exchange.durable)
+
+ explicit = Exchange("test_Publisher_constructor_explicit",
+ type="topic")
+ pub3 = compat.Publisher(self.connection,
+ exchange=explicit)
+ self.assertEqual(pub3.exchange, explicit)
+
+ def test_send(self):
+ pub = compat.Publisher(self.connection,
+ exchange="test_Publisher_send",
+ routing_key="rkey")
+ pub.send({"foo": "bar"})
+ self.assertIn("basic_publish", pub.backend)
+ pub.close()
+ self.assertIn("close", pub.backend)
+
+ def test__enter__exit__(self):
+ pub = compat.Publisher(self.connection,
+ exchange="test_Publisher_send",
+ routing_key="rkey")
+ x = pub.__enter__()
+ self.assertIs(x, pub)
+ x.__exit__()
+ self.assertIn("close", pub.backend)
+ self.assertTrue(pub._closed)
+
+
+class test_Consumer(unittest.TestCase):
+
+ def setUp(self):
+ self.connection = BrokerConnection(transport=Transport)
+
+ def test_constructor(self, n="test_Consumer_constructor"):
+ c = compat.Consumer(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ self.assertIsInstance(c.backend, Channel)
+ q = c.queues[0]
+ self.assertTrue(q.durable)
+ self.assertTrue(q.exchange.durable)
+ self.assertFalse(q.auto_delete)
+ self.assertFalse(q.exchange.auto_delete)
+ self.assertEqual(q.name, n)
+ self.assertEqual(q.exchange.name, n)
+
+ c2 = compat.Consumer(self.connection, queue=n+"2", exchange=n+"2",
+ routing_key="rkey", durable=False,
+ auto_delete=True, exclusive=True)
+ q2 = c2.queues[0]
+ self.assertFalse(q2.durable)
+ self.assertFalse(q2.exchange.durable)
+ self.assertTrue(q2.auto_delete)
+ self.assertTrue(q2.exchange.auto_delete)
+
+ def test__enter__exit__(self, n="test__enter__exit__"):
+ c = compat.Consumer(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ x = c.__enter__()
+ self.assertIs(x, c)
+ x.__exit__()
+ self.assertIn("close", c.backend)
+ self.assertTrue(c._closed)
+
+ def test_iterqueue(self, n="test_iterqueue"):
+ c = compat.Consumer(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ self.assertTrue(hasattr(c.__iter__(), "next"))
+ c.close()
+
+ def test_process_next(self, n="test_process_next"):
+ c = compat.Consumer(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ self.assertRaises(NotImplementedError, c.process_next)
+ c.close()
+
+ def test_iterconsume(self, n="test_iterconsume"):
+ c = compat.Consumer(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ self.assertTrue(hasattr(c.iterconsume(), "next"))
+ c.close()
+
+ def test_discard_all(self, n="test_discard_all"):
+ c = compat.Consumer(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ c.discard_all()
+ self.assertIn("queue_purge", c.backend)
+
+ def test_fetch(self, n="test_fetch"):
+ c = compat.Consumer(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ self.assertIsNone(c.fetch())
+ self.assertIsNone(c.fetch(no_ack=True))
+ self.assertIn("basic_get", c.backend)
+
+ callback_called = [False]
+
+ def receive(payload, message):
+ callback_called[0] = True
+
+ c.backend.to_deliver.append("42")
+ self.assertEqual(c.fetch().payload, "42")
+ c.backend.to_deliver.append("46")
+ c.register_callback(receive)
+ self.assertEqual(c.fetch(enable_callbacks=True).payload, "46")
+ self.assertTrue(callback_called[0])
+
+ def test_discard_all_filterfunc_not_supported(self, n="xjf21j21"):
+ c = compat.Consumer(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ self.assertRaises(NotImplementedError, c.discard_all,
+ filterfunc=lambda x: x)
+ c.close()
+
+ def test_wait(self, n="test_wait"):
+
+ class C(compat.Consumer):
+
+ def iterconsume(self, limit=None):
+ for i in range(limit):
+ yield i
+
+
+ c = C(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ self.assertEqual(c.wait(10), range(10))
+ c.close()
+
+ def test_iterqueue(self, n="test_iterqueue"):
+
+ i = [0]
+
+ class C(compat.Consumer):
+
+ def fetch(self, limit=None):
+ z = i[0]
+ i[0] += 1
+ return z
+
+ c = C(self.connection, queue=n, exchange=n,
+ routing_key="rkey")
+ self.assertEqual(list(c.iterqueue(limit=10)), range(10))
+ c.close()
+
+
+class test_ConsumerSet(unittest.TestCase):
+
+ def setUp(self):
+ self.connection = BrokerConnection(transport=Transport)
+
+ def test_constructor(self, prefix="0daf8h21"):
+ dcon = {"%s.xyx" % prefix: {"exchange": "%s.xyx" % prefix,
+ "routing_key": "xyx"},
+ "%s.xyz" % prefix: {"exchange": "%s.xyz" % prefix,
+ "routing_key": "xyz"}}
+ consumers = [compat.Consumer(self.connection, queue=prefix + str(i),
+ exchange=prefix + str(i))
+ for i in range(3)]
+ c = compat.ConsumerSet(self.connection, consumers=consumers)
+ c2 = compat.ConsumerSet(self.connection, from_dict=dcon)
+
+ self.assertEqual(len(c.queues), 3)
+ self.assertEqual(len(c2.queues), 2)
+
+ self.assertTrue(hasattr(c.iterconsume(), "next"))
+
+ c.add_consumer(compat.Consumer(self.connection,
+ queue=prefix + "xaxxxa",
+ exchange=prefix + "xaxxxa"))
+ self.assertEqual(len(c.queues), 4)
+ for cq in c.queues:
+ self.assertIs(cq.channel, c.channel)
+
+ c2.add_consumer_from_dict({"%s.xxx" % prefix: {
+ "exchange": "%s.xxx" % prefix,
+ "routing_key": "xxx"}})
+ self.assertEqual(len(c2.queues), 3)
+ for c2q in c2.queues:
+ self.assertIs(c2q.channel, c2.channel)
+
+ c.discard_all()
+ print("CALLED: %r" % (c.channel.called, ))
+ self.assertEqual(c.channel.called.count("queue_purge"), 4)
+ c.consume()
+
+ c.close()
+ c2.close()
+ self.assertIn("basic_cancel", c.channel)
+ self.assertIn("close", c.channel)
+ self.assertIn("close", c2.channel)
+
View
1  setup.cfg
@@ -13,7 +13,6 @@ cover3-exclude = kombu
kombu.transport.pycouchdb
kombu.transport.mongodb
kombu.transport.beanstalk
- kombu.compat
[build_sphinx]
source-dir = docs/
Please sign in to comment.
Something went wrong with that request. Please try again.