channel basic_ack recursion #286
This is interesting -- I've not looked at this in some time. I'll dig in and see if I can replicate. |
Can you try installing GitHub master and see if it fixes the problem for you? I can see a possibility in the base connection where, if there is always data to read, a write would never be preferred. I've switched the order of events to write before reading, which should defer any frame dispatching until outbound frames have been written.
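In other words, something like this rough sketch of the reordering (placeholder names, not pika's actual code):

```python
def handle_write(frame):
    pass  # placeholder: write one outbound frame to the socket

def handle_read():
    pass  # placeholder: read from the socket and dispatch inbound frames

def ioloop_cycle(outbound_buffer):
    # Write before reading: flush pending outbound frames (e.g. acks)
    # first, deferring any inbound frame dispatch until they are sent.
    while outbound_buffer:
        handle_write(outbound_buffer.pop(0))
    handle_read()
```

|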
I believe this is fixed, closing ticket due to lack of response. |
I am still having this problem with the latest master installed. I have a basic consumer that looks much like the RabbitMQ tutorials, and ~19 million items in my queue. Soon after I start my script, the deliver/ack cycle gets called recursively until I get a maximum recursion depth exceeded error. With the latest master, this happens quite soon after starting my script. In 0.9.12, the script would run longer, an hour or more, before erroring. |
Huh. So the main issue with the recursion is that if you're always receiving frames and never writing, you'll keep picking up new frames in the handle-read loop. I had thought that was fixed by forcing writes. I may have to rethink the core BlockingConnection ioloop to truly address this.
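A self-contained sketch of the failure mode being described (placeholder names, not pika's actual code): each send services pending reads, each read dispatches a delivery into the user callback, and the callback sends an ack, so under constant inbound traffic the stack never unwinds.

```python
inbound = iter(range(10 ** 6))  # pretend there is always a frame to read

def send_frame(frame):
    # Write the frame, then service pending socket events before returning.
    process_data_events()

def process_data_events():
    delivery = next(inbound, None)
    if delivery is not None:
        on_message(delivery)  # dispatch straight into the user callback

def on_message(delivery):
    send_frame(("ack", delivery))  # basic_ack -> send_frame -> recursion

send_frame(("ack", 0))  # blows past the recursion limit almost immediately
```

|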
I'm also running into this with version 0.9.13 on Ubuntu 13.04. I'm not sure if this is a red herring or not, but I have been able to reproduce this processing a queue with 30k messages; it usually fails after processing 659 [1]. Below is the pika-related logic in the program. Thanks!

[1] The data below is from attempting to process a queue containing ~30,000 messages. Each number here represents running a python script and failing after the given number of messages. The message count was calculated by running

668 659 659 659 659 659 659 659 659 677 659 573 659 659 659 659 659 582 659 659 676 659 659 676 659 659 659 573 659 659 659 671 676 668 659 659 659 659 659 4943

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue="foo", durable=True, exclusive=False, auto_delete=False)
channel.queue_bind(exchange='amq.fanout', queue='foo')

# Method that will receive our messages and stop consuming after 10
def _on_message(channel, method, header, body):
    body_split = body.split()
    [...]
    # Acknowledge message receipt
    channel.basic_ack(method.delivery_tag)

# This blocks until channel.stop_consuming is called and allows us to receive messages
try:
    # Set up our consumer callback
    channel.basic_consume(_on_message, queue="foo")
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
```

|
I saw this issue again. |
I also saw this issue. Code to reproduce the problem (assume there are enough messages in queue 'hello'):

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='xxx'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def c(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(c,
                      queue='hello',
                      no_ack=False)
channel.start_consuming()
```

I got the "maximum recursion depth exceeded" error after consuming about 660 messages with current master, while I got the same error after consuming about 66,250 messages with version 0.9.8. |
After this issue happens, I restart the consumer script, but the script won't consume data from RabbitMQ any more. |
@linbo Are the messages stuck as unacked? Have you tried catching the exit signal and closing the connection? If you exit your script using Ctrl+C, you can catch KeyboardInterrupt and close the connection.
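A minimal sketch of that shutdown handling, assuming a BlockingConnection named connection as in the examples above; closing the connection lets the broker requeue any unacked messages:

```python
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Stop delivering to the consumer, then close cleanly so any
    # unacked messages are returned to the queue by the broker.
    channel.stop_consuming()
    connection.close()
```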
|
Yes, I close it.

```python
def close():
    close_mq()
    sys.exit(1)

def consume(callback, queue_name):
    ensure_channel()
    try:
        channel.basic_consume(callback,
                              queue=queue_name)
        channel.start_consuming()
    except Exception, e:
        rb_logger.exception("Consumer rabbitmq error %s" % str(e))
        close()
```

And I hit a strange issue: only 2 or 3 consumer scripts work fine; the other consumer scripts are running but can't get data from RabbitMQ. |
Hi, I have a similar problem. I use pika 0.9.13 with RabbitMQ 3.1.3. My code:

```python
import pika
import sys

credentials = pika.PlainCredentials("test", "test")
parameters = pika.ConnectionParameters(
    host="localhost",
    port=5671,
    virtual_host="/test",
    credentials=credentials,
    ssl=True
)
connection = pika.BlockingConnection(parameters)
channel1 = connection.channel(1)
channel2 = connection.channel(2)
channel2.confirm_delivery()

def c(ch, method, properties, body):
    channel2.basic_publish(
        exchange="amq.direct",
        routing_key="route.test",
        body=body
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

try:
    channel1.basic_consume(c,
                           queue='queue.test',
                           no_ack=False)
    channel1.start_consuming()
except KeyboardInterrupt:
    connection.close()
```

Error 1: RuntimeError: maximum recursion depth exceeded while calling a Python object

Error 2: And when I don't have the last error, I get another one after some sent events:

RabbitMQ log:

Is there a problem with my example? |
Happening for me as well. I use pika 0.9.13 with RabbitMQ 3.1.5 on Ubuntu 12.04. Queue connected to a fanout exchange, receiving messages with delivery_mode = 2. Header frame: <BasicProperties(['delivery_mode=2', 'content_type=text/plain'])> Definitely a problem with: Happens to me when there are over ~3,000 packets in the queue and I start consuming them with channel.start_consuming(). For example, if I start the consumer when there are 5,000 messages in the queue, I may be left with ~4,000 messages that are unacked. The consumer will then keep consuming all the new packets coming in, but old unacked packets will remain in the queue. If I then stop the consumer, unacked messages go back to the ready state. If I start up the consumer now, I will end up with ~3,000 unacked messages in the queue. The numbers are rough, but this is what ends up happening. This did not happen with the old pika 0.9.5, which I was using for quite a while (that version had a different bug, but I digress). I don't have any additional info to submit beyond what previous commenters have submitted already. |
+1 Experiencing this exact issue. |
+1, happening to me as well. Queue has over 1 lakh (100,000) objects. Pika version 0.9.13. |
Working on a prototype to fundamentally change the BlockingConnection implementation. |
Yes, please, the sooner the better! I am currently using a workaround in a production environment :) |
I am unable to use channel.basic_ack due to #286 (this issue) and #410 (consumer receiving messages from one queue only). I was able to reproduce this issue in the following two environments:

Here is the producer and consumer code that reproduces the issue: send.py and recv.py.

Running recv.py after send.py results in the error described in this issue.

See https://gist.github.com/susam/7173596 for the complete consumer logs. This issue does not occur if I do not use channel.basic_ack. |
Here is a workaround that works for me. I have tested this workaround with 100,000 messages and with 1,000,000 messages in the queue. The workaround is to set a small prefetch_count; smaller values appear to keep the recursion within limits. Here is complete code that shows my workaround:
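A minimal sketch of the described workaround; the queue name 'hello' and the callback body are assumptions for illustration, and the basic_consume call follows the 0.9.x callback-first style used elsewhere in this thread:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

# The workaround: a small prefetch_count bounds how many unacked
# deliveries the broker pushes at once, which in turn bounds how deeply
# the ack-triggered frame dispatch can nest.
channel.basic_qos(prefetch_count=1)

def on_message(ch, method, properties, body):
    # ... process body ...
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(on_message, queue='hello')
channel.start_consuming()
```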
|
Good find, @susam! I will try it out. Have you played with the prefetch_size param? Any recommendations on that? |
Just to say we’re running into the same problem with 319 messages_ready. On basic_nack’ing one, pika runs into an infinite recursion. Traceback: https://gist.github.com/moeffju/ae09de3f1d707bbf0f7c The prefetch_count workaround does not make a difference for us. |
Hmm, this seems to have been a nagging issue for a long time... :( |
I am testing out a couple of changes on my own branch of pika that I think at least partly fix the recursion issue. I renamed it pypika, but this branch is only really intended for personal use/testing. However, if you can help confirm that the patches work, I'll come up with a proper solution and submit them to the pika repo, at least until gmr can finish his refactoring of the BlockingConnection. |
@eandersson It would've been useful if you had forked the GitHub repository so we could do a simple cross-repo comparison of your changes. Can you extract a patch vs. pika master or something? |
@moeffju Really, though, this would only be a workaround, and I'm not sure if it works without the many changes I have made on my other branch. |
Let me know if you guys are still experiencing this issue after #440. |
I am working on a project using RabbitMQ and was testing how Rabbit behaves with 1 million+ messages in a queue. I let the producer fill up the queue over the course of a day or so. Today I tried de-queueing them all one by one with a Python consumer, using pika and BlockingConnection. The callback was nothing more than getting messages (a short JSON message, nothing fancy), printing them to the console, then acking them. Using pika version 0.9.13 from PyPI, I got the error described in this issue within a couple of seconds of running the consumer. Using master, everything works as expected! 😄 What is the release schedule for pika (i.e., when will master be on PyPI)? |
I am glad the fix worked. gmr is skipping 0.9.14 and going straight to 0.10.0. Last I heard it was about a month away. |
Awesome, eagerly awaiting the release. |
Any update on when this fix might be released? |
@ejarendt Sounds like we might not see a formal release to PyPI until 0.10. In the meantime, you can install pika directly from this GitHub repo to get the fix. |
I just pushed 0.9.14, though it did not address everything that was intended to be addressed. |
I had a similar case (a recursion error when ack'ing messages with a prefetch count) using the select_connection adapter, and it seems that this patch fixes it for me. |
Same problem here, in version 0.9.13. |
Hi, I found a solution: do not consume data in the way illustrated by the official tutorial (http://www.rabbitmq.com/tutorials/tutorial-two-python.html). Yes, I mean: do not ack inside the callback of basic_consume. When you call basic_ack in the callback on a blocking-connection channel, it leads to processing read events (about 1 time in 10), which in turn calls your consume callback again, and then you call ack again... this is the recursive cycle.
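One way to follow that advice is to poll with basic_get instead, sketched here under the assumption that BlockingChannel.basic_get returns a (method, properties, body) tuple with method set to None when the queue is empty; the queue name 'hello' is a placeholder:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

while True:
    method, properties, body = channel.basic_get(queue='hello')
    if method is None:
        break  # queue is empty
    # ... process body ...
    # The ack is sent from the top level of this loop, not from inside
    # a frame-dispatch callback, so it cannot re-enter the dispatcher.
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection.close()
```

|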
@YEXINGZHE54 Have you also tried the basic_ack callback method with the patch from @vermoudakias? While a workaround is good, fixing the underlying issue is better. |
@moeffju Isn't that patch for the select_connection only? It looks to me like he is using 0.9.13 and the Blocking Connection. |
@eandersson @YEXINGZHE54 Whoops, sorry, I meant the fix from #440. |
The new BlockingConnection implementation in master is based on SelectConnection, and this recursion should no longer happen with the updated API/implementation. |
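Conceptually, the new implementation avoids the recursion by queueing incoming events and draining them in a flat loop instead of dispatching them recursively; a simplified sketch of the idea (not the actual pika source):

```python
import collections

pending = collections.deque()

def dispatch(frame):
    print("delivering", frame)  # placeholder for the channel callback

def on_frame(frame):
    pending.append(frame)  # inbound frames are only enqueued here

def process_data_events():
    # A flat loop drains the queue, so an ack sent from inside a
    # consumer callback cannot grow the call stack.
    while pending:
        dispatch(pending.popleft())
```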
@gmr: let's close this issue. See my comment above. |
This was fixed in 0.10.0 |
Hi all,
Ubuntu 12.04, python 2.7.3, pika 0.9.9
I'm experiencing the following exception raised when consuming messages from the queue (using BlockingConnection):
File "....", line 146, in _callback
ch.basic_ack(delivery_tag = method.delivery_tag)
File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 138, in basic_ack
return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 972, in _send_method
self.connection.send_method(self.channel_number, method_frame, content)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 161, in send_method
self._send_method(channel_number, method_frame, content)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1339, in _send_method
self._send_frame(frame.Method(channel_number, method_frame))
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 298, in _send_frame
self.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 129, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 232, in _handle_read
super(BlockingConnection, self)._handle_read()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 307, in _handle_read
self._on_data_available(data)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1145, in _on_data_available
self._process_frame(frame_value)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1200, in _process_frame
self._deliver_frame_to_channel(frame_value)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 848, in _deliver_frame_to_channel
return self._channels[value.channel_number]._handle_content_frame(value)
File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 775, in _handle_content_frame
self._on_deliver(_response)
File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 852, in _on_deliver
body)
File "....", line 146, in _callback
ch.basic_ack(delivery_tag = method.delivery_tag)
File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 138, in basic_ack
return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 972, in _send_method
self.connection.send_method(self.channel_number, method_frame, content)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 161, in send_method
self._send_method(channel_number, method_frame, content)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1339, in _send_method
self._send_frame(frame.Method(channel_number, method_frame))
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 298, in _send_frame
self.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 129, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 232, in _handle_read
super(BlockingConnection, self)._handle_read()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 307, in _handle_read
self._on_data_available(data)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1145, in _on_data_available
self._process_frame(frame_value)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1200, in _process_frame
self._deliver_frame_to_channel(frame_value)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 848, in _deliver_frame_to_channel
return self._channels[value.channel_number]._handle_content_frame(value)
File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 775, in _handle_content_frame
self._on_deliver(_response)
File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 852, in _on_deliver
body)
File "....", line 146, in _callback
ch.basic_ack(delivery_tag = method.delivery_tag)
File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 138, in basic_ack
...and this repeats multiple times, ending with "RuntimeError: maximum recursion depth exceeded".
The queue is loaded with a large number of small messages (9 bytes each).
The issue reproduces randomly, but the more messages waiting to be processed, the more easily it reproduces.
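For reference, the crash comes from CPython's default recursion limit, which can be inspected and raised; raising it only postpones the error rather than fixing it:

```python
import sys

print(sys.getrecursionlimit())  # 1000 by default in CPython
sys.setrecursionlimit(5000)     # delays, but does not fix, the RuntimeError
```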
I've found just a single mention of a related issue here:
http://groups.google.com/group/rabbitmq-discuss/browse_frm/month/2011-5?start=250&sa=N&fwc=1
"There is an interesting recursion bug under heavy loads in all 0.9.x versions. This bug would only present itself if there was always a frame to read or frame to publish in the buffer. If the IOLoop ever had a cycle without inbound or outbound data, the bug will not present. But if there is always data to process, for up to 1,000 frames, there will be a RuntimeError. 1,000 is the default Python recursion limit. This bug has been fixed."
Is this really fixed? Is it a known issue, and is it being worked on?
Thanks,
Krasi