Skip to content

Commit

Permalink
Do not reject consumers with no_ack=True #486 #530
Browse files Browse the repository at this point in the history
Thanks to @anjensan on his initial work on this issue.
  • Loading branch information
gmr committed Apr 29, 2015
1 parent 4528a1a commit ddba7ce
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
28 changes: 21 additions & 7 deletions pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, connection, channel_number, on_open_callback=None):
self._has_on_flow_callback = False
self._cancelled = collections.deque(list())
self._consumers = dict()
self._consumers_with_noack = set()
self._on_flowok_callback = None
self._on_getok_callback = None
self._on_openok_callback = on_open_callback
Expand Down Expand Up @@ -211,6 +212,9 @@ def basic_consume(self, consumer_callback,
if consumer_tag in self._consumers or consumer_tag in self._cancelled:
raise exceptions.DuplicateConsumerTag(consumer_tag)

if no_ack:
self._consumers_with_noack.add(consumer_tag)

self._consumers[consumer_tag] = consumer_callback
self._pending[consumer_tag] = list()
self._rpc(spec.Basic.Consume(queue=queue,
Expand Down Expand Up @@ -784,6 +788,20 @@ def _cleanup(self):
self._consumers = dict()
self.callbacks.cleanup(str(self.channel_number))

def _cleanup_consumer_ref(self, consumer_tag):
"""Remove any references to the consumer tag in internal structures
for consumer state.
:param str consumer_tag: The consumer tag to cleanup
"""
if consumer_tag in self._consumers_with_noack:
self._consumers_with_noack.remove(consumer_tag)
if consumer_tag in self._consumers:
del self._consumers[consumer_tag]
if consumer_tag in self._pending:
del self._pending[consumer_tag]

def _get_pending_msg(self, consumer_tag):
"""Get a pending message for the consumer tag from the stack.
Expand Down Expand Up @@ -833,8 +851,7 @@ def _on_cancel(self, method_frame):
"""
self._cancelled.append(method_frame.method.consumer_tag)
if method_frame.method.consumer_tag in self._consumers:
del self._consumers[method_frame.method.consumer_tag]
self._cleanup_consumer_ref(method_frame.method.consumer_tag)

def _on_cancelok(self, method_frame):
"""Called in response to a frame from the Broker when the
Expand All @@ -843,10 +860,7 @@ def _on_cancelok(self, method_frame):
:param pika.frame.Method method_frame: The method frame received
"""
if method_frame.method.consumer_tag in self._consumers:
del self._consumers[method_frame.method.consumer_tag]
if method_frame.method.consumer_tag in self._pending:
del self._pending[method_frame.method.consumer_tag]
self._cleanup_consumer_ref(method_frame.method.consumer_tag)

def _on_close(self, method_frame):
"""Handle the case where our channel has been closed for us
Expand Down Expand Up @@ -890,7 +904,7 @@ def _on_deliver(self, method_frame, header_frame, body):
"""
consumer_tag = method_frame.method.consumer_tag
if consumer_tag in self._cancelled:
if self.is_open:
if self.is_open and consumer_tag not in self._consumers_with_noack:
self.basic_reject(method_frame.method.delivery_tag)
return
if consumer_tag not in self._consumers:
Expand Down
30 changes: 30 additions & 0 deletions tests/acceptance/select_adapter_tests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
import uuid

import async_test_base

Expand Down Expand Up @@ -40,6 +41,35 @@ def start_test(self):
self.start()


class TestConsumeCancel(AsyncTestCase):
def begin(self, channel):
self.queue_name = str(uuid.uuid4())
channel.queue_declare(self.on_queue_declared,
queue=self.queue_name)

def on_queue_declared(self, frame):
for i in range(0, 100):
msg_body = '{}:{}:{}'.format(self.__class__.__name__,
i, time.time())
self.channel.basic_publish('', self.queue_name, msg_body)
self.ctag = self.channel.basic_consume(self.on_message,
queue=self.queue_name,
no_ack=True)

def on_message(self, _channel, _frame, _header, body):
self.channel.basic_cancel(self.on_cancel, self.ctag)

def on_cancel(self, _frame):
self.channel.queue_delete(self.on_deleted, self.queue_name)

def on_deleted(self, _frame):
self.stop()

def start_test(self):
"""SelectConnection should receive confirmation of Confirm.Select"""
self.start()


class TestExchangeDeclareAndDelete(AsyncTestCase):

X_TYPE = 'direct'
Expand Down

0 comments on commit ddba7ce

Please sign in to comment.