When ack is enabled and the consumer starts after the publisher has published all messages, the consumer receives messages from one queue only #410

Closed
susam opened this Issue Oct 26, 2013 · 4 comments

@susam

I face this issue in the following two environments:

  1. pika 0.9.13, Python 2.7.3, RabbitMQ 3.1.5 and Windows 7 Enterprise Service Pack 1.
  2. pika 0.9.13, Python 2.6.6, RabbitMQ 3.0.1 and CentOS release 6.4.

Use the following two scripts to reproduce the issue.

send.py

import pika
import logging

logging.basicConfig(level=logging.DEBUG)

parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Declare queues for the consumer
channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')

# Publish messages
for i in range(3):
    message = 'message ' + str(i)
    channel.basic_publish(exchange='',
                          routing_key='q1',
                          body='q1: ' + message)
    channel.basic_publish(exchange='',
                          routing_key='q2',
                          body='q2: ' + message)
    print 'Published', message, 'with routing keys q1 and q2'

connection.close()

recv.py

import pika
import logging

logging.basicConfig(level=logging.DEBUG)

def print_data(channel, method, properties, body):
    print 'body:', body
    channel.basic_ack(delivery_tag = method.delivery_tag)

parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')

channel.basic_consume(print_data, queue='q1')
channel.basic_consume(print_data, queue='q2')

channel.start_consuming()
connection.close()

Now execute send.py first.

C:\pika\one-queue-issue>python send.py 2> sendlog.txt
Published message 0 with routing keys q1 and q2
Published message 1 with routing keys q1 and q2
Published message 2 with routing keys q1 and q2

C:\pika\one-queue-issue>rabbitmqctl list_queues
Listing queues ...
q1      3
q2      3
...done.

Now execute recv.py. It processes the messages from q1 and then blocks, waiting for more. Press Ctrl + C to kill it.

C:\pika\one-queue-issue>python recv.py 2> recvlog.txt
body: q1: message 0
body: q1: message 1
body: q1: message 2

C:\pika\one-queue-issue>rabbitmqctl list_queues
Listing queues ...
q1      0
q2      3
...done.

As you can see, recv.py receives messages from q1 only. To receive the messages from q2, we need to run recv.py once more.

C:\pika\one-queue-issue>python recv.py 2> recvlog2.txt
body: q2: message 0
body: q2: message 1
body: q2: message 2

C:\pika\one-queue-issue>rabbitmqctl list_queues
Listing queues ...
q1      0
q2      0
...done.

See https://gist.github.com/susam/7173375 for the logs generated by the above scripts.

This issue does not occur if we first run the consumer program (recv.py), and then start the producer program (send.py). In this case, the consumer receives messages from both queues.
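
As a side note, the queue depths shown by rabbitmqctl above can also be checked from Python with a passive queue_declare, whose Queue.DeclareOk reply carries a message count. This is only a minimal sketch of that verification step, not part of the reproduction; the passive=True check and the message_count field are assumptions about the pika API, not something used in the scripts above.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

for queue in ('q1', 'q2'):
    # passive=True only checks the queue and raises if it does not exist.
    result = channel.queue_declare(queue=queue, passive=True)
    # The Queue.DeclareOk reply reports the current number of messages.
    print('%s\t%d' % (queue, result.method.message_count))

connection.close()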

@susam

Some more information about this issue: it does not occur if the following statement is removed from the recv.py provided in the previous comment.

    channel.basic_ack(delivery_tag = method.delivery_tag)

This shows that the use of channel.basic_ack is related to this issue. However, removing channel.basic_ack without setting no_ack=True in the channel.basic_consume calls means that every time the consumer script quits and starts again, all messages are redelivered to it, as can be seen below.

C:\pika\one-queue-issue>python send.py 2> out.txt
Published message 0 with routing keys q1 and q2
Published message 1 with routing keys q1 and q2
Published message 2 with routing keys q1 and q2

C:\pika\one-queue-issue>python recv.py 2> out.txt
body: q1: message 0
body: q1: message 1
body: q1: message 2
body: q2: message 0
body: q2: message 1
body: q2: message 2

C:\pika\one-queue-issue>python recv.py 2> out.txt
body: q1: message 0
body: q1: message 1
body: q1: message 2
body: q2: message 0
body: q2: message 1
body: q2: message 2

Therefore, we now need to modify the channel.basic_consume calls to use no_ack=True.

channel.basic_consume(print_data, queue='q1', no_ack=True)
channel.basic_consume(print_data, queue='q2', no_ack=True)

Now, there is no re-delivery of messages and we are able to read from both queues.

C:\pika\one-queue-issue>python send.py 2> out.txt
Published message 0 with routing keys q1 and q2
Published message 1 with routing keys q1 and q2
Published message 2 with routing keys q1 and q2

C:\pika\one-queue-issue>python recv.py 2> out.txt
body: q1: message 0
body: q1: message 1
body: q1: message 2
body: q2: message 0
body: q2: message 1
body: q2: message 2

C:\pika\one-queue-issue>python recv.py 2> out.txt

However, we lose the benefit of acknowledgements, i.e. the guarantee that any unacknowledged messages will be redelivered.
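
One possible way to keep acknowledgements and still read both queues, sketched here but not tested against 0.9.13: drain the backlog of each queue with basic_get, acknowledging each message, before handing control to start_consuming. This relies on the observation above that the problem does not occur when the queues are empty at the time the consumers are registered. In the blocking adapter, basic_get returns a (method, properties, body) tuple; an empty queue yields a Basic.GetEmpty method in pika 0.9.x and (None, None, None) in pika 1.x, so the sketch checks for both.

import pika
import logging

logging.basicConfig(level=logging.DEBUG)

def print_data(channel, method, properties, body):
    print('body: ' + body)
    channel.basic_ack(delivery_tag=method.delivery_tag)

parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')

# Drain the messages that were published before this consumer started, so
# that the consumers are registered against empty queues.
for queue in ('q1', 'q2'):
    while True:
        method, properties, body = channel.basic_get(queue=queue)
        # Empty queue: Basic.GetEmpty on pika 0.9.x, (None, None, None) on 1.x.
        if method is None or method.NAME == 'Basic.GetEmpty':
            break
        print_data(channel, method, properties, body)

# Consume anything published from now on in the usual way.
channel.basic_consume(print_data, queue='q1')
channel.basic_consume(print_data, queue='q2')

channel.start_consuming()
connection.close()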

@jonhol

I also have this issue (verified with 0.9.13 and 0.9.12, Python 2.7.3 on Ubuntu). I believe what happens is that consuming starts immediately when the first consumer is added (before start_consuming() is called), and then blocks waiting for more messages once the messages already on the queue have been consumed.

Example producer:

#!/usr/bin/env python

import pika
import sys

def producer(number_of_messages):
    for _ in range(number_of_messages):
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='test')
        channel.basic_publish(exchange='',
                              routing_key='test',
                              body='Hello World!')
        # Close each connection instead of leaking it until the process exits.
        connection.close()

if __name__ == '__main__':
    if len(sys.argv) != 2:
        # Use argv[0] for the script name; argv[1] does not exist here.
        print ("Usage: %s <number of messages>" % sys.argv[0])
        exit(1)
    print ("Producer. Pika version: %s" % pika.__version__)
    number_of_messages = int(sys.argv[1])
    producer(number_of_messages)

Consumer:

#!/usr/bin/env python

import pika
import sys

def callback(channel, method, properties, body):
    print " [x] Received %r" % (body,)
    channel.basic_ack(delivery_tag = method.delivery_tag)

def consumer():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')
    channel.basic_consume(callback,
                          queue='test',
                          no_ack=False)
    print ("Not reached")

if __name__ == '__main__':
    print ("Consumer. Pika version: %s" % pika.__version__)
    consumer()

If we first run the producer and then the consumer, we see that the line

    print ("Not reached")

is not reached:

<whatever>$ ./producer.py 5
Producer. Pika version: 0.9.13
<whatever>$ ./consumer.py 
Consumer. Pika version: 0.9.13
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
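
If that is what happens, then swapping the order of the two basic_consume calls in the recv.py from the first comment should invert the symptom: q2 would be drained and q1 left untouched. A quick way to check the hypothesis is the untested sketch below, identical to recv.py except for the order in which the consumers are registered.

import pika

def print_data(channel, method, properties, body):
    print('body: ' + body)
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')

# Register the q2 consumer first; under the hypothesis above, the backlog of
# q2 should now be the one that gets delivered, with q1 left untouched.
channel.basic_consume(print_data, queue='q2')
channel.basic_consume(print_data, queue='q1')

channel.start_consuming()
connection.close()
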
@maziyarpanahi

I started testing whether I can listen to multiple queues and I noticed the exact same situation. If there are already messages in those queues, only one of them is consumed and the other is not touched at all. But if the publisher starts after the receiver has been running, it all works fine.

Is there any new information about this?

Thanks

@vitaly-krugl
Pika member

Thank you. This is fixed in pika 0.10.0:

In [5]: %paste
import pika
import logging

logging.basicConfig(level=logging.DEBUG)

parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Declare queues for the consumer
channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')

# Publish messages
for i in range(3):
    message = 'message ' + str(i)
    channel.basic_publish(exchange='',
                          routing_key='q1',
                          body='q1: ' + message)
    channel.basic_publish(exchange='',
                          routing_key='q2',
                          body='q2: ' + message)
    print 'Published', message, 'with routing keys q1 and q2'

connection.close()

## -- End pasted text --
Published message 0 with routing keys q1 and q2
Published message 1 with routing keys q1 and q2
Published message 2 with routing keys q1 and q2

In [6]: %paste
import pika
import logging

logging.basicConfig(level=logging.DEBUG)

def print_data(channel, method, properties, body):
    print 'body:', body
    channel.basic_ack(delivery_tag = method.delivery_tag)

parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')

channel.basic_consume(print_data, queue='q1')
channel.basic_consume(print_data, queue='q2')

channel.start_consuming()
connection.close()

## -- End pasted text --
body: q1: message 0
body: q1: message 1
body: q1: message 2
body: q2: message 0
body: q2: message 1
body: q2: message 2
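
A note for readers on newer pika: in pika 1.x the basic_consume signature changed, so the scripts above need small edits to run there. The queue name comes first, the callback is passed as on_message_callback, no_ack was renamed auto_ack, and message bodies arrive as bytes. The following is a sketch of recv.py adapted for pika 1.x, written as an assumption about current usage rather than part of the original report.

import pika

def print_data(channel, method, properties, body):
    # Under pika 1.x the body is bytes, so decode it before printing.
    print('body: ' + body.decode())
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')

# pika 1.x: queue first, callback passed as on_message_callback.
channel.basic_consume(queue='q1', on_message_callback=print_data)
channel.basic_consume(queue='q2', on_message_callback=print_data)

channel.start_consuming()
connection.close()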