Skip to content

Commit

Permalink
Fixing amqp tests by taking care of routing_key/queue names and bindi…
Browse files Browse the repository at this point in the history
…ngs to avoid collisions between tests on same queue/binding.
  • Loading branch information
farirat committed Feb 23, 2022
1 parent 51bf39f commit d4ec696
Showing 1 changed file with 64 additions and 43 deletions.
107 changes: 64 additions & 43 deletions tests/queues/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ def test_publish_to_fanout_exchange(self):
def test_publish_to_queue(self):
yield self.connect()

yield self.amqp.named_queue_declare(queue="submit.sm.connector01")
yield self.amqp.named_queue_declare(queue="submit.sm.test_publish_to_queue")

yield self.amqp.publish(routing_key="submit.sm.connector01", content=Content(self.message))
yield self.amqp.publish(routing_key="submit.sm.test_publish_to_queue", content=Content(self.message))

yield self.amqp.disconnect()

Expand Down Expand Up @@ -152,9 +152,9 @@ class ConsumeTestCase(ConsumeTools):
def test_consume_queue(self):
yield self.connect()

yield self.amqp.named_queue_declare(queue="submit.sm.connector01")
yield self.amqp.named_queue_declare(queue="submit.sm.test_consume_queue")

yield self.amqp.chan.basic_consume(queue="submit.sm.connector01", no_ack=True, consumer_tag='qtag')
yield self.amqp.chan.basic_consume(queue="submit.sm.test_consume_queue", no_ack=True, consumer_tag='qtag')
self.queue = yield self.amqp.client.queue('qtag')
self.queue.get().addCallback(self._callback, self.queue).addErrback(self._errback)

Expand All @@ -170,15 +170,16 @@ class PublishConsumeTestCase(ConsumeTools):
def test_simple_publish_consume(self):
yield self.connect()

yield self.amqp.named_queue_declare(queue="submit.sm.connector01")
yield self.amqp.named_queue_declare(queue="submit.sm.test_simple_publish_consume")

# Consume
yield self.amqp.chan.basic_consume(queue="submit.sm.connector01", no_ack=True, consumer_tag='qtag')
yield self.amqp.chan.basic_consume(queue="submit.sm.test_simple_publish_consume",
no_ack=True, consumer_tag='qtag')
queue = yield self.amqp.client.queue('qtag')
queue.get().addCallback(self._callback, queue).addErrback(self._errback)

# Publish
yield self.amqp.publish(routing_key="submit.sm.connector01", content=Content(self.message))
yield self.amqp.publish(routing_key="submit.sm.test_simple_publish_consume", content=Content(self.message))

# Wait for 2 seconds
# (give some time to the consumer to get its work done)
Expand All @@ -197,15 +198,19 @@ def test_simple_publish_consume_by_topic(self):
yield self.amqp.chan.exchange_declare(exchange='messaging', type='topic')

# Consume
yield self.amqp.named_queue_declare(queue="submit.sm_all")
yield self.amqp.chan.queue_bind(queue="submit.sm_all", exchange="messaging", routing_key="submit.sm.*")
yield self.amqp.chan.basic_consume(queue="submit.sm_all", no_ack=True, consumer_tag='qtag')
yield self.amqp.named_queue_declare(queue="submit.test_simple_publish_consume_by_topic")
yield self.amqp.chan.queue_bind(
queue="submit.test_simple_publish_consume_by_topic", exchange="messaging",
routing_key="submit.sm.test_simple_publish_consume_by_topic")
yield self.amqp.chan.basic_consume(queue="submit.test_simple_publish_consume_by_topic",
no_ack=True, consumer_tag='qtag')
queue = yield self.amqp.client.queue('qtag')
queue.get().addCallback(self._callback, queue).addErrback(self._errback)

# Publish
yield self.amqp.publish(exchange='messaging', routing_key="submit.sm.connector01",
content=Content(self.message))
yield self.amqp.publish(
exchange='messaging', routing_key="submit.sm.test_simple_publish_consume_by_topic",
content=Content(self.message))

# Wait for 2 seconds
# (give some time to the consumer to get its work done)
Expand All @@ -221,22 +226,24 @@ def test_simple_publish_consume_by_topic(self):
def test_publish_consume_from_different_queues(self):
yield self.connect()

yield self.amqp.named_queue_declare(queue="submit.sm.connector01")
yield self.amqp.named_queue_declare(queue="deliver.sm.connector01")
yield self.amqp.named_queue_declare(queue="submit.sm.test_publish_consume_from_different_queues")
yield self.amqp.named_queue_declare(queue="deliver.sm.test_publish_consume_from_different_queues")

# Consume
yield self.amqp.chan.basic_consume(queue="submit.sm.connector01", no_ack=True,
consumer_tag='submit_sm_consumer')
yield self.amqp.chan.basic_consume(queue="deliver.sm.connector01", no_ack=True,
consumer_tag='deliver_sm_consumer')
yield self.amqp.chan.basic_consume(queue="submit.sm.test_publish_consume_from_different_queues",
no_ack=True, consumer_tag='submit_sm_consumer')
yield self.amqp.chan.basic_consume(queue="deliver.sm.test_publish_consume_from_different_queues",
no_ack=True, consumer_tag='deliver_sm_consumer')
self.submit_sm_q = yield self.amqp.client.queue('submit_sm_consumer')
self.deliver_sm_q = yield self.amqp.client.queue('deliver_sm_consumer')
self.submit_sm_q.get().addCallback(self._callback, self.submit_sm_q).addErrback(self._errback)
self.deliver_sm_q.get().addCallback(self._callback, self.deliver_sm_q).addErrback(self._errback)

# Publish
yield self.amqp.publish(routing_key="submit.sm.connector01", content=Content(self.message))
yield self.amqp.publish(routing_key="deliver.sm.connector01", content=Content(self.message))
yield self.amqp.publish(
routing_key="submit.sm.test_publish_consume_from_different_queues", content=Content(self.message))
yield self.amqp.publish(
routing_key="deliver.sm.test_publish_consume_from_different_queues", content=Content(self.message))

# Wait for 2 seconds
# (give some time to the consumer to get its work done)
Expand All @@ -259,15 +266,18 @@ def test_start_consuming_later(self):

# Consume
consumerTag = 'lateConsumerTest-%s' % (str(uuid.uuid4()))
yield self.amqp.named_queue_declare(queue="submit.sm.connector01")
yield self.amqp.chan.queue_bind(queue="submit.sm.connector01", exchange="messaging", routing_key="submit.sm.*")
yield self.amqp.chan.basic_consume(queue="submit.sm.connector01", no_ack=False, consumer_tag=consumerTag)
yield self.amqp.named_queue_declare(queue="submit.sm.test_start_consuming_later")
yield self.amqp.chan.queue_bind(
queue="submit.sm.test_start_consuming_later", exchange="messaging",
routing_key="submit.sm.test_start_consuming_later")
yield self.amqp.chan.basic_consume(
queue="submit.sm.test_start_consuming_later", no_ack=False, consumer_tag=consumerTag)
queue = yield self.amqp.client.queue(consumerTag)

# Publish
for i in range(5000):
yield self.amqp.publish(exchange='messaging', routing_key="submit.sm.connector01",
content=Content(str(i)))
yield self.amqp.publish(
exchange='messaging', routing_key="submit.sm.test_start_consuming_later", content=Content(str(i)))

# Start consuming (same as starting a connector)
queue.get().addCallback(self._callback, queue, ack=True).addErrback(self._errback)
Expand All @@ -292,15 +302,19 @@ def test_publish_pickled_binary_content(self):
yield self.amqp.chan.exchange_declare(exchange='messaging', type='topic')

# Consume
yield self.amqp.named_queue_declare(queue="submit.sm_all")
yield self.amqp.chan.queue_bind(queue="submit.sm_all", exchange="messaging", routing_key="submit.sm.*")
yield self.amqp.chan.basic_consume(queue="submit.sm_all", no_ack=True, consumer_tag='qtag')
yield self.amqp.named_queue_declare(queue="submit.test_publish_pickled_binary_content")
yield self.amqp.chan.queue_bind(
queue="submit.test_publish_pickled_binary_content", exchange="messaging",
routing_key="submit.sm.test_publish_pickled_binary_content")
yield self.amqp.chan.basic_consume(
queue="submit.test_publish_pickled_binary_content", no_ack=True, consumer_tag='qtag')
queue = yield self.amqp.client.queue('qtag')
queue.get().addCallback(self._callback, queue).addErrback(self._errback)

# Publish a pickled binary content with v2 protocol
yield self.amqp.publish(exchange='messaging', routing_key="submit.sm.connector01",
content=Content(pickle.dumps('\x53', pickle.HIGHEST_PROTOCOL)))
yield self.amqp.publish(
exchange='messaging', routing_key="submit.sm.test_publish_pickled_binary_content",
content=Content(pickle.dumps('\x53', pickle.HIGHEST_PROTOCOL)))

# Wait for 2 seconds
# (give some time to the consumer to get its work done)
Expand Down Expand Up @@ -346,22 +360,25 @@ def test_consume_all_requeued_messages(self):
yield self.amqp.chan.exchange_declare(exchange='messaging', type='topic')

# Consume
yield self.amqp.named_queue_declare(queue="submit.sm_all_1")
yield self.amqp.chan.queue_bind(queue="submit.sm_all_1", exchange="messaging", routing_key="submit.sm.*")
yield self.amqp.chan.basic_consume(queue="submit.sm_all_1", no_ack=False, consumer_tag='qtag')
yield self.amqp.named_queue_declare(queue="submit.test_consume_all_requeued_messages")
yield self.amqp.chan.queue_bind(
queue="submit.test_consume_all_requeued_messages", exchange="messaging",
routing_key="submit.sm.test_consume_all_requeued_messages")
yield self.amqp.chan.basic_consume(
queue="submit.test_consume_all_requeued_messages", no_ack=False, consumer_tag='qtag')
queue = yield self.amqp.client.queue('qtag')
queue.get().addCallback(self._callback_reject_once, queue, reject=True).addErrback(self._errback)

# Publish
for i in range(50):
yield self.amqp.publish(exchange='messaging', routing_key="submit.sm.connector01",
content=Content(str(i)))
yield self.amqp.publish(
exchange='messaging', routing_key="submit.sm.test_consume_all_requeued_messages",
content=Content(str(i)))

# Wait for 2 seconds
# (give some time to the consumer to get its work done)
yield waitFor(2)

yield self.amqp.chan.queue_unbind(queue="submit.sm_all_1", exchange="messaging", routing_key="submit.sm.*")
yield queue.close()
yield self.amqp.disconnect()

Expand All @@ -376,17 +393,21 @@ def test_requeue_all_restart_then_reconsume(self):
yield self.amqp.chan.exchange_declare(exchange='messaging', type='topic')

# Setup Consumer
yield self.amqp.named_queue_declare(queue="submit.sm_all_2")
yield self.amqp.chan.queue_bind(queue="submit.sm_all_2", exchange="messaging", routing_key="submit.sm.*")
yield self.amqp.chan.basic_consume(queue="submit.sm_all_2", no_ack=False, consumer_tag='qtag')
yield self.amqp.named_queue_declare(queue="submit.test_requeue_all_restart_then_reconsume")
yield self.amqp.chan.queue_bind(
queue="submit.test_requeue_all_restart_then_reconsume", exchange="messaging",
routing_key="submit.sm.test_requeue_all_restart_then_reconsume")
yield self.amqp.chan.basic_consume(
queue="submit.test_requeue_all_restart_then_reconsume", no_ack=False, consumer_tag='qtag')
queue = yield self.amqp.client.queue('qtag')
# Start consuming through _callback_reject_and_requeue_all
queue.get().addCallback(self._callback_reject_and_requeue_all, queue).addErrback(self._errback)

# Publish
for i in range(50):
yield self.amqp.publish(exchange='messaging', routing_key="submit.sm.connector01",
content=Content(str(i)))
yield self.amqp.publish(
exchange='messaging', routing_key="submit.sm.test_requeue_all_restart_then_reconsume",
content=Content(str(i)))

# Wait for 2 seconds
# (give some time to the consumer to get its work done)
Expand All @@ -402,7 +423,8 @@ def test_requeue_all_restart_then_reconsume(self):
yield waitFor(2)

# Start consuming again
yield self.amqp.chan.basic_consume(queue="submit.sm_all_2", no_ack=False, consumer_tag='qtag')
yield self.amqp.chan.basic_consume(
queue="submit.test_requeue_all_restart_then_reconsume", no_ack=False, consumer_tag='qtag')
queue = yield self.amqp.client.queue('qtag')
# Consuming through _callback
queue.get().addCallback(self._callback, queue, ack=True).addErrback(self._errback)
Expand All @@ -412,7 +434,6 @@ def test_requeue_all_restart_then_reconsume(self):
yield waitFor(2)

# Stop consuming and assert
yield self.amqp.chan.queue_unbind(queue="submit.sm_all_2", exchange="messaging", routing_key="submit.sm.*")
yield queue.close()
self.assertEqual(self.consumedMessages, 50)

Expand Down

0 comments on commit d4ec696

Please sign in to comment.