Socket Error: 104 consuming messages with tasks that take a long time #753

Closed
pat1 opened this Issue May 27, 2016 · 23 comments

pat1 commented May 27, 2016

As in #418, I have a problem consuming messages with tasks that take a long time.
I am using git master with heartbeat_interval=0.

My code is very similar to
http://pika.readthedocs.io/en/0.10.0/examples/asynchronous_consumer_example.html
but my on_message method takes several minutes to "consume" the message.

After some time spent consuming the message, I get:

INFO 2016-05-27 16:28:51,083 __main__ connect  101 : Connecting to localhost
INFO 2016-05-27 16:28:51,084 pika.adapters.base_connection _create_and_connect_to_socket  216 : Connecting to ::1:5672
INFO 2016-05-27 16:28:51,089 __main__ on_connection_open  117 : Connection opened
INFO 2016-05-27 16:28:51,089 __main__ add_on_connection_close_callback  126 : Adding connection close callback
INFO 2016-05-27 16:28:51,089 __main__ open_channel  169 : Creating a new channel
INFO 2016-05-27 16:28:51,092 __main__ on_channel_open  181 : Channel opened
INFO 2016-05-27 16:28:51,092 __main__ add_on_channel_close_callback  193 : Adding channel close callback
INFO 2016-05-27 16:28:51,092 __main__ start_consuming  223 : Issuing consumer related RPC commands
INFO 2016-05-27 16:28:51,092 __main__ add_on_cancel_callback  233 : Adding consumer cancellation callback
INFO 2016-05-27 16:28:51,102 __main__ on_message  263 : Received message # 1 from None
INFO 2016-05-27 16:43:45,104 __main__ run  72  : Done
INFO 2016-05-27 16:43:45,883 __main__ acknowledge_message  312 : Acknowledging message 1
ERROR 2016-05-27 16:43:45,884 pika.adapters.base_connection _handle_error  362 : Socket Error: 104
INFO 2016-05-27 16:43:45,884 pika.connection _on_terminate  1891: Disconnected from RabbitMQ at localhost:5672 (-1): error(104, 'Connection reset by peer')
WARNING 2016-05-27 16:43:45,885 __main__ on_channel_closed  209 : Channel 1 was closed: (-1) error(104, 'Connection reset by peer')
WARNING 2016-05-27 16:43:45,886 pika.connection close  1135: Suppressing close request on <SelectConnection CLOSED socket=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
WARNING 2016-05-27 16:43:45,887 __main__ on_connection_closed  144 : Connection closed, reopening in 5 seconds: (-1) error(104, 'Connection reset by peer')
INFO 2016-05-27 16:43:50,892 __main__ connect  101 : Connecting to localhost

In the log you can see the function name and line number before each message.
I have tried to execute my task in a thread, keeping the AMQP communication in the main process, with:

self._connection.ioloop.stop()
while thread.isAlive():
    # self._connection.ioloop.poll(write_only=True)
    # self._connection.ioloop.poll()
    time.sleep(3)
self._connection.ioloop.start()

write_only=True was available in a previous version;
self._connection.ioloop.poll() creates recursive calls with a lot of threads and causes problems.

So I don't have a solution: I cannot consume messages and everything is stalled.
Is it possible that we need a (background) poll to keep the socket alive without fetching new messages from the queue?

vitaly-krugl commented May 27, 2016

@pat1, if the processing takes so long, then it's not a good match for the asynchronous programming model. The asynchronous programming model works well for I/O-bound processing that uses a common ioloop for all blocking operations within a thread.

I am also surprised that with heartbeat_interval=0 you would get heartbeat timeouts. That should completely disable heartbeats. Please check the rabbitmq log to see if the connection is dropped due to heartbeat timeout or something else.

Your app is not accessing the same pika connection (and/or its channels) from more than one thread, is it? pika connections are not thread-safe, so you would expect to have trouble in that case.

The following would be really helpful for debugging:

  1. your environment (os, os version, etc.)
  2. rabbitmq version
  3. A small, simple script that easily reproduces the lost connection. You may use time.sleep instead of your long-running processing operation.
pat1 commented May 27, 2016

Thanks for your reply.
My OS is Linux (Fedora 20, i386).
rpm -q python: python-2.7.5-16.fc20.i686
pika: git master
rpm -q rabbitmq-server: rabbitmq-server-3.6.2-1.noarch

My app is not accessing the same pika connection (and/or its channels) from more than one thread.

test program:
https://gist.github.com/pat1/4017d6565501b657731560af3d2e0b9e
The log:

 [*] Waiting for messages. To exit press CTRL+C
INFO 2016-05-27 18:53:54,438 __main__ connect  56  : Connecting to localhost
INFO 2016-05-27 18:53:54,440 pika.adapters.base_connection _create_and_connect_to_socket  216 : Connecting to ::1:5672
INFO 2016-05-27 18:53:54,445 __main__ on_connection_open  72  : Connection opened
INFO 2016-05-27 18:53:54,445 __main__ add_on_connection_close_callback  81  : Adding connection close callback
INFO 2016-05-27 18:53:54,446 __main__ open_channel  124 : Creating a new channel
INFO 2016-05-27 18:53:54,448 __main__ on_channel_open  136 : Channel opened
INFO 2016-05-27 18:53:54,448 __main__ add_on_channel_close_callback  148 : Adding channel close callback
INFO 2016-05-27 18:53:54,448 __main__ start_consuming  178 : Issuing consumer related RPC commands
INFO 2016-05-27 18:53:54,448 __main__ add_on_cancel_callback  188 : Adding consumer cancellation callback
INFO 2016-05-27 18:53:54,463 __main__ on_message  218 : Received message # 1 from None
INFO 2016-05-27 19:03:54,556 __main__ acknowledge_message  243 : Acknowledging message 1
ERROR 2016-05-27 19:03:54,557 pika.adapters.base_connection _handle_error  362 : Socket Error: 104
INFO 2016-05-27 19:03:54,558 pika.connection _on_terminate  1891: Disconnected from RabbitMQ at localhost:5672 (-1): error(104, 'Connection reset by peer')
WARNING 2016-05-27 19:03:54,559 __main__ on_channel_closed  164 : Channel 1 was closed: (-1) error(104, 'Connection reset by peer')
WARNING 2016-05-27 19:03:54,559 pika.connection close  1135: Suppressing close request on <SelectConnection CLOSED socket=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
WARNING 2016-05-27 19:03:54,560 __main__ on_connection_closed  99  : Connection closed, reopening in 5 seconds: (-1) error(104, 'Connection reset by peer')
INFO 2016-05-27 19:03:59,563 __main__ connect  56  : Connecting to localhost

pat1 commented May 31, 2016

Adding socket_timeout=1200 to ConnectionParameters does not solve the problem.
On the server:

sysctl -a |grep net.ipv4.tcp_keepalive
net.ipv4.tcp_keepalive_intvl = 75
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_time = 7200
vitaly-krugl commented May 31, 2016

What does the network look like? Is there a proxy or similar between the client and rabbitmq broker?

vitaly-krugl commented May 31, 2016

Also, what does the RabbitMQ log have to say about this? Please post the RabbitMQ log that shows the disconnect.

pat1 commented May 31, 2016

I have no proxy; I access RabbitMQ via localhost (using the public IP does not change the result).
The rabbitmq log looks like:

=INFO REPORT==== 31-May-2016::19:41:30 ===
accepting AMQP connection <0.32634.2> ([::1]:50677 -> [::1]:5672)

=ERROR REPORT==== 31-May-2016::19:42:00 ===
closing AMQP connection <0.32634.2> ([::1]:50677 -> [::1]:5672):
{writer,send_failed,{error,timeout}}

Setting {heartbeat, 0} in /etc/rabbitmq/rabbitmq.config does not change the behavior.

vitaly-krugl commented May 31, 2016

@pat1, the RabbitMQ log doesn't look like the failure is related to heartbeats. Note that the error is reported as "send failed ... timeout" just 30 seconds after "accepting AMQP connection". A send timeout would usually mean that transmit retries were exhausted and the peer is unreachable.

This is what a heartbeat timeout log entry looks like:

=INFO REPORT==== 31-May-2016::13:30:38 ===
accepting AMQP connection <0.1559.0> (127.0.0.1:63742 -> 127.0.0.1:5672)

=ERROR REPORT==== 31-May-2016::13:33:38 ===
closing AMQP connection <0.1559.0> (127.0.0.1:63742 -> 127.0.0.1:5672):
missed heartbeats from client, timeout: 60s
pat1 commented Jun 1, 2016

Looking at
http://stackoverflow.com/questions/35438843/rabbitmq-error-timeout

adding

self._channel.basic_qos(prefetch_count=1)

before

self._consumer_tag = self._channel.basic_consume(self.on_message,queue)

solves the problem.
Note that I have more than 1000 messages in the queue and some of them are not small.

I think this configuration/example should be published somewhere as a template for a very common use case.

Also, an ioloop heartbeat poll method would be useful, so that heartbeats do not have to be disabled while only the consumer task runs in a thread.

You can consider closing this issue.
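
For reference, a minimal sketch of where these two calls sit in the asynchronous consumer pattern (method and attribute names follow the pika async consumer example linked above; the queue name 'task_queue' and the 0.10-style argument order of basic_consume are assumptions for illustration):

def start_consuming(self):
    # Ask the broker to deliver at most one unacknowledged message at a
    # time, so it does not flood the socket while on_message is busy.
    self._channel.basic_qos(prefetch_count=1)
    self._consumer_tag = self._channel.basic_consume(self.on_message,
                                                     queue='task_queue')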

vitaly-krugl commented Jun 1, 2016

@pat1, thank you for following up. My expectation would be that RMQ should be doing non-blocking I/O, and should not fail this way. This sounds to me like a bug in RMQ.

I would like to investigate and follow up on this with the RMQ team. Your code snippet https://gist.github.com/pat1/4017d6565501b657731560af3d2e0b9e contains only the consumer. Would you mind also adding a small script that populates the queue with the size and number of messages that will surely reproduce the failure? Using pika.BlockingConnection would make it super simple to do by avoiding all those callbacks. Thank you.

eandersson commented Jun 4, 2016

I am seeing this in other AMQP libraries as well.

pat1 commented Jun 7, 2016

To be clear, this does not work:
https://gist.github.com/pat1/4017d6565501b657731560af3d2e0b9e/3dc43a75c874d76ecdd1412f7b91c06febd848e1

This works:
https://gist.github.com/pat1/4017d6565501b657731560af3d2e0b9e/7246b17d7b4b883d9ff68e53dc65657cdd2eedde

@vitaly-krugl I do not have a script that populates the queue; I think you can fill it with 1000 messages of about 100 kB each.
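
For anyone trying to reproduce this, here is a minimal filler sketch along those lines, using pika.BlockingConnection as suggested earlier (the queue name 'test', the durable flag, and the exact message size/count are assumptions based on this comment):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test', durable=True)

# 1000 messages of roughly 100 kB each should be enough to fill the TCP
# buffers while the consumer sleeps without acknowledging.
body = 'x' * (100 * 1024)
for _ in range(1000):
    channel.basic_publish(exchange='', routing_key='test', body=body)

connection.close()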

vitaly-krugl commented Jun 7, 2016

Thank you, @pat1.

barroca commented Mar 5, 2017

Hello, is there any update regarding this issue? We started getting this error this week, but we didn't change anything except the environment in which we run our code.

Thanks

jmartinez-jobsity commented Mar 6, 2017

I'm also having this issue, even with the prefetch_count=1 solution @pat1 suggested.

barroca commented Mar 6, 2017

Moving from time.sleep() to this busy wait:

from time import time

def _safe_sleep(duration):
    # Spin until the deadline instead of blocking in time.sleep().
    deadline = time() + duration
    time_limit = duration

    while True:
        time_limit = deadline - time()

        if time_limit <= 0:
            break

worked for me :)
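
A related variant, in case it helps someone else: with a BlockingConnection you can let pika service the socket during the wait instead of spinning, since BlockingConnection.process_data_events accepts a time_limit argument. A sketch under that assumption (the helper name is arbitrary; this is not what was run above):

from time import time

def _keepalive_sleep(connection, duration):
    # Wait roughly `duration` seconds while letting pika read and write
    # on the socket (heartbeats, broker frames) in short slices.
    deadline = time() + duration
    while True:
        remaining = deadline - time()
        if remaining <= 0:
            break
        connection.process_data_events(time_limit=min(remaining, 1.0))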

jmartinez-jobsity commented Mar 6, 2017

I'm not using any sleeps; all the time is being spent in actual message processing, which might include heavy SQL calls and a lot of requests calls, among other things.

westphahl added a commit to westphahl/pika that referenced this issue Mar 16, 2017

blocking_adapter: only consider processable events
In process_data_events() the common_terminator that is passed to
_flush_output() was also set to true for events that could not be
processed because of the call context.

Considering those events anyway will lead to _flush_output() returning
immediately.

This can lead to a dropped connection e.g. when there is a timeout
event ready but process_data_events() is called from a callback
(e.g. on_message). The timeout event is considered as a terminator in
this case but cannot be processed.

Fixes: pika#753
rogamba commented Jun 29, 2017

@pat1 Been struggling with this for days. This solved my problem, you're the man!

lukebakken commented Jul 25, 2017

I did all of the following investigation using Pika at tag 0.10.0, Python 2.7.13 and RabbitMQ 3.6.10.

The provided code to reproduce this issue does indeed show it on my machine. Note: you don't need a 10-minute sleep to reproduce this; anything over 30 seconds works.

During the time.sleep before sending the acknowledgement, there is a TCP send waiting in RMQ that times out, because the default TCP send timeout is 30 seconds. At this point in time I'm not sure what part of the AMQP protocol RabbitMQ is sending to the client. More investigation is needed.

So, to address this issue, you have the following options:

  • Use _channel.basic_qos(prefetch_count=1) - this does resolve the issue in my environment, probably because it consumes whatever data RabbitMQ is sending that would normally time out.
  • Ack the message immediately, then do your work. Of course this means that an error during processing will cause that message to be lost.

PR #843 does not appear to resolve this issue.
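
A minimal sketch of the second option (acknowledge first, then process), using the callback signature from the asynchronous consumer example; the process() helper is a placeholder. As noted, an error during processing then loses the message:

def on_message(self, channel, basic_deliver, properties, body):
    # Ack immediately so the broker can forget the delivery and does not
    # accumulate a large unacknowledged backlog behind it...
    channel.basic_ack(basic_deliver.delivery_tag)
    # ...then do the long-running work; if this fails, the message is
    # already gone from the queue.
    self.process(body)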

lukebakken commented Jul 26, 2017

I am going to close this as everything is working as expected -

The reproducer code opens a connection and channel to RabbitMQ, but does not specify basic.qos. This indicates to RabbitMQ that there should be no limit to the number of messages it can send to this client without acknowledgement. In addition, no_ack is False (the default setting) meaning that RabbitMQ will expect acknowledgement of every message delivered before de-queuing it.

In this scenario, the TCP connection can be thought of as a big pipe with no limits other than its size (i.e. buffer sizes) on how much data can be pushed through it. So, RabbitMQ keeps sending data to the client while the client sleeps after receiving the first message. Depending on message size and total count, the TCP "pipe" may be big enough to hold all of these interim messages, so RabbitMQ does not time out during the send operations. When message size and count are big enough to fill all the buffers, RabbitMQ blocks on the send and eventually times out after 30 seconds, which is what you see in the logs ({writer,send_failed,{error,timeout}}). The connection is closed and RabbitMQ re-tries sending all of those messages because none were acknowledged prior to the timeout.

This is one reason why it is critical to use channel.basic_qos(prefetch_count=N). That will limit the number of unacknowledged messages RabbitMQ will send to that particular client and will prevent the TCP send timeouts. This is also why @eandersson's comment about other client libraries applies, since this is not limited to Pika.

basic.qos docs

lukebakken closed this Jul 26, 2017

pat1 commented Jul 26, 2017

As written some time ago, the use of
self._channel.basic_qos(prefetch_count=1)
should be published somewhere as a template for a very common use case.

Also, an ioloop heartbeat poll method would be useful, so that heartbeats do not have to be disabled while only the consumer task runs in a thread.

NavalKK commented Sep 26, 2017

@lukebakken I'm also having this issue, even with the prefetch_count=1 option. In my RabbitMQ consumer the actual parsing of a message involves many Mongo queries, so it may take around 10-15 minutes, but I'm using heartbeat_interval=90. Is there any chance I can use a heartbeat interval value other than 0?

The RabbitMQ log looks like this:
=INFO REPORT==== 26-Sep-2017::13:14:32 ===
accepting AMQP connection <0.889.0> (127.0.0.1:55214 -> 127.0.0.1:5672)

=INFO REPORT==== 26-Sep-2017::13:14:32 ===
connection <0.889.0> (127.0.0.1:55214 -> 127.0.0.1:5672): user 'guest' authenticated and granted access to vhost '/'

=INFO REPORT==== 26-Sep-2017::13:14:32 ===
connection <0.892.0> (127.0.0.1:55216 -> 127.0.0.1:5672): user 'guest' authenticated and granted access to vhost '/'

=ERROR REPORT==== 26-Sep-2017::13:19:02 ===
closing AMQP connection <0.892.0> (127.0.0.1:55216 -> 127.0.0.1:5672):
missed heartbeats from client, timeout: 90s

Can you please suggest what I'm doing wrong? The code for the consumer is the same as @pat1's.
code link

scherma commented Jan 26, 2018

I am also having this issue, and I don't think it has anything to do with a wait before message acknowledgement (receipt to ack is less than 1 second; however, the task after the ack does take 3-4 minutes).

My relevant functions are here and here. Using prefetch_count=1 makes no difference. Calling stop_consuming prior to running the task and start_consuming after makes no difference.

vitaly-krugl commented Feb 17, 2018

add_callback_threadsafe in pull request #956 might help with this. See this example
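
For later readers, a rough sketch of that pattern: run the slow job in a worker thread and schedule the ack back onto the connection's thread with add_callback_threadsafe, so the ioloop keeps servicing heartbeats and socket reads. Names such as work() are placeholders, and this assumes a pika version that provides add_callback_threadsafe:

import functools
import threading

def on_message(self, channel, basic_deliver, properties, body):
    # Hand the long-running job to a worker thread so the connection's
    # ioloop keeps running in the meantime.
    worker = threading.Thread(target=self._do_work,
                              args=(channel, basic_deliver.delivery_tag, body))
    worker.start()

def _do_work(self, channel, delivery_tag, body):
    work(body)  # placeholder for the actual processing
    # pika channels are not thread-safe, so schedule the ack to run on
    # the connection's own thread instead of calling basic_ack here.
    ack = functools.partial(channel.basic_ack, delivery_tag)
    self._connection.add_callback_threadsafe(ack)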
