
When server closes TCP connection because TCP window is full, connection recovery does not kick in #341

Closed
vincentjames501 opened this issue Jan 11, 2018 · 28 comments

@vincentjames501 commented Jan 11, 2018

Summary

We started seeing the following in production:

java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177)
	at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:559)
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:127)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:396)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:372)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:365)
	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1169)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436)
	at RMQProblemTest$1.handleDelivery(RMQProblemTest.java:87)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

After looking at a packet capture, we were seeing [TCP Window Full] messages when RabbitMQ was sending to our consumer followed by a ton of [TCP ZeroWindow] frames coming from the consumer. After enough of these, RabbitMQ abruptly closes the connection by sending a RST frame and reporting only this in the logs:

=ERROR REPORT==== 10-Jan-2018::20:38:08 ===
closing AMQP connection <0.5373.1> ([::1]:53814 -> [::1]:5672):
{writer,send_failed,{error,timeout}}

Further investigation revealed that I was setting the QoS for the consumer only AFTER I had already started consuming (i.e. I was calling basicConsume directly before basicQos). I realize that this was wrong of me; however, it seems odd that this would cause the exception above, especially with such a relatively small number of small messages. The more concerning thing is that none of the recovery/shutdown methods on either the connection or channel seemed to be called, despite the fact that the connection was indeed closed.
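For reference, the corrected ordering looks roughly like the sketch below; this is a minimal illustration rather than the exact test code, and the queue name and prefetch value are made up:

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

// Minimal sketch: set the prefetch *before* starting the consumer so the broker
// stops sending once the limit of unacknowledged deliveries is reached.
// "my-queue" and the prefetch of 10 are illustrative, not from the test project.
public class QosBeforeConsume {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setAutomaticRecoveryEnabled(true);

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.basicQos(10); // limit unacknowledged deliveries first

        channel.basicConsume("my-queue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // ... process the message (possibly slowly) ...
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```

With a prefetch in place the broker only ever has a bounded number of deliveries in flight, so the server-side TCP buffer cannot fill up the way described above.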

Reproduction

I made a little test project that shows the issue.

https://github.com/vincentjames501/rmq-fail/blob/master/src/test/java/RMQProblemTest.java

Running the test will show the issue after 30 seconds or so. It appears reproducible on most RabbitMQ versions, but I documented my setup below.

Environment

RabbitMQ server - 3.6.12
Erlang - Erlang 20.1
Operating system version (and distribution, if applicable) - OSX 10.13.2
All client libraries used - RabbitMQ Java Client 4.4.1
RabbitMQ plugins (if applicable) - Just Management Console

Thanks!

@michaelklishin (Member) commented Jan 11, 2018

I executed the provided test. It simulates a slow consumer, which eventually leads to a full TCP buffer on the server end that trips up socket writes; this reproduces even over localhost. Connection recovery does kick in, however: the "Consumed so far …" counter keeps growing:

Consumed so far - 0
Consumed so far - 0
Consumed so far - 0
Consumed so far - 0
Consumed so far - 1
Consumed so far - 1
Consumed so far - 2

[edited out for brevity]

Consumed so far - 11
Consumed so far - 11
java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177)
	at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:559)
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:127)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:396)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:372)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:365)
	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1169)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436)
	at RMQProblemTest$1.handleDelivery(RMQProblemTest.java:94)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Consumed so far - 11
Consumed so far - 12
Consumed so far - 12
java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177)
	at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:559)
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:127)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:396)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:372)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:365)
	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1169)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436)
	at RMQProblemTest$1.handleDelivery(RMQProblemTest.java:94)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Consumed so far - 12
Consumed so far - 13

[edited out for brevity]

Consumed so far - 30
Consumed so far - 31
Consumed so far - 31
java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177)
	at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:559)
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:127)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:396)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:372)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:365)
	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1169)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436)
	at RMQProblemTest$1.handleDelivery(RMQProblemTest.java:94)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

[edited out for brevity]

Consumed so far - 38
Consumed so far - 39
Consumed so far - 39
java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177)
	at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:559)
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:127)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:396)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:372)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:365)
	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1169)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436)
	at RMQProblemTest$1.handleDelivery(RMQProblemTest.java:94)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Consumed so far - 39

…

There's evidence of successful client reconnections in the broker log:

2018-01-11 11:20:03.889 [info] <0.761.0> accepting AMQP connection <0.761.0> ([fe80::1]:57965 -> [fe80::1]:5672)
2018-01-11 11:20:03.975 [error] <0.761.0> Error on AMQP connection <0.761.0> ([fe80::1]:57965 -> [fe80::1]:5672, state: starting):
PLAIN login refused: user 'guest' can only connect via localhost
2018-01-11 11:20:03.976 [info] <0.761.0> closing AMQP connection <0.761.0> ([fe80::1]:57965 -> [fe80::1]:5672)
2018-01-11 11:20:03.982 [info] <0.766.0> accepting AMQP connection <0.766.0> ([::1]:57966 -> [::1]:5672)
2018-01-11 11:20:04.012 [info] <0.766.0> connection <0.766.0> ([::1]:57966 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'
2018-01-11 11:20:04.035 [info] <0.778.0> accepting AMQP connection <0.778.0> ([fe80::1]:57967 -> [fe80::1]:5672)
2018-01-11 11:20:04.049 [error] <0.778.0> Error on AMQP connection <0.778.0> ([fe80::1]:57967 -> [fe80::1]:5672, state: starting):
PLAIN login refused: user 'guest' can only connect via localhost
2018-01-11 11:20:04.050 [info] <0.778.0> closing AMQP connection <0.778.0> ([fe80::1]:57967 -> [fe80::1]:5672)
2018-01-11 11:20:04.057 [info] <0.782.0> accepting AMQP connection <0.782.0> ([::1]:57968 -> [::1]:5672)
2018-01-11 11:20:04.063 [info] <0.782.0> connection <0.782.0> ([::1]:57968 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'
2018-01-11 11:20:36.379 [warning] <0.782.0> closing AMQP connection <0.782.0> ([::1]:57968 -> [::1]:5672):
{writer,send_failed,{error,timeout}}

@michaelklishin michaelklishin changed the title SocketException, TCP Window Full, Zero Window, and Recovery Recovery listeners do not seem to fire when TCP window is full Jan 11, 2018
@michaelklishin michaelklishin changed the title Recovery listeners do not seem to fire when TCP window is full Connection recovers but recovery listeners do not seem to fire when TCP window is full Jan 11, 2018
@vincentjames501 (Author)

Thanks @michaelklishin for looking into this! If you are seeing connection recoveries, then that is more than I'm seeing. The moment I see:

2018-01-11 07:09:24.930 [warning] <0.548.0> closing AMQP connection <0.548.0> (127.0.0.1:57760 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}

I see no further reconnects from the client (and your log appears to show the same?), and Wireshark seems to confirm this as well. I see two TCP connections established to RabbitMQ from the Java process: the producer connection and the consumer connection. Once the TCP buffer is full, RabbitMQ appears to close the connection. No matter how long I wait, none of my Shutdown or Recovery listeners fire, and both Wireshark and the RabbitMQ logs seem to indicate no further connections being established.

I haven't looked deeply into the Java client library, but I suspect the reason it keeps consuming messages is that they are simply buffered in memory and we're processing them very slowly (one every three seconds). If the underlying connection did recover, I'd expect the basicAcks to eventually start working again, but no matter how long you run the tests they never do.

@Stephan202

This is not my area of expertise, but I can say that we observe a similar issue from time to time, going back to the 4.x series of the client. Symptoms: (a) only happens when a lot of messages are sent and received (single instance) within a very short time span, (b) only happens when SSL is enabled and (c) recovery doesn't kick in. I'm sorry that I cannot be of more help; we cannot reproduce the issue on demand (which is why so far we haven't filed an issue) and I cannot even say for sure whether what we observe matches @vincentjames501's issue. But it does sound like it.

@michaelklishin (Member)

Socket write timeouts can be configured using the {send_timeout, Integer} option. They cannot be avoided entirely, but perhaps you can have fewer of them. It is also possible to increase the TCP buffer size. Connections will then use more RAM, but the probability of running into full TCP buffers and subsequent socket write timeouts will be lower.
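For reference, on the server side these options live under tcp_listen_options; a sketch in the classic rabbitmq.config format used by 3.6.x, with purely illustrative values:

```erlang
%% Illustrative values only: send_timeout is in milliseconds,
%% sndbuf/recbuf are socket buffer sizes in bytes.
[
  {rabbit, [
    {tcp_listen_options, [
      {send_timeout, 30000},
      {sndbuf,       196608},
      {recbuf,       196608}
    ]}
  ]}
].
```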

Our team does not use issues for investigations or discussions. This one is about listeners not firing even though recovery does happen with the provided test.

@acogoluegnes acogoluegnes added this to the 4.4.3 milestone Jan 11, 2018
@vincentjames501 (Author)

This one is about listeners not firing even though recovery does happen with the provided test.

@michaelklishin, are you positive about this? As I mentioned above, I'm not seeing this at all, and the log you posted seems to suggest the same. Any feedback on my comment #341 (comment)?

@michaelklishin (Member) commented Jan 11, 2018 via email

@michaelklishin (Member) commented Jan 11, 2018 via email

@michaelklishin (Member)

I re-ran the test; here are some more findings.


The test does not instruct the client to [recover the topology](http://www.rabbitmq.com/api-guide.html#recovery):

``` java
final ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
// factory.setTopologyRecoveryEnabled(true); is missing
factory.setHost("localhost");
```

So even after connection recovery, it would not get the further deliveries that @vincentjames501 expects.

I see different socket exception variations, e.g.:

2018-01-11 18:49:46.496 [warning] <0.809.0> closing AMQP connection <0.809.0> ([::1]:64100 -> [::1]:5672):
{inet_error,enotconn}

According to RabbitMQ's connection state (rabbit_networking:connections/0), there is one active connection to the node. netstat confirms this.

There are two things that I hadn't noticed the first time:

  • The test opens two connections.
  • The exception about a broken pipe comes from a Connection#close attempt in the tearDown method. This suggests the socket is in an interesting state where it cannot be used but the client doesn't seem to be aware of it as no main I/O loop exceptions were raised.

Without inspecting the entire capture and adding a certain amount of debug logging to the client I cannot tell why the client never gets any I/O exceptions on the path that kicks off connection recovery. Once we have more details and can confirm that it can be considered a bug in the library, a new issue will be filed.

Using manual acknowledgements with a limited prefetch avoids the fundamental issue of TCP buffers filling up, so that's the recommendation we have in the meantime.

Thanks again for the provided example.

@michaelklishin michaelklishin removed this from the 4.4.3 milestone Jan 11, 2018
@michaelklishin michaelklishin changed the title Connection recovers but recovery listeners do not seem to fire when TCP window is full When server closes TCP connection because TCP window is full, connection recovery does not kick in Jan 11, 2018
@michaelklishin (Member)

@Stephan202 if what you see only happens with TLS enabled, it could be related to this rabbitmq-users thread.

@vincentjames501 (Author)

factory.setTopologyRecoveryEnabled(true) is enabled by default, so adding it is redundant, no?

new ConnectionFactory().isAutomaticRecoveryEnabled(); => true
new ConnectionFactory().isTopologyRecoveryEnabled(); => true

I pushed some small changes to the test to make things easier. I now close the producer connection & channel after we're done and name the connections so they are easier to see and visualize. I also added the topology recovery option.

I don't know why this issue keeps getting closed. There is plenty of information here to suggest an issue with the client.

Here is a video showing that no reconnection attempts are made at all.

Video

@michaelklishin (Member) commented Jan 11, 2018

@vincentjames501 our team does not use GitHub issues for investigations. When we understand what is going on, a new issue will be filed. If you have new findings, feel free to start a rabbitmq-users thread.

@rabbitmq rabbitmq locked and limited conversation to collaborators Jan 11, 2018
@michaelklishin (Member)

For those who are affected: the workaround is to use manual acknowledgements with a prefetch.

@acogoluegnes acogoluegnes added this to the 4.4.3 milestone Jan 12, 2018
@acogoluegnes (Contributor) commented Jan 12, 2018

I imported the project but couldn't reproduce the issue. I got the following twice:

...
Consumed so far - 78
Consumed so far - 79
Consumed so far - 79
Consumed so far - 79

java.lang.AssertionError: We consume our messages within the specified time. We consumed 80 messages.

My environment:

  • RabbitMQ 3.6.12 and 3.7.0
  • Erlang 20.0.1
  • Java 1.8.0_152
  • Java Client 4.4.1
  • Ubuntu 14.04, 4.4.0-109-generic

@vincentjames501 (Author)

@acogoluegnes, FWIW I can't get it to happen when running RabbitMQ inside the official RabbitMQ Docker container. It is easily reproducible on an OSX machine running the official RabbitMQ binaries directly on the host (OSX 10.13.2); I forgot to note that in the original bug report. We DO see the exact same behavior in our AWS environment, however, so I don't think it is a Linux vs OSX thing either. I think the reason it doesn't reproduce in Docker for me is that it runs substantially slower with persistent messages there than when I run it natively, but it could be a lower-level networking difference as well (though that seems unlikely?).

@michaelklishin (Member)

I can reproduce it easily on a 4-year-old Mac. I suspect the rate of deliveries has to be sufficiently high for the TCP window to be exhausted completely.

@vincentjames501 (Author)

@acogoluegnes, you may be able to get it to happen by disabling persistent messages to increase the rate of delivery? Not entirely sure, but I created that test to be a boiled-down version of exactly what we are doing in production, so I didn't tinker with many more settings.

@acogoluegnes (Contributor)

Reproduced as-is on my 2015 MBP:

Consumed so far - 0
Consumed so far - 0
Consumed so far - 0
Consumed so far - 0
Consumed so far - 1
...
Consumed so far - 10
Consumed so far - 10
Consumed so far - 10
Consumed so far - 11
Consumed so far - 11
Problem acking message Broken pipe
Consumed so far - 11
Consumed so far - 12
Consumed so far - 12
Problem acking message Broken pipe
Consumed so far - 12
Consumed so far - 13
Consumed so far - 13
Problem acking message Broken pipe

Reproduced on Linux by setting the number of messages to 100 000:

Consumed so far - 0
Consumed so far - 0
Consumed so far - 0
Consumed so far - 0
Consumed so far - 1
Consumed so far - 1
Consumed so far - 1
Consumed so far - 9
Consumed so far - 9
Consumed so far - 9
Consumed so far - 10
Consumed so far - 10
Consumed so far - 10
Consumed so far - 11
Consumed so far - 11
Problem acking message Connection reset
Consumed so far - 11
Consumed so far - 12
Consumed so far - 12
Problem acking message Broken pipe (Write failed)
Consumed so far - 12
Consumed so far - 13
Consumed so far - 13
Problem acking message Broken pipe (Write failed)

@acogoluegnes (Contributor)

A thread dump reveals the reading thread is stuck waiting for a delivery to be put on the work queue. Unfortunately, this operation has no timeout. Needs more digging to know why the consumption of deliveries doesn't unblock the work queue.

@acogoluegnes (Contributor)

The consumption does unblock the work queue, but more deliveries keep coming in; they must be the remainder of the saturated TCP buffer. Automatic connection recovery doesn't kick in because it's triggered only from the reading side. The reading thread will finally notice something is wrong once it has finished reading the TCP buffer, so this can take a while.

There's not much we can do to detect that the connection is dead, considering this is a typical case of the broker overflowing a slow client, which is exactly what QoS was implemented for.

We can make the work queue capacity configurable (the default is 1000 per channel). Making it bigger wouldn't hurt in cases like this one. WDYT @vincentjames501 @michaelklishin?

@vincentjames501 (Author)

I haven't spent much time analyzing the internals of the library to give a thoughtful response; however, there must be some hook indicating that the underlying TCP connection has been disconnected, no? A packet capture clearly shows the connection is terminated, so shouldn't the consuming thread notice this and kick off a recovery (and stop delivering more work to the subscribers)?

Would you mind explaining (pardon my ignorance) why this is different from a typical connection failure? If I just terminate the connection on the RabbitMQ side, connection recovery initiates immediately even if there are unacked messages.

@michaelklishin (Member)

@vincentjames501 this feature has been around for years; no need to assume it doesn't handle the most obvious failure scenarios.

If we ignore NIO for a moment, every connection has an I/O loop thread which handles all exceptions and starts a shutdown procedure. Connections with automatic recovery enabled register a shutdown hook that kicks off the recovery.

When the TCP window is saturated, there is no I/O exception thrown by the JDK, even after the server detects missed heartbeats and closes the connection. If we knew why, this would likely have been addressed already.
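For anyone following along, this is roughly how such listeners are registered on an auto-recovering connection (a sketch; the "Recovery Started/finished" output later in this thread suggests the test does something similar, but this is not its exact code):

```java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;

// Sketch of shutdown/recovery listener registration. The point of this issue is
// that in the TCP-window-full case the shutdown path is never entered, so
// neither kind of listener ever fires.
public class ListenerSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setAutomaticRecoveryEnabled(true);
        Connection connection = factory.newConnection();

        // Runs when the connection's shutdown procedure starts.
        connection.addShutdownListener(cause ->
                System.out.println("Shutdown: " + cause.getReason()));

        // Connections created with automatic recovery enabled implement Recoverable.
        ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
            @Override
            public void handleRecoveryStarted(Recoverable recoverable) {
                System.out.println("Recovery started");
            }

            @Override
            public void handleRecovery(Recoverable recoverable) {
                System.out.println("Recovery finished");
            }
        });
    }
}
```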

@michaelklishin (Member)

@acogoluegnes seems to have a hypothesis, we are discussing it at the moment.

@michaelklishin (Member)

@acogoluegnes I think we should try making the interval lower, or rather, in line with the heartbeat mechanism. Having a consumer work queue timeout would be an improvement but it wouldn't address the root cause here unless I misunderstand the above explanation.

@michaelklishin (Member) commented Jan 31, 2018

This reminds me of a similar scenario that took us a long time to handle well in the server, see rabbitmq/rabbitmq-common#31. Socket implementations can behave differently when it comes to event/error reporting on sockets with saturated buffers (windows).
In the server's case we made the heartbeat monitor take more responsibility and force error condition detection in the connection process.

It should be possible to do something similar with JDK sockets, although it can be hacky as hell.

@acogoluegnes (Contributor)

@vincentjames501 Believe it or not, there's no hook in Java's Socket API to know that the TCP connection has been terminated.

As @michaelklishin suggested, we should be able to notify the reading thread that the connection is gone, either from the heartbeat sender or from any write operation that has failed due to a network problem. The only way is to interrupt the reading thread. Interrupting a thread in Java doesn't carry much information, but this should be enough to know when to trigger recovery.
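A minimal sketch of that idea (not the actual client internals; all class and method names below are made up for illustration):

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: a failed write interrupts the reader thread, which may otherwise stay
// blocked (e.g. enqueueing work for a slow consumer) and never see an I/O error,
// so that connection recovery can be triggered from the write side as well.
public class WriteFailureSketch {

    private final BlockingQueue<byte[]> workPool = new LinkedBlockingQueue<>(1000);
    private volatile Thread readerThread;

    // Reader loop: hands deliveries to the (bounded) consumer work pool.
    void readLoop() {
        readerThread = Thread.currentThread();
        try {
            while (true) {
                byte[] delivery = readFrameFromSocket(); // hypothetical blocking read
                workPool.put(delivery);                  // may block when consumers are slow
            }
        } catch (InterruptedException | IOException e) {
            triggerConnectionRecovery();                 // hypothetical recovery hook
        }
    }

    // Writer path: a failed write (e.g. broken pipe) wakes up the blocked reader.
    void write(byte[] frame) {
        try {
            writeFrameToSocket(frame);                   // hypothetical blocking write
        } catch (IOException e) {
            Thread reader = readerThread;
            if (reader != null) {
                reader.interrupt();
            }
        }
    }

    private byte[] readFrameFromSocket() throws IOException { return new byte[0]; }
    private void writeFrameToSocket(byte[] frame) throws IOException { }
    private void triggerConnectionRecovery() { }
}
```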

acogoluegnes added a commit that referenced this issue Feb 5, 2018
Detecting connection failure on reading can take a lot of time
(even forever if the reading thread is stuck), so connection
recovery can now be triggered when a write operation fails.
This can make the client more reactive to detect failing connections.

References #341
acogoluegnes added a commit that referenced this issue Feb 5, 2018
Test disabled for NIO: the IO thread (reading and writing) can be stuck in reading mode (work pool full), which also blocks writing (no heartbeat), so the connection failure is never detected.

References #341
@acogoluegnes acogoluegnes self-assigned this Feb 6, 2018
acogoluegnes added a commit that referenced this issue Feb 6, 2018
Now that recovery can be triggered from write operations, late connection failure discoveries can re-trigger the shutdown and emit spurious exceptions.

References #341
acogoluegnes added a commit that referenced this issue Feb 12, 2018
Automatic connection recovery now triggers by default when a write operation fails because of an IO exception. The recovery process takes place in a dedicated thread so the write operation doesn't wait (it receives the same IO exception immediately).

The test to trigger the error has changed: it doesn't use manual
ack anymore, as this could sometimes block the broker and make
recovery fail (broker was busy re-enqueuing messages). The test
now sends a message in the consumer, which is enough to reproduce
the error.

Note the test against NIO is skipped right now, as it needs
additional care.

[#154263515]

References #341
acogoluegnes added a commit that referenced this issue Feb 12, 2018
Making the work pool fail when it cannot enqueue work within a given time makes the client more responsive to broker overload. Note this usually happens to clients that do not set QoS properly. Nevertheless, making the client fail as early as possible can avoid hard-to-debug connection failures.

This complements the triggering of connection recovery on failed write operations. The work pool enqueueing timeout is useful for NIO, where the same thread is used for both reading and writing (if the thread is stuck waiting on work pool enqueueing, no write operation can occur, and the TCP connection failure is never detected).

[#154263515]

Fixes #341
@michaelklishin (Member)

With a version I'm testing (#349), I observe a successful recovery:

2018-02-14 17:20:11.863 [info] <0.1005.0> accepting AMQP connection <0.1005.0> ([::1]:64548 -> [::1]:5672)
2018-02-14 17:20:12.085 [info] <0.1005.0> Connection <0.1005.0> ([::1]:64548 -> [::1]:5672) has a client-provided name: Producer Connection
2018-02-14 17:20:12.096 [info] <0.1005.0> connection <0.1005.0> ([::1]:64548 -> [::1]:5672 - Producer Connection): user 'guest' authenticated and granted access to vhost '/'
2018-02-14 17:20:12.112 [info] <0.1018.0> accepting AMQP connection <0.1018.0> ([::1]:64549 -> [::1]:5672)
2018-02-14 17:20:12.114 [info] <0.1018.0> Connection <0.1018.0> ([::1]:64549 -> [::1]:5672) has a client-provided name: Consuming Connection
2018-02-14 17:20:12.116 [info] <0.1018.0> connection <0.1018.0> ([::1]:64549 -> [::1]:5672 - Consuming Connection): user 'guest' authenticated and granted access to vhost '/'
2018-02-14 17:20:13.927 [info] <0.1005.0> closing AMQP connection <0.1005.0> ([::1]:64548 -> [::1]:5672 - Producer Connection, vhost: '/', user: 'guest')
2018-02-14 17:20:43.553 [warning] <0.1018.0> closing AMQP connection <0.1018.0> ([::1]:64549 -> [::1]:5672 - Consuming Connection):
{writer,send_failed,{error,timeout}}
2018-02-14 17:21:11.298 [info] <0.1067.0> accepting AMQP connection <0.1067.0> ([::1]:64563 -> [::1]:5672)
2018-02-14 17:21:11.303 [info] <0.1067.0> Connection <0.1067.0> ([::1]:64563 -> [::1]:5672) has a client-provided name: Consuming Connection
2018-02-14 17:21:11.305 [info] <0.1067.0> connection <0.1067.0> ([::1]:64563 -> [::1]:5672 - Consuming Connection): user 'guest' authenticated and granted access to vhost '/'
Consumed so far - 18
Consumed so far - 19
Recovery Started for Consuming Connection amqp://guest@0:0:0:0:0:0:0:1:5672/
Recovery Started for Consuming Channel AMQChannel(amqp://guest@0:0:0:0:0:0:0:1:5672/,1)
Recovery finished for Consuming Channel AMQChannel(amqp://guest@0:0:0:0:0:0:0:1:5672/,1)
Recovery finished for Consuming Connection amqp://guest@0:0:0:0:0:0:0:1:5672/
Consumed so far - 19
Consumed so far - 19
Consumed so far - 20
Consumed so far - 20

acogoluegnes added a commit that referenced this issue Feb 15, 2018
This way connection recovery triggering on write can be
disabled or customised.

[#154263515]

References #341
acogoluegnes added a commit that referenced this issue Feb 15, 2018
acogoluegnes added a commit that referenced this issue Feb 19, 2018
[#154263515]

References #341
acogoluegnes added a commit that referenced this issue Feb 20, 2018