Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

#38 - allow user to specify part of consumer_tag value

  • Loading branch information...
commit 5243b28baa4752ac5dc8ad56baeb76887420d63e 1 parent 93fe894
@majek authored
Showing with 44 additions and 9 deletions.
  1. +2 −1  puka/machine.py
  2. +42 −8 tests/test_consume.py
View
3  puka/machine.py
@@ -246,12 +246,13 @@ def basic_consume_multi(conn, queues, prefetch_count=0, no_ack=False):
queue = item
no_local = exclusive = False
arguments = {}
+ consumer_tag = '%s.%s.%s' % (t.number, i, '')
else:
queue = item['queue']
no_local = item.get('no_local', False)
exclusive = item.get('exclusive', False)
arguments = item.get('arguments', {})
- consumer_tag = '%s.%s' % (t.number, i)
+ consumer_tag = '%s.%s.%s' % (t.number, i, item.get('consumer_tag', ''))
t.x_consumes.append( (queue, spec.encode_basic_consume(
queue, consumer_tag, no_local, no_ack, exclusive, arguments)) )
t.x_no_ack = no_ack
View
50 tests/test_consume.py
@@ -97,32 +97,33 @@ def test_consumer_tag(self, client):
consume_promise = client.basic_consume(queue=self.name1)
result = client.wait(consume_promise)
self.assertEqual(result['body'], self.msg)
- self.assertEqual(result['consumer_tag'], '%s.0' % consume_promise)
+ self.assertEqual(result['consumer_tag'], '%s.0.' % consume_promise)
client.basic_ack(result)
promise = client.basic_cancel(consume_promise)
result = client.wait(promise)
# Consume multi
p1 = client.basic_publish(exchange='', routing_key=self.name1,
- body=self.msg1)
+ body=self.msg1)
p2 = client.basic_publish(exchange='', routing_key=self.name2,
- body=self.msg2)
+ body=self.msg2)
client.wait_for_all([p1, p2])
- consume_promise = client.basic_consume_multi(
- [{'queue': self.name1}, self.name2],
- prefetch_count=1)
+ consume_promise = client.basic_consume_multi([
+ self.name1,
+ {'queue': self.name2,
+ 'consumer_tag': 'whooa!'}])
for _ in range(2):
result = client.wait(consume_promise)
if result['body'] == self.msg1:
self.assertEqual(result['body'], self.msg1)
self.assertEqual(result['consumer_tag'],
- '%s.0' % consume_promise)
+ '%s.0.' % consume_promise)
else:
self.assertEqual(result['body'], self.msg2)
self.assertEqual(result['consumer_tag'],
- '%s.1' % consume_promise)
+ '%s.1.whooa!' % consume_promise)
client.basic_ack(result)
p1 = client.queue_delete(queue=self.name1)
@@ -130,6 +131,39 @@ def test_consumer_tag(self, client):
client.wait_for_all([p1, p2])
+ @base.connect
+ def test_consumer_tag_repeated(self, client):
+ # In theory consumer_tags are unique. But our users may not
+ # know about it. Test puka's behaviour in that case
+
+ p1 = client.queue_declare(queue=self.name1)
+ p2 = client.queue_declare(queue=self.name2)
+ client.wait_for_all([p1, p2])
+
+ promise = client.basic_publish(exchange='', routing_key=self.name1,
+ body=self.msg)
+ client.wait(promise)
+
+ consume_promise = client.basic_consume_multi([
+ {'queue': self.name1,
+ 'consumer_tag': 'repeated'},
+ {'queue': self.name1,
+ 'consumer_tag': 'repeated'},
+ {'queue': self.name2,
+ 'consumer_tag': 'repeated'}])
+
+ result = client.wait(consume_promise)
+ self.assertEqual(result['body'], self.msg)
+ ct = result['consumer_tag'].split('.')
+ self.assertEqual(ct[0], '%s' % consume_promise)
+ self.assertTrue(ct[1] in ('0', '1', '2'))
+ self.assertEqual(ct[2], 'repeated')
+
+ p1 = client.queue_delete(queue=self.name1)
+ p2 = client.queue_delete(queue=self.name2)
+ client.wait_for_all([p1, p2])
+
+
if __name__ == '__main__':
import tests
tests.run_unittests(globals())

1 comment on commit 5243b28

@reedwade

sweet!

Br3nda is away at a conference today. She may spot something we've forgotten.

Please sign in to comment.
Something went wrong with that request. Please try again.