
Connection dropped during long-running task with BlockingConnection #418

Closed
modeyang opened this issue Nov 5, 2013 · 8 comments

@modeyang

modeyang commented Nov 5, 2013

We're trying to set up a topic queue system where a producer will generate several tasks and one or more consumers will grab a task at a time, process it, and acknowledge the message.

The problem is that some tasks run for more than 10 minutes (in RabbitMQ 3.x the server has a heartbeat parameter, which defaults to 10 minutes; see http://www.rabbitmq.com/configure.html), and we're not responding to heartbeats during that time, causing the server to disconnect us.

Here's some pseudo code for our consumer:

#!/usr/bin/env python

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    long_running_task(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

I looked at the pika source for start_consuming, and it seems that a single thread processes read/write events, so I suspect the heartbeat response to the MQ server is blocked while a long task runs over 600 seconds.

I'm sorry if this is my mistake. Has anyone run into the same issue, and how did you solve it? Thanks a lot.

@joekarl

joekarl commented Feb 26, 2014

I'm seeing the same thing, even when specifying heartbeat_interval in my connection params.

@vitaly-krugl
Member

@modeyang, a pika connection runs entirely in the user's thread. So, if you don't call its API periodically, you're not letting it deal with events, including heartbeats. BlockingConnection has sleep() and process_data_events() methods that may be called periodically to keep the I/O going.
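For tasks that can be broken into short chunks, that periodic pumping can live inside the consumer callback itself. A minimal sketch (split_task and do_chunk are hypothetical helpers standing in for the real work; ch.connection is the channel's BlockingConnection):

```python
import time

def split_task(body):
    """Hypothetical: break the task body into small units of work."""
    return body.split(b",")

def do_chunk(chunk):
    """Hypothetical: one short-running piece of the long task."""
    time.sleep(0.01)

def callback(ch, method, properties, body):
    for chunk in split_task(body):
        do_chunk(chunk)
        # Pump I/O between chunks so pika can answer broker heartbeats.
        ch.connection.process_data_events(time_limit=0)
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

This only helps when no single chunk takes longer than the heartbeat timeout; for truly monolithic tasks a second thread (as discussed below in this thread) is needed.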

@modeyang
Author

@vitaly-krugl can you give me an example for this situation? Using multiple threads, one for the real worker and another for events?

@vitaly-krugl
Member

@modeyang, the trick here is acknowledging the message before the heartbeat expires, right? Have you tried configuring heartbeat for a much longer interval - long enough to process your longest task?

Also, a multi-threaded solution could work, as long as you're not accessing the connection/channel from multiple threads; here is an example of how it could work:

Consumer thread:

  1. Configure everything as you do now, but instead of start_consuming(), have your own loop that periodically calls connection.process_data_events(), waiting between calls on a python thread-safe Queue.Queue (with a small timeout) for delivery tags of completed tasks from your worker thread. When it gets a delivery tag, it ACKs the message and continues with the process_data_events() loop as before until the next delivery tag.
  2. The consumer callback passes the task to a thread (either starting a new thread for each task, or using a python thread-safe Queue.Queue to pass tasks to a long-running thread).

Worker thread:

  1. gets the message from the consumer thread as described above
  2. processes the long-running task and places the task message's delivery tag (it's in the message's method frame) into the python thread-safe Queue.Queue that the consumer waits on, as described under "Consumer thread" above.
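The two-thread scheme above could be sketched as follows (a sketch, not a drop-in solution: ack_loop, worker, and long_running_task are illustrative names, connection and channel are assumed to be an already-set-up BlockingConnection and channel, and it is written for Python 3, where Queue.Queue is spelled queue.Queue):

```python
import queue
import threading

def long_running_task(body):
    """Hypothetical placeholder for the real work."""
    pass

def ack_loop(connection, channel, done, stop):
    """Consumer-thread loop: pump pika I/O and ACK finished tasks.

    `done` is a thread-safe queue.Queue that the worker fills with
    delivery tags; `stop` is a threading.Event that ends the loop.
    """
    while not stop.is_set():
        connection.process_data_events(time_limit=0)  # service heartbeats
        try:
            # Wait briefly for a finished task's delivery tag.
            tag = done.get(timeout=0.1)
        except queue.Empty:
            continue
        channel.basic_ack(delivery_tag=tag)

def worker(tasks, done):
    """Worker thread: run long tasks, then hand delivery tags back."""
    while True:
        tag, body = tasks.get()
        long_running_task(body)
        done.put(tag)  # the ACK happens in the consumer thread, not here
```

The key design point is that only the consumer thread ever touches the connection and channel; the worker communicates exclusively through the thread-safe queue.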

@vitaly-krugl
Member

In addition to my earlier comment, it's now possible to set longer heartbeat timeouts via PR #666, which is now in Pika master (but not released yet). Please try out the version from Pika's master branch, including setting a longer heartbeat timeout, and open a new issue if you're still having trouble with it.
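Configuring a longer heartbeat might look like the fragment below (a sketch only: the 1800-second value is an arbitrary example, the broker may negotiate a different value, and the parameter name varies across pika versions, heartbeat vs. heartbeat_interval):

```python
import pika

# Request a 30-minute heartbeat so tasks up to roughly that length
# don't trip the timeout. Hypothetical value -- size it to your
# longest task.
params = pika.ConnectionParameters(host='localhost', heartbeat=1800)
connection = pika.BlockingConnection(params)
```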

@GWSzeto

GWSzeto commented Jan 24, 2018

I'm starting to get the same error even when I adjust the heartbeat connection parameter.
My tasks usually take around 5 minutes, but the connection still drops even when I set the heartbeat to 0 or to a very large number (i.e. 60000).

My code is essentially identical to the one above, and the error I receive is this:

Traceback (most recent call last):
  File "instagram_worker.py", line 43, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 1780, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 716, in process_data_events
    self._dispatch_channel_events()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 518, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 1403, in _dispatch_events
    evt.body)
  File "instagram_worker.py", line 37, in dataHandler
    channel.basic_ack(delivery_tag=method.delivery_tag)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 1988, in basic_ack
    self._flush_output()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 1250, in _flush_output
    *waiters)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 474, in _flush_output
    result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")

@vitaly-krugl
Member

See comment #892 (comment)

@vitaly-krugl
Member

add_callback_threadsafe in pull request #956 might help with this. See this example
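A minimal sketch of how add_callback_threadsafe can be used for this (long_running_task is a placeholder; the point is that the worker thread never calls basic_ack directly, it only schedules it to run on the connection's own thread):

```python
import functools
import threading

def long_running_task(body):
    """Hypothetical placeholder for the real work."""
    pass

def on_message(ch, method, properties, body):
    # Hand the task off so the consumer loop keeps pumping I/O.
    t = threading.Thread(target=work, args=(ch, method.delivery_tag, body))
    t.start()

def work(ch, delivery_tag, body):
    long_running_task(body)
    # basic_ack must not be called from this thread; schedule it to
    # run on the connection's thread instead.
    cb = functools.partial(ch.basic_ack, delivery_tag=delivery_tag)
    ch.connection.add_callback_threadsafe(cb)
```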
