
BlockingChannel can't stop_consuming #770

Closed
antoineleclair opened this issue Sep 16, 2016 · 11 comments

@antoineleclair

antoineleclair commented Sep 16, 2016

When using a BlockingChannel to consume many queues with start_consuming, there's no way to stop_consuming cleanly, unless a message is received.

import signal
import pika

params = pika.URLParameters('your params')
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='queue_one', durable=True)
channel.queue_declare(queue='queue_two', durable=True)
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    # process message here
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='queue_one')
channel.basic_consume(callback, queue='queue_two')

def sigterm_received(signum, frame):
    channel.stop_consuming()
signal.signal(signal.SIGTERM, sigterm_received)

channel.start_consuming()

print('Never reaches here unless a message is dequeued')

channel.close()
connection.close()

If a message is received, then the message "Never reaches here unless a message is dequeued" is printed. My best guess is that this loop in BlockingChannel.start_consuming is causing the problem, because process_data_events is called with time_limit=None:

while self._consumer_infos:
    self.connection.process_data_events(time_limit=None)

In the example above, replacing the line

channel.start_consuming()

with

while channel._consumer_infos:
    channel.connection.process_data_events(time_limit=1)

seems to fix the issue. However, I'm a bit wary that this fix could cause other issues, since BlockingConnection is not thread-safe.

Also, just to clarify what I'm trying to do here: I want to consume many queues and shut down gracefully when a SIGTERM signal is received. I want the messages that are currently being processed to complete (and be ACKed) before closing everything. I also want to exit quickly (a 1-second delay is acceptable) if no messages are currently being processed.

@vitaly-krugl
Member

@antoineleclair, I would never call something from a signal handler that's not explicitly documented as "signal-safe". That's asking for trouble. Also, using the private instance variable _consumer_infos is problematic, because it may change in the future.

The following may work for you, using documented public API:

  1. In sigterm_received, set a global flag to True (e.g., g_stop_requested = True), or do something equivalent that is signal-safe.
  2. Use BlockingConnection.add_timeout to request a 1-second timer callback.
  3. In the callback, check whether g_stop_requested (or your equivalent) is set; if so, call BlockingChannel.stop_consuming to cancel all consumers; if not, request a new timer callback.

I am generally not crazy about polling, but BlockingConnection implements the synchronous command metaphor, so there is not much else you can do, I think. Note that calling channel.connection.process_data_events(time_limit=1) is also polling and uses a timer under the covers.
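
A minimal sketch of that approach, assuming the pre-1.0 pika API used elsewhere in this thread (BlockingConnection.add_timeout(deadline, callback) and basic_consume(callback, queue=...)); the names g_stop_requested and check_stop are illustrative:

import signal
import pika

g_stop_requested = False

def sigterm_received(signum, frame):
    # Only set a flag here; calling pika APIs from a signal handler is unsafe.
    global g_stop_requested
    g_stop_requested = True

signal.signal(signal.SIGTERM, sigterm_received)

params = pika.URLParameters('your params')
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='queue_one', durable=True)
channel.queue_declare(queue='queue_two', durable=True)
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    # process message here
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='queue_one')
channel.basic_consume(callback, queue='queue_two')

def check_stop():
    # Dispatched by start_consuming in the connection's thread.
    if g_stop_requested:
        channel.stop_consuming()  # cancels all consumers; start_consuming() returns
    else:
        connection.add_timeout(1, check_stop)  # poll again in one second

connection.add_timeout(1, check_stop)
channel.start_consuming()

channel.close()
connection.close()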

Hope this helps!

@vitaly-krugl
Member

Please close the issue if my reply resolved the problem.

@vitaly-krugl
Member

vitaly-krugl commented Sep 17, 2016

@antoineleclair - And another thing: if there is a possibility that your RabbitMQ server may run low on resources (low disk space, low RAM), RabbitMQ will put the connection into blocked mode (it stops reading from your connection), which may block your basic_ack, basic_publish, or any synchronous request indefinitely until resources free up. In that case, the timer callbacks won't be processed until RabbitMQ unblocks the connection.

For that case, there is a new feature in the master branch (not yet released) that allows you to set a blocked connection timeout. From the changelog:

Blocked Connection deadlock avoidance: when RabbitMQ becomes low on
resources, it emits Connection.Blocked (AMQP extension) to the client
connection when client makes a resource-consuming request on that connection
or its channel (e.g., `Basic.Publish`); subsequently, RabbitMQ suspends
processing requests from that connection until the affected resources are
restored. See http://www.rabbitmq.com/connection-blocked.html. This
may impact `BlockingConnection` and `BlockingChannel` operations in a
way that users might not be expecting. For example, if the user dispatches
`BlockingChannel.basic_publish` in non-publisher-confirmation mode while
RabbitMQ is in this low-resource state followed by a synchronous request
(e.g., `BlockingConnection.channel`, `BlockingChannel.consume`,
`BlockingChannel.basic_consume`, etc.), the synchronous request will block
indefinitely (until Connection.Unblocked) waiting for RabbitMQ to reply. If
the blocked state persists for a long time, the blocking operation will
appear to hang. In this state, `BlockingConnection` instance and its
channels will not dispatch user callbacks. SOLUTION: To break this potential
deadlock, applications may configure the `blocked_connection_timeout`
connection parameter when instantiating `BlockingConnection`. Upon blocked
connection timeout, this adapter will raise ConnectionClosed exception with
first exception arg of
`pika.connection.InternalCloseReasons.BLOCKED_CONNECTION_TIMEOUT`. See
`pika.connection.ConnectionParameters` documentation to learn more about
`blocked_connection_timeout` configuration.
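
A minimal sketch of configuring that timeout, assuming a pika build that includes the feature (the 300-second value is illustrative):

import pika

params = pika.URLParameters('your params')
# Give up if RabbitMQ keeps the connection blocked longer than this many seconds.
params.blocked_connection_timeout = 300

connection = pika.BlockingConnection(params)
# If the blocked state outlasts the timeout, blocking operations raise
# ConnectionClosed with
# pika.connection.InternalCloseReasons.BLOCKED_CONNECTION_TIMEOUT as the
# first exception argument.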

@antoineleclair
Author

Thanks @vitaly-krugl, that did it. Much cleaner with BlockingConnection.add_timeout. Thanks a lot!

Also, for what it's worth, from an API design perspective I would have expected BlockingChannel.stop_consuming to be signal-safe and to work even when no messages are being received.

@vitaly-krugl
Member

@antoineleclair, thank you for your update and feedback.

A pika connection is not thread-safe, and the same goes for signal-safety. Supporting those paradigms typically introduces significant complexity and performance overhead, and it is outside the scope of this library.

@guysoft

guysoft commented Apr 9, 2018

In what scope does the callback passed to BlockingConnection.add_timeout run? I saw it mentioned in the documentation, but I see no example.

NOTE: the timer callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events and BlockingChannel.start_consuming.

I call BlockingConnection.add_timeout as part of a class, and I want to access attributes of that class (something like RpcServer.is_running()?), but it seems like the variables do not update in that scope.

@lukebakken
Member

lukebakken commented Apr 9, 2018

@guysoft - since this issue is closed and the last response is from 2016, please open a new issue with a description of what you are trying to accomplish and what you observe. Also, provide a runnable code sample that demonstrates it. That will be the most helpful to Pika's maintainers, who are all volunteers. Thank you!

@guysoft

guysoft commented Apr 24, 2018

OK, I fixed my issue (using basic_consume and handling it myself). But basically the issue was this: when the add_timeout callback runs, you can't interact with some of the Python variables, even though it's the same thread. I haven't managed to extract example code; I might have time to look into this later on. Thanks for the reply though, and well done!

@wanguanfu

I found no clear way to solve this (pika stop_consuming does not work).

@wanguanfu

Like this?

In the example above, replacing the line

channel.start_consuming()

with

while channel._consumer_infos:
    channel.connection.process_data_events(time_limit=1)

@lukebakken
Member

@wanguanfu - please provide code that reproduces your issue and attach it as a new message to the pika-python mailing list.

pika locked as resolved and limited conversation to collaborators Jul 29, 2019