Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

What to do on Error: Unexpected close #151

Closed
grahamjenson opened this issue May 1, 2015 · 30 comments
Closed

What to do on Error: Unexpected close #151

grahamjenson opened this issue May 1, 2015 · 30 comments

Comments

@grahamjenson
Copy link

A bug occurs when an "Error: Unexpected close" happens:

  1. publish multiple messages on a channel to a queue
  2. an "Error: Unexpected close" happens
  3. some of the messages never make it to rabbitmq (verified using tracer plugin)
  4. the error is emitted in the on('error', ...) handler

The messages are added before the error occurs so there is no way to handle it while writing the messages.

I think this happens because the stream/socket buffer is not empty, and when the stream dies the messages are lost with no feedback to the user.

Is there any way to find out what messages did not make it onto the queue, so that once reconnected they can be re-published?

@michaelklishin
Copy link

Please post RabbitMQ log files and a script that you use to reproduce.

@grahamjenson
Copy link
Author

It is difficult to recreate an unexpected closure of the socket and have the script not blow up, but here is the best I have got on a friday evening:

amqp = require("amqplib")
bb = require 'bluebird'
qn = 'doesntmatter'

amqp.connect('amqp://localhost')
.then( (connection) -> 
  connection.createChannel()
)
.then( (channel) ->

  channel.assertQueue(qn)
  .then( ->
    #put 50 messages on the queue 1ms apart
    for i in [1...1000]
      #send a heap of messages
      published = channel.publish('', qn, new Buffer("This"), {})
      if !published
        throw "NOT EVEN ONCE"
    console.log "finished writing messages"
    #the assumption at this point is that all messages have been published to rabbit, as they all returned true

    #after 10ms artifically create unexpected close error
    bb.delay(10).then( -> channel.connection.stream.end())
  )
)

If we call this script (sorry for the coffeescript) then we get:

finished writing messages
events.js:85
      throw er; // Unhandled 'error' event
            ^
Error: Unexpected close
  at succeed (/Users/.../node_modules/amqplib/lib/connection.js:265:13)
  at onOpenOk (/Users/.../node_modules/amqplib/lib/connection.js:247:5)
  at .../node_modules/amqplib/lib/connection.js:160:32
  at .../node_modules/amqplib/lib/connection.js:154:12
  at Socket.recv (/Users/.../node_modules/amqplib/lib/connection.js:486:12)
  at Socket.g (events.js:199:16)
  at Socket.emit (events.js:104:17)
  at emitReadable_ (_stream_readable.js:424:10)
  at emitReadable (_stream_readable.js:418:7)
  at readableAddChunk (_stream_readable.js:174:11)
  at Socket.Readable.push (_stream_readable.js:126:10)
  at TCP.onread (net.js:538:20)

Then we call this script:

amqp = require("amqplib")
bb = require 'bluebird'
qn = 'doesntmatter'

#util function for counting messages on a queue
amqp.connect('amqp://localhost')
.then( (connection) -> 
  connection.createChannel()
)
.then( (channel) ->
  channel.checkQueue(qn)
)
.then( (ret) ->
  console.log "#{ret.messageCount} messages"
)

we get:

883 messages

So the core issue is that somewhere 100 messages are in a buffer or somewhere. I have written auto-reconnect code that can handle the Unexpected close and reopen the amqp connection, but without being able to verify that the message was not received by rabbit, I cannot try an resend it.

The core problem is that when I call the publish method I cannot guarantee that rabbit has actually got the message. I am looking into confirmation queues at the moment. Would this be the best idea?

@michaelklishin
Copy link

What is in RabbitMQ logs?

Given that it happens when or around the time connection.open-ok is received, it is unlikely to be an authentication failure, for example. But only server log can really confirm or deny such hypotheses.

@grahamjenson
Copy link
Author

I ran the scripts again and got the same results, the trace shows only receiving about 800 messages before the connection ends. Is there a way in amqp.node to be notified that a message was not sent?

The rabbitmq log

 * rabbitmq_management_visualiser
 * rabbitmq_management
 * rabbitmq_web_dispatch
 * webmachine
 * mochiweb
 * rabbitmq_mqtt
 * rabbitmq_stomp
 * rabbitmq_management_agent
 * rabbitmq_amqp1_0
 * amqp_client

=INFO REPORT==== 4-May-2015::08:03:44 ===
Enabling tracing for vhost '/'

=INFO REPORT==== 4-May-2015::08:03:44 ===
Tracer opened log file "/var/tmp/rabbitmq-tracing/dropped_messages.log" with format text

=INFO REPORT==== 4-May-2015::08:04:54 ===
accepting AMQP connection <0.1377.0> (127.0.0.1:52046 -> 127.0.0.1:5672)

=WARNING REPORT==== 4-May-2015::08:04:54 ===
closing AMQP connection <0.1377.0> (127.0.0.1:52046 -> 127.0.0.1:5672):
connection_closed_abruptly

The rabbitmq trace

================================================================================
2015-05-04 8:04:54:231: Message published

Node:         rabbit@localhost
Connection:   127.0.0.1:52046 -> 127.0.0.1:5672
Virtual host: /
User:         guest
Channel:      1
Exchange:     
Routing keys: [<<"thisdoesntmatter">>]
Properties:   [{<<"headers">>,table,[]}]
Payload: 
This

===============================================================================

This x803 times

@squaremo
Copy link
Collaborator

squaremo commented May 4, 2015

I think this happens because the stream/socket buffer is not empty, and when the stream dies the messages are lost with no feedback to the user.

Absolutely: just like regular I/O, messages are buffered, and if they can't be written to the connection, they have to be thrown away.

You can use a conn.createConfirmChannel to get versions of publish and sendToQueue that take callbacks. Note that this doesn't make it any more likely that the messages make it to a queue, just that you'll get told when they have.

@grahamjenson
Copy link
Author

Is there a way to get feedback on which messages make it to the queue, or which ones get thrown away so that my application code can retry posting them?

@michaelklishin
Copy link

Publisher confirms.

@grahamjenson
Copy link
Author

confirmationChannels have a similar issue to above where if the connection dies just at the right time then the confirmation of delivery is lost and if retried the message is sent twice. Given I am writing an "at-most-once' kind of system, I was hoping for a cheap client approach to finding most the failed messages.

@squaremo
Copy link
Collaborator

squaremo commented May 4, 2015

Given I am writing an "at-most-once' kind of system, I was hoping for a cheap client approach to finding most the failed messages.

In principle the client library could tell you when it writes a message to the socket buffer, but that's the best it can do I think. There's no guarantee that written to the socket means received by the broker (or even transmitted to the broker), of course.

But you want to know which messages were definitely not sent. How would you expect (or imagine) this to be represented in the API?

@grahamjenson
Copy link
Author

Could the publish method return a promise that is rejected if the message cannot be written to the socket, and resolved once it is written? This way the client can retry sending the messages that amqp.node knows were never sent.

Then the only problem occurs when a message is written to the socket but never made it to rabbit. This would hopefully reduce the number of non-sent messages.

@nini-os
Copy link

nini-os commented May 5, 2015

I have a similar or maybe the same problem,

_channel.publish(EXCHANGE_NAME, '', buffer, {}, function (err) {
    if (err !== null) {
      log('Noack for message ', message);
    } else {
      log('message published', message);
    }
    callback(err, message);
  });

if the channel is closed (because of a connection problem) then publish does not return callbacks for unacknowledged message.

@grahamjenson
Copy link
Author

@chiarishow: the callback on a confirmation channel seems to maybe fire if it is close to a connection problem.
This is because if after you publish the message the connection dies, then rabbit is unable to tell you that it successfully published, but it did. vs if the connection dies while the message is being sent then the message was not published and you received no callback.

At this point either you try send the message again if you are a "send-at-least-once" system, or don't if you are a 'send-at-most-once' system.

This issue is that if the library knows that the message was NOT sent, to somehow tell the client, instead of telling the client only if it knows the message WAS sent.

@nini-os
Copy link

nini-os commented May 5, 2015

Yeah, it will be very useful to know if the message was not sent.

@nini-os
Copy link

nini-os commented May 6, 2015

I put up a probably horrible solution for my problem, I modified the C.toClosed function of channel.js and I fired the unacknowledged callback when the channel is closed.

C.toClosed = function(capturedStack) {
  this._rejectPending();
  invalidateSend(this, 'Channel closed', capturedStack);
  this.accept = invalidOp('Channel closed', capturedStack);
  this.connection.releaseChannel(this.ch);
  this.unconfirmed.forEach(function sendFail(callbackUnconfirmed) {
        if (callbackUnconfirmed !== null) {
                callbackUnconfirmed(new Error("Channel closed, no ack received"));
        }
  });
  this.emit('close');
};

Of course, this is not a perfect solution because, as grahamjenson said, the connection can die while the producer is waiting for the ack (but it's a rare case). I don't know if there's a better way to know if the message was not sent.

@squaremo
Copy link
Collaborator

squaremo commented May 6, 2015

@chiarishow That is not too bad, although usually you would be doing your own bookkeeping, and therefore now that if the callback isn't called, the message wasn't acknowledged.

There is still the case of messages that could not have been acked because they were never transmitted to RabbitMQ.

@grahamjenson:

Could the publish method return a promise that is rejected if the message cannot be written to the socket, and resolved once it is written?

It could, but that's third in line for a publish return value -- the first, and incumbent, is the flow control flag ("you can keep writing"). Second in line is the promise representing an acknowledgement.

Maybe such a callback could be a publish option; a bit icky, but not without some precedent, and I struggle to think of where else to put it other than in another argument.

@nini-os
Copy link

nini-os commented May 7, 2015

@squaremo well you are right but usually callback are always called. So in order to understand if a message was acknowledged or not I must listen to the channel "close" event and this is really tricky...

Thank you anyway

@grahamjenson
Copy link
Author

@squaremo why not use the call back given to the publish method for more than just confirmChannels, e.g. the err parameter could be used to differ between nack and failed to send.

so when you call publish(... function(err,ok){} the callback can be called multiple times, with different types of errors but only once if it is ok. If the channel is a confirm channel the callback ok is called when the server acks, and if it is not the callback ok is called when it is put on the socket. The call back error is called when the server nacks (if you are a confirm channel) or the message is unable to be put on the socket?

This would not break the current API, and give the option to listen for when messages fail?

@nini-os
Copy link

nini-os commented May 7, 2015

I totally agree with @grahamjenson solution

@squaremo
Copy link
Collaborator

@grahamjenson @chiarishow Yes, I think it is a simple and good idea.

Just to run this thought by you: it may be surprising to current users if they start getting what look like nacked messages but are in fact untransmitted messages. Nacks are supposed to be quite rare and indicate a nasty failure, so some people may take drastic measures when they receive them.

So: instead of using the error type to distinguish between the failure cases, what about an extra argument to the callback?

confirmChan.publish(new Buffer(), {}, function(nack, err, unsent) {
  // ...
});

A normal channel might take the same callback, with the expectation that nack will never be given a value. Alternatively, maybe the third argument could be "sent"? (It's a bit of a shame they end up in the "wrong order". Oh well.)

@nini-os
Copy link

nini-os commented May 12, 2015

Well it's a good solution too!

@grahamjenson
Copy link
Author

If it doesn't break the existing API and solves the problem I am happy :)

@imran-uk
Copy link

imran-uk commented Jun 29, 2017

I think I have a similar issue but in my case I am reading from a message queue rather than publishing to it:

events.js:160
      throw er; // Unhandled 'error' event
      ^

Error: Unexpected close
    at succeed (/opt/nodeapp/node_modules/amqplib/lib/connection.js:270:13)
    at onOpenOk (/opt/nodeapp/node_modules/amqplib/lib/connection.js:252:5)
    at /opt/nodeapp/node_modules/amqplib/lib/connection.js:165:32
    at /opt/nodeapp/node_modules/amqplib/lib/connection.js:159:12
    at Socket.recv (/opt/nodeapp/node_modules/amqplib/lib/connection.js:497:12)
    at Socket.g (events.js:292:16)
    at emitNone (events.js:86:13)
    at Socket.emit (events.js:185:7)
    at emitReadable_ (_stream_readable.js:432:10)
    at emitReadable (_stream_readable.js:426:7)
    at readableAddChunk (_stream_readable.js:187:13)
    at Socket.Readable.push (_stream_readable.js:134:10)
    at TCP.onread (net.js:551:20)

Reading and understanding this whole thread will take a while but I will try - otherwise any insights/comments would be useful. This is the first time I have seen this error with this code.

I am using "amqplib": "0.5.1", and "node": ">=7.4.0"

@vietlv
Copy link

vietlv commented Sep 19, 2017

Help me !

events.js:182
throw er; // Unhandled 'error' event
^

Error: Unexpected close
at succeed (E:\VNPT\2017\DCManager\SmartAds\node_modules\amqplib\lib\connection.js:270:13)
at onOpenOk (E:\VNPT\2017\DCManager\SmartAds\node_modules\amqplib\lib\connection.js:252:5)
at E:\VNPT\2017\DCManager\SmartAds\node_modules\amqplib\lib\connection.js:165:32
at E:\VNPT\2017\DCManager\SmartAds\node_modules\amqplib\lib\connection.js:159:12
at Socket.recv (E:\VNPT\2017\DCManager\SmartAds\node_modules\amqplib\lib\connection.js:497:12)
at Object.onceWrapper (events.js:314:30)
at emitNone (events.js:105:13)
at Socket.emit (events.js:207:7)
at emitReadable_ (_stream_readable.js:502:10)
at emitReadable (_stream_readable.js:496:7)
at addChunk (_stream_readable.js:263:7)
at readableAddChunk (_stream_readable.js:239:11)
at Socket.Readable.push (_stream_readable.js:197:10)

@boshido
Copy link

boshido commented Sep 21, 2017

I'm facing the same issue like @vietlv and @imran-uk faced

I'm using node v6.11.2 and amqplib v0.5.1

@appableDev
Copy link

same with me. Please help!!!!
development APP [9:32 AM]
Error: Unexpected close
at succeed (/usr/src/app/node_modules/amqplib/lib/connection.js:270:13)
at onOpenOk (/usr/src/app/node_modules/amqplib/lib/connection.js:252:5)
at /usr/src/app/node_modules/amqplib/lib/connection.js:165:32
at /usr/src/app/node_modules/amqplib/lib/connection.js:159:12
at Socket.recv (/usr/src/app/node_modules/amqplib/lib/connection.js:497:12)
at Socket.g (events.js:292:16)
at emitNone (events.js:86:13)
at Socket.emit (events.js:185:7)
at emitReadable_ (_stream_readable.js:432:10)
at emitReadable (_stream_readable.js:426:7)
at readableAddChunk (_stream_readable.js:187:13)
at Socket.Readable.push (_stream_readable.js:134:10)
at TCP.onread (net.js:547:20)

node v6.11 and amqplib v0.5.1

@boshido
Copy link

boshido commented Sep 27, 2017

@imran-uk @vietlv @appableDev
My problem come from haproxy close the connection when consumer idle for 50-60 seconds
I tried to connect directly to RabbitMQ, so it's fine.

After I adjust haproxy timeout config refer
and add heartbeat to connection string, everything seem to be working fine

@michaelklishin
Copy link

There is no universal advice on what what to do when a TCP connection to RabbitMQ or intermediary is closed. Best piece of advice is: understand why it happened the best you can first.
I had to cover some common scenarios in a similar thread for the PHP client. Please take a look, grab RabbitMQ and (if possible) proxy/load balancer logs and see which scenario is most likely apply to your case.

@squaremo there are no specific improvements suggested in this thread and the underlying reasons are plenty. I think this should be closed as a question.

@michaelklishin
Copy link

Actually, I'm going to post an edited and expanded copy of that response since many never follow any links and read all comments unless it's a wall of text you cannot possibly miss.

What Do I Do When Client Connection to RabbitMQ is Interrupted?

This thread seems to be a honeypot for all socket operation (in particular read) failures and many
are looking for a workaround without understanding what is really going on. Below are some common scenarios that may lead to failed socket reads or writes in this or any other RabbitMQ client, compiled from from several years of RabbitMQ mailing list discussions. Please investigate if each of them may be applicable to your case first,
before filing a new issue or leaving a comment saying that "I too see a stack trace that looks like this".

Missed (Client) Heartbeats

First common reason is missed heartbeats detected by RabbitMQ. When this happens, RabbitMQ will add a log entry about it and then close connection, per specification requirements.
Here's what a missed client heartbeat looks like in RabbitMQ logs:

2017-09-26 08:04:53.596 [warning] <0.2375.628> closing AMQP connection <0.2375.628> (127.0.0.1:54720 -> 127.0.0.1:5672):
missed heartbeats from client, timeout: 8s

With clients where I/O operations are not concurrent to consumer operations (I believe this is the case here), if consumer operations take longer than the heartbeat timeout, RabbitMQ will detect missed client heartbeats. Disabling heartbeats may
help in this particular case but is not something I'd recommend. If you disable heartbeats,
make sure your kernel is configured to use sensible TCP keepalive settings, which by default is not the case on all popular
Linux distributions :(

An Intermediary Closes "Inactive" TCP Connections

Second common reason: TCP connection is closed by an intermediary (e.g. a proxy or load balancer).
If you see the following in RabbitMQ log

2017-09-26 08:08:39.659 [warning] <0.23042.628> closing AMQP connection <0.23042.628> (127.0.0.1:54792 -> 127.0.0.1:5672, vhost: '/', user: 'guest'):
client unexpectedly closed TCP connection

it means that client's TCP connection was closed before AMQP 0-9-1 (this client's) connection was. Sometimes this is harmless and means that apps do not close connections before they terminate. Not very nice but has no functional downsides. Such log entries could also indicate a failing client application process (irrelevant in this thread), or, quite commonly, a proxy closing the connection. Proxies and load balancers have TCP connection inactivity timeouts, mentioned in the Heartbeats guide). They often range from 30s to 5m.
While heartbeats and TCP keepalives were not designed to work around unfortunate load balancer or proxy settings, by producing periodic network traffic they happen to do just that.

Other Connection Lifecycle Log Entries

Below entries are not necessarily related to failed socket writes but it's worth explaining them
because they are very useful in troubleshooting connectivity-related issues (and can be
misinterpreted).

If you see just the following in RabbitMQ logs:

closing AMQP connection <0.13219.456> (153.x.x.x:56468 -> 185.x.x.x:5672)

(without any mentions or heartbeats, unexpectedly closed TCP connection, connection errors, or any timed out socket writes on RabbitMQ's end), it means that a client connection was cleanly and successfully closed, and it was the application that initiated it.

This guy

{writer,send_failed,{error,timeout}}

means that RabbitMQ attempted to write to a socket but that operation timed out. If you see this
at roughly the same time as a client write failure, it means that a connection went bad but
heartbeats or TCP keepalives were not enabled and did not detect it quicker.

Other Possible Reasons? TCP Connections Can Fail.

In most other cases, a failed socket write is just that, a failed socket write. Network connections can fail or degrade. This client or RabbitMQ cannot avoid that. This is exactly why messaging protocols such as AMQP 0-9-1, STOMP, MQTT have introduced heartbeats, which in this client doesn't serve its purpose very well.

An Alternative with Unfortunate Up Defaults: TCP Keepalives

TCP keepalives can be (and were meant to be, if it wasn't for Linux defaults that were great in 1990s but not any more) used as an alternative.
They do not guarantee that your connection will never fail — the goal is still to detect such connections quicker — and thus you will still likely see failed socket writes.

Connection Recovery

Automatic connection recovery is a feature several RabbitMQ clients have supported for years, e.g. Java client and Bunny. This is considered to be a solved problem by Team RabbitMQ,
at least as far as the general approach and the order of recovery steps go when recovering
the topology. Consider following the steps outlined in the docs of those clients instead of
reinventing your own. Perhaps automatic connection recovery is something this client should do,
or at least explicitly state that it doesn't by design and make it easy to do so in the application code,
which is the path chosen by the Go client author.

Hopefully this explains what may be going on here and why this issue cannot be once and for all addressed by this client, although a functioning concurrent heartbeat implementation would help a lot, as it does in other clients.

@mayankmike86
Copy link

mayankmike86 commented Nov 9, 2020

Same issue for me all of a sudden its causing server restarts..Although I have guarded with .on(error.. check at all places as per the documentation.

NOTE: Is it right to throw err on error event, as in documentation I can see it is just always logged and not thrown back to callee method...is this I am doing wrong?

 broker.on('error', (err) => {
            logger.error('Error occured during broker creation..');
            throw err;
        });
2020-11-09T09:48:38.713648848Z 2020-11-09T09:48:38.71 <ERROR> Error occured during broker creation.. (in rabbitmq-client.js:28)
2020-11-09T09:48:38.713717495Z Call Stack:
2020-11-09T09:48:38.713730503Z     at Vhost.<anonymous> (/xxxxxx/xxxxxx/xxxx/src/data_access/rabbitmq-client.js:28:20)
2020-11-09T09:48:38.713752936Z     at Vhost.emit (events.js:311:20)
2020-11-09T09:48:38.713762558Z     at Vhost.EventEmitter.emit (domain.js:482:12)
2020-11-09T09:48:38.713770793Z     at handleConnectionError (/usr/src/app/node_modules/rascal/lib/amqp/Vhost.js:364:10)
2020-11-09T09:48:38.713781776Z     at CallbackModel.<anonymous> (/usr/src/app/node_modules/lodash/lodash.js:10076:25)
2020-11-09T09:48:38.713844703Z     at Object.onceWrapper (events.js:418:26)
2020-11-09T09:48:38.713854975Z     at CallbackModel.emit (events.js:311:20)
2020-11-09T09:48:38.713862712Z     at CallbackModel.EventEmitter.emit (domain.js:482:12)
2020-11-09T09:48:38.713871696Z     at Connection.emit (events.js:311:20)
2020-11-09T09:48:38.713879162Z     at Connection.EventEmitter.emit (domain.js:482:12)
2020-11-09T09:48:38.713887140Z     at Connection.C.onSocketError (/usr/src/app/node_modules/amqplib/lib/connection.js:353:10)
2020-11-09T09:48:38.713895297Z     at TLSSocket.emit (events.js:323:22)
2020-11-09T09:48:38.713904148Z     at TLSSocket.EventEmitter.emit (domain.js:482:12)
2020-11-09T09:48:38.713912097Z     at endReadableNT (_stream_readable.js:1204:12)
2020-11-09T09:48:38.713920059Z     at processTicksAndRejections (internal/process/task_queues.js:84:21)
2020-11-09T09:48:38.715639414Z xxxxxx/xxx/xxx/xx/src/data_access/rabbitmq-client.js:29
2020-11-09T09:48:38.715666633Z             throw err;
2020-11-09T09:48:38.715728936Z             ^
2020-11-09T09:48:38.715737512Z 
2020-11-09T09:48:38.715746423Z Error: Unexpected close
2020-11-09T09:48:38.715753795Z     at succeed (/usr/src/app/node_modules/amqplib/lib/connection.js:272:13)
2020-11-09T09:48:38.715763604Z     at onOpenOk (/usr/src/app/node_modules/amqplib/lib/connection.js:254:5)
2020-11-09T09:48:38.715771744Z     at /usr/src/app/node_modules/amqplib/lib/connection.js:166:32
2020-11-09T09:48:38.715780274Z     at /usr/src/app/node_modules/amqplib/lib/connection.js:160:12
2020-11-09T09:48:38.715787709Z     at TLSSocket.recv (/usr/src/app/node_modules/amqplib/lib/connection.js:499:12)
2020-11-09T09:48:38.715839122Z     at Object.onceWrapper (events.js:417:28)
2020-11-09T09:48:38.715851042Z     at TLSSocket.emit (events.js:311:20)
2020-11-09T09:48:38.715860289Z     at TLSSocket.EventEmitter.emit (domain.js:482:12)
2020-11-09T09:48:38.715868645Z     at emitReadable_ (_stream_readable.js:562:12)
2020-11-09T09:48:38.715886918Z     at processTicksAndRejections (internal/process/task_queues.js:83:21)

Please help!

@cressie176
Copy link
Collaborator

TL;DR

Q1. What to do on unexpected close?
A. Read this and implement your own recovery logic.

Q2. How know which messages were sent and which were not?
A. Use publisher confirms to learn which messages reached the broker. Assume that any unconfirmed messages did not. You will always have to handle the potential of duplicates, since some may have reached the broker, but the acknowledgement may have been lost when the connection closed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

10 participants