Permalink
Browse files

#9 Support cancel notifications.

This is likely to break some applications - basically, if you're consuming from a queue that is deleted, you will receive 'basic.cancel' instead of a valid message. This is a good solution though - it's better to crash user app (that is not expecting this behaviour) than to allow it to hang indefinetely.
  • Loading branch information...
1 parent d5e1312 commit 38c97a537e8113275b923cc08971d7216e7e220b @majek committed Oct 7, 2011
Showing with 160 additions and 3 deletions.
  1. +2 −1 amqp-accepted-by-update.json
  2. +11 −2 puka/machine.py
  3. +147 −0 tests/test_cancel.py
@@ -144,7 +144,8 @@
"server"
],
"cancel": [
- "server"
+ "server",
+ "client"
],
"recover-async": [
"server"
View
@@ -28,7 +28,7 @@ def _connection_start(t, result):
scapa = result['server_properties'].get('capabilities', {})
ccapa = {}
if scapa.get('consumer_cancel_notify'):
- ccapa['consumer_cancel_notify'] = False
+ ccapa['consumer_cancel_notify'] = True
frames = spec.encode_connection_start_ok({'product': 'Puka',
'capabilities': ccapa},
'PLAIN', response, 'en_US')
@@ -248,6 +248,7 @@ def basic_consume_multi(conn, queues, prefetch_count=0, no_ack=False):
t.x_no_ack = no_ack
t.x_consumer_tag = {}
t.register(spec.METHOD_BASIC_DELIVER, _bcm_basic_deliver)
+ t.register(spec.METHOD_BASIC_CANCEL, _bcm_basic_cancel)
return t
def _bcm_basic_qos(t):
@@ -274,6 +275,10 @@ def _bcm_basic_deliver(t, msg_result):
t.refcnt_inc()
t.ping(msg_result)
+def _bcm_basic_cancel(ct, result):
+ ct.register(spec.METHOD_BASIC_CANCEL, _generic_callback_nop)
+ ct.x_ct = ct
+ _basic_cancel(ct)
##
def basic_ack(conn, msg_result):
@@ -331,8 +336,9 @@ def _basic_cancel_ok(ct, result):
_basic_cancel_one(ct)
else:
ct.x_mt.done(result)
+ if ct != ct.x_mt:
+ ct.done(None, no_callback=True)
ct.x_mt = None
- ct.done(None, no_callback=True)
ct.refcnt_clear()
####
@@ -383,6 +389,9 @@ def _generic_callback(t):
def _generic_callback_ok(t, result):
t.done(result)
+def _generic_callback_nop(t, result):
+ pass
+
####
def exchange_delete(conn, exchange, if_unused=False):
t = conn.promises.new(_generic_callback)
View
@@ -0,0 +1,147 @@
+from __future__ import with_statement
+
+import os
+import puka
+
+import base
+
+
+class TestCancel(base.TestCase):
+ @base.connect
+ def test_cancel_single(self, client):
+ promise = client.queue_declare(queue=self.name)
+ client.wait(promise)
+
+ promise = client.basic_publish(exchange='', routing_key=self.name,
+ body='a')
+ client.wait(promise)
+
+ consume_promise = client.basic_consume(queue=self.name, prefetch_count=1)
+ result = client.wait(consume_promise)
+ self.assertEqual(result['body'], 'a')
+
+ promise = client.basic_cancel(consume_promise)
+ result = client.wait(promise)
+ self.assertTrue('consumer_tag' in result)
+
+ # TODO: better error
+ # It's illegal to wait on consume_promise after cancel.
+ #with self.assertRaises(Exception):
+ # client.wait(consume_promise)
+
+ promise = client.queue_delete(queue=self.name)
+ client.wait(promise)
+
+
+ @base.connect
+ def test_cancel_multi(self, client):
+ names = [self.name, self.name1, self.name2]
+ for name in names:
+ promise = client.queue_declare(queue=name)
+ client.wait(promise)
+ promise = client.basic_publish(exchange='', routing_key=name,
+ body='a')
+ client.wait(promise)
+
+
+ consume_promise = client.basic_consume_multi(queues=names,
+ no_ack=True)
+ for i in range(len(names)):
+ result = client.wait(consume_promise)
+ self.assertEqual(result['body'], 'a')
+
+ promise = client.basic_cancel(consume_promise)
+ result = client.wait(promise)
+ self.assertTrue('consumer_tag' in result)
+
+ # TODO: better error
+ #with self.assertRaises(Exception):
+ # client.wait(consume_promise)
+
+ promise = client.queue_delete(queue=self.name)
+ client.wait(promise)
+
+ @base.connect
+ def test_cancel_single_notification(self, client):
+ promise = client.queue_declare(queue=self.name)
+ client.wait(promise)
+
+ promise = client.basic_publish(exchange='', routing_key=self.name,
+ body='a')
+ client.wait(promise)
+
+ consume_promise = client.basic_consume(queue=self.name, prefetch_count=1)
+ result = client.wait(consume_promise)
+ self.assertEqual(result['body'], 'a')
+
+ promise = client.queue_delete(self.name)
+
+ result = client.wait(consume_promise)
+ self.assertEqual(result.name, 'basic.cancel_ok')
+
+ # Make sure the consumer died:
+ promise = client.queue_declare(queue=self.name)
+ result = client.wait(promise)
+ self.assertEqual(result['consumer_count'], 0)
+
+
+ @base.connect
+ def test_cancel_multi_notification(self, client):
+ names = [self.name, self.name1, self.name2]
+ for name in names:
+ promise = client.queue_declare(queue=name)
+ client.wait(promise)
+ promise = client.basic_publish(exchange='', routing_key=name,
+ body='a')
+ client.wait(promise)
+
+ consume_promise = client.basic_consume_multi(queues=names,
+ no_ack=True)
+ for i in range(len(names)):
+ result = client.wait(consume_promise)
+ self.assertEqual(result['body'], 'a')
+
+ promise = client.queue_delete(names[0])
+
+ result = client.wait(consume_promise)
+ self.assertEqual(result.name, 'basic.cancel_ok')
+
+ # Make sure the consumer died:
+ for name in names:
+ promise = client.queue_declare(queue=name)
+ result = client.wait(promise)
+ self.assertEqual(result['consumer_count'], 0)
+
+ @base.connect
+ def test_cancel_multi_notification_concurrent(self, client):
+ names = [self.name, self.name1, self.name2]
+ for name in names:
+ promise = client.queue_declare(queue=name)
+ client.wait(promise)
+ promise = client.basic_publish(exchange='', routing_key=name,
+ body='a')
+ client.wait(promise)
+
+ consume_promise = client.basic_consume_multi(queues=names,
+ no_ack=True)
+ for i in range(len(names)):
+ result = client.wait(consume_promise)
+ self.assertEqual(result['body'], 'a')
+
+ client.queue_delete(names[0])
+ client.queue_delete(names[2])
+
+ result = client.wait(consume_promise)
+ self.assertEqual(result.name, 'basic.cancel_ok')
+
+ # Make sure the consumer died:
+ for name in names:
+ promise = client.queue_declare(queue=name)
+ result = client.wait(promise)
+ self.assertEqual(result['consumer_count'], 0)
+
+
+
+if __name__ == '__main__':
+ import tests
+ tests.run_unittests(globals())

0 comments on commit 38c97a5

Please sign in to comment.