producer.send() does not reconnect to broker when receiving an ETIMEDOUT error #919

Open
caseycrites opened this issue Oct 14, 2020 · 5 comments


caseycrites commented Oct 14, 2020

Describe the bug
A lambda I'm running will occasionally receive an ETIMEDOUT error when producing messages to Kafka. When this happens, kafkajs disconnects from the broker and never reconnects, but will continue to retry sending the messages.

To Reproduce

Unfortunately I cannot link to sample code or give you a way to reproduce this reliably (maybe someone reading this can guide me toward a way of forcing an ETIMEDOUT error?), but I will explain our setup with as much code as I can. We're not doing anything too strange here, just setting it up and producing messages.

We configure the kafka object with the following:

{
  clientId: "example",
  brokers: ["bootstrapbroker.cloud:9092"],
  enforceRequestTimeout: true,
  requestTimeout: 2000,
  ssl: true,
  sasl: {
    mechanism: "plain",
    username: "username",
    password: "password"
  }
}

and then the producer, nothing wild here:

const producer = kafka.producer({ createPartitioner: Partitioners.JavaCompatiblePartitioner });
await producer.connect();
// `messages` is the array of messages to produce
await producer.send({ topic: "topic", messages });

Expected behavior

I expect this to disconnect from the broker, try to reconnect to the broker, and if it successfully does that, start retrying the messages.

Observed behavior

tl;dr the timeout error happens, kafkajs disconnects from the broker, messages are retried without reconnecting to the broker.

This code is running in an AWS Lambda and happily chugs along doing what it should for a while, then we receive an ETIMEDOUT error while producing. The log looks like the following:

{
	"level": "ERROR",
	"timestamp": "2020-10-11T19:19:51.698Z",
	"logger": "kafkajs",
	"message": "[Connection] Connection error: write ETIMEDOUT",
	"broker": "broker.cloud:9092",
	"clientId": "example",
	"stack": "Error: write ETIMEDOUT\n    at WriteWrap.onWriteComplete [as oncomplete] (internal/stream_base_commons.js:92:16)\n    at handleWriteReq (internal/stream_base_commons.js:51:26)\n    at writeGeneric (internal/stream_base_commons.js:143:15)\n    at TLSSocket.Socket._writeGeneric (net.js:786:11)\n    at TLSSocket.Socket._write (net.js:798:8)\n    at doWrite (_stream_writable.js:403:12)\n    at writeOrBuffer (_stream_writable.js:387:5)\n    at TLSSocket.Writable.write (_stream_writable.js:318:11)\n    at Object.sendRequest (/var/task/node_modules/kafkajs/src/network/connection.js:312:27)\n    at SocketRequest.send [as sendRequest] (/var/task/node_modules/kafkajs/src/network/requestQueue/index.js:135:23)"
}

after this, with debug logging turned on, we see the following logs:

  • [Connection] disconnecting...
  • [RequestQueue] Waiting for pending requests
  • [Connection] Kafka server has closed connection
  • [Connection] disconnecting...
  • [Connection] disconnected
  • [Connection] Request Metadata(key: 3, version: 5)
  • [Connection] Request Metadata(key: 3, version: 5)
  • [Connection] Request Metadata(key: 3, version: 5)

(presumably the 3 last lines are one request to each of the 3 brokers) and then the lambda times out.
EDIT: the 3 last lines are actually just 3 retries to 1 broker, and they happen a few seconds apart from each other

Environment:

  • OS: AWS Lambda, nodejs12.x runtime
  • KafkaJS version 1.12.0
  • Kafka version 2.5
  • NodeJS version v12

Additional context
I have dug around in the code quite a bit trying to understand what's going on here and these are my findings, unclear whether they're actually useful, but here they are.

There are retriers at several levels of the stack: I found them in connection.js (although this one seems to be unused?), cluster/index.js, messageProducer.js, producer/index.js and sendMessages.js. The retrier in messageProducer.js is not created with createRetry and includes code to actually reconnect to the broker if it's disconnected (great!), but afaict the retrier in sendMessages.js catches this error first and retries the messages without attempting a reconnect to the cluster. Honestly though, any of the retriers further down the stack other than the one in sendMessages.js could be what's catching it, since they all use the generic retrier from createRetry.
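To make the suspected interaction concrete, here is a minimal, self-contained model of two nested retriers. It is not KafkaJS code: `retry`, `simulate`, and the `connection` object are hypothetical stand-ins, and the real retriers are asynchronous and back off between attempts. It only shows how an inner retrier that knows nothing about reconnecting can burn all of its attempts on a dead connection before an outer, reconnect-aware retrier gets a turn.

```javascript
// Generic synchronous retrier, standing in for a createRetry-style helper.
function retry(name, retries, fn, log) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return fn();
    } catch (err) {
      lastError = err;
      log.push(`${name}#${attempt}: ${err.message}`);
    }
  }
  throw lastError;
}

function simulate() {
  const log = [];
  const connection = { connected: true, failNextWrite: true };

  // One write fails with ETIMEDOUT and tears the connection down;
  // every later write fails until something reconnects.
  const write = () => {
    if (!connection.connected) throw new Error("not connected");
    if (connection.failNextWrite) {
      connection.failNextWrite = false;
      connection.connected = false;
      throw new Error("write ETIMEDOUT");
    }
    return "ok";
  };

  // Inner retrier: retries the write but never reconnects.
  const inner = () => retry("inner", 2, write, log);

  // Outer retrier: reconnects first, but only runs again once inner gives up.
  const outer = () =>
    retry("outer", 1, () => {
      if (!connection.connected) {
        connection.connected = true;
        log.push("outer: reconnected");
      }
      return inner();
    }, log);

  return { result: outer(), log };
}

console.log(simulate().log);
```

In this model the inner retrier fails three times before the outer one reconnects. If the real inner retries are a few seconds apart, a short-lived Lambda could plausibly time out before the reconnecting layer ever runs, which would match the logs above.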

Also, in my attempts to work around what I saw as the problem, I added a disconnect listener to my code that listens for disconnects and tries to reinitiate the connection, but the DISCONNECT event never seems to fire (based on logging to producer.logger().info() I set up in the listener) when the ETIMEDOUT error happens, so the listener does not get a chance to do its job. I have tests in my codebase to trigger the DISCONNECT event and this code does what I'd expect it to in those tests. Code for the listener follows:

private listenForEvents() {
  if (this._producer) {
    const { DISCONNECT } = this._producer.events;
    this._producer.on(DISCONNECT, async (disconnectEvent: DisconnectEvent) => {
      this._producer.logger().info("Received a disconnect event", disconnectEvent);
      // force a reconnect if we didn't explicitly call disconnect
      if (!this.explicitlyCalledDisconnect) {
        this._producer.logger().info("Disconnect event received without explicit disconnect call, reconnecting...");
        await this.connect();
      }
    });
  }
}
@Nevon
Collaborator

Nevon commented Oct 14, 2020

I've tried to reproduce this myself using Toxiproxy to introduce a timeout between the client and the broker, but I haven't been able to get it to actually trigger the error. The branch is repro-919 in case anyone wants to give it a go.

AFAIK, the socket timeout is decided by the operating system, with no real way to override it from within NodeJS. According to sysctl, my timeout should be 75 seconds:

$ sysctl net.inet.tcp | grep keepinit
net.inet.tcp.keepinit: 75000

So I configure my client to talk to Toxiproxy and for Toxiproxy to proxy to the broker. I connect the client and produce once just to see that everything works. Then I introduce the timeout into the proxy, setting it to stop all data but not actually close the connection. If it's stopping keepalive packets, then I would expect an ETIMEDOUT after 75 seconds.

const run = async () => {
  await toxiproxy.reset()
  let proxy
  try {
    proxy = await toxiproxy.createProxy({
      listen: `0.0.0.0:39094`,
      name: 'kafka-proxy',
      upstream: `kafka:29094`,
    })
  } catch (e) {
    if (e.message === 'Proxy kafka-proxy already exists') {
      proxy = await toxiproxy.get('kafka-proxy')
    } else {
      throw e
    }
  }
  producer.logger().info('Initialized proxy', {
    proxy,
  })
  await producer.connect()
  await sendMessage()
  producer.logger().info('Creating network timeout', {
    toxic: (
      await proxy.addToxic(
        new toxiproxyClient.Toxic(proxy, {
          type: 'timeout',
          attributes: {
            timeout: 0,
          },
        })
      )
    ).toJson(),
  })
  await sendMessage()
  quit('SIGTERM')
}

That's not what I'm seeing though. Instead, it sits for about 10 minutes waiting before it actually times out. And I don't get an ETIMEDOUT error, and the client tries to reconnect several times (looks like nested retriers):

$ time node examples/producer.js
info:  ┏ [Producer] Initialized proxy
info:  ┃ [ 0] {
info:  ┃ [ 1]   "timestamp": "2020-10-14T13:36:40.312Z",
info:  ┃ [ 2]   "logger": "kafkajs",
info:  ┃ [ 3]   "proxy": {
info:  ┃ [ 4]     "toxiproxy": {
info:  ┃ [ 5]       "host": "http://192.168.0.38:8474"
info:  ┃ [ 6]     },
info:  ┃ [ 7]     "name": "kafka-proxy",
info:  ┃ [ 8]     "listen": "[::]:39094",
info:  ┃ [ 9]     "upstream": "kafka:29094",
info:  ┃ [10]     "enabled": true,
info:  ┃ [11]     "toxics": []
info:  ┃ [12]   }
info:  ┗ [13] }
info:  ┏ Sending 236 messages #0...
info:  ┃ [0] {
info:  ┃ [1]   "timestamp": "2020-10-14T13:36:40.370Z",
info:  ┃ [2]   "logger": "kafkajs"
info:  ┗ [3] }
info:  ┏ Messages sent #0
info:  ┃ [ 0] {
info:  ┃ [ 1]   "timestamp": "2020-10-14T13:36:40.468Z",
info:  ┃ [ 2]   "logger": "kafkajs",
info:  ┃ [ 3]   "response": [
info:  ┃ [ 4]     {
info:  ┃ [ 5]       "topicName": "topic-test",
info:  ┃ [ 6]       "partition": 0,
info:  ┃ [ 7]       "errorCode": 0,
info:  ┃ [ 8]       "baseOffset": "5878",
info:  ┃ [ 9]       "logAppendTime": "-1",
info:  ┃ [10]       "logStartOffset": "0"
info:  ┃ [11]     }
info:  ┃ [12]   ],
info:  ┃ [13]   "msgNumber": 236
info:  ┗ [14] }
info:  ┏ [Producer] Creating network timeout
info:  ┃ [ 0] {
info:  ┃ [ 1]   "timestamp": "2020-10-14T13:36:40.472Z",
info:  ┃ [ 2]   "logger": "kafkajs",
info:  ┃ [ 3]   "toxic": {
info:  ┃ [ 4]     "attributes": {
info:  ┃ [ 5]       "timeout": 0
info:  ┃ [ 6]     },
info:  ┃ [ 7]     "name": "timeout_downstream",
info:  ┃ [ 8]     "stream": "downstream",
info:  ┃ [ 9]     "toxicity": 1,
info:  ┃ [10]     "type": "timeout"
info:  ┃ [11]   }
info:  ┗ [12] }
info:  ┏ Sending 573 messages #1...
info:  ┃ [0] {
info:  ┃ [1]   "timestamp": "2020-10-14T13:36:40.473Z",
info:  ┃ [2]   "logger": "kafkajs"
info:  ┗ [3] }

// ... Sits here for about 10 minutes

error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:41.268Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:41.269Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 0,
error: ┃ [4]   "retryTime": 335
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:41.275Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:42.608Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:42.608Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 1,
error: ┃ [4]   "retryTime": 542
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:42.613Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:44.155Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:44.155Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 2,
error: ┃ [4]   "retryTime": 964
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:44.160Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:46.120Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:46.121Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 3,
error: ┃ [4]   "retryTime": 1864
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:46.125Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:48.991Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:48.992Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 4,
error: ┃ [4]   "retryTime": 3810
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:48.995Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:53.804Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:53.805Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 5,
error: ┃ [4]   "retryTime": 7784
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:53.812Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "localhost:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:54.810Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:54.811Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 0,
error: ┃ [4]   "retryTime": 249
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:54.815Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:56.064Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:56.065Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 1,
error: ┃ [4]   "retryTime": 404
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:56.068Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:57.470Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:57.471Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 2,
error: ┃ [4]   "retryTime": 748
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:57.475Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:59.227Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:59.227Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 3,
error: ┃ [4]   "retryTime": 1220
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:46:59.232Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:01.449Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:01.450Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 4,
error: ┃ [4]   "retryTime": 2494
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:01.454Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:04.952Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:04.952Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 5,
error: ┃ [4]   "retryTime": 5136
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:04.959Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:05.956Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:05.957Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 0,
error: ┃ [4]   "retryTime": 282
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:05.961Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:07.241Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:07.242Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 1,
error: ┃ [4]   "retryTime": 564
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:07.246Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:08.814Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:08.814Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 2,
error: ┃ [4]   "retryTime": 1092
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:08.819Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:10.917Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:10.918Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 3,
error: ┃ [4]   "retryTime": 2474
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:10.928Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:14.398Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:14.399Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 4,
error: ┃ [4]   "retryTime": 4666
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:14.406Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:20.071Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "192.168.0.38:39094",
error: ┃ [4]   "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:20.071Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "retryCount": 5,
error: ┃ [4]   "retryTime": 10370
error: ┗ [5] }
error: ┏ [example/producer] Connection timeout
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-14T13:47:20.072Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "stack": "KafkaJSNonRetriableError\n  Caused by: KafkaJSNonRetriableError\n  Caused by: KafkaJSConnectionError: Connection timeout\n    at Timeout.onTimeout [as _onTimeout] (/Users/tommybrunn/workspace/kafkajs/src/network/connection.js:165:23)\n    at listOnTimeout (internal/timers.js:551:17)\n    at processTimers (internal/timers.js:494:7)"
error: ┗ [4] }
info:  ┏ process.on SIGTERM
info:  ┃ [0] {
info:  ┃ [1]   "timestamp": "2020-10-14T13:47:20.072Z",
info:  ┃ [2]   "logger": "kafkajs"
info:  ┗ [3] }
node examples/producer.js  0.59s user 0.13s system 0% cpu 10:40.30 total

Another option might be to write a test where you implement a custom socket factory with a socket that you can trigger to emit the timeout event manually.
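One way to approach the custom socket factory idea is sketched below. It assumes the `socketFactory` client option described in the KafkaJS docs, which is called with `{ host, port, ssl, onConnect }` and returns a socket. The names `createControllableSocketFactory`, `emitTimeout` and `failWithEtimedout` are made up for this sketch and are not part of KafkaJS.

```javascript
const net = require('net')
const tls = require('tls')

// Hypothetical test helper (not part of KafkaJS): builds a socketFactory that
// remembers every socket it creates so a test can inject failures later.
function createControllableSocketFactory() {
  const sockets = new Set()

  const socketFactory = ({ host, port, ssl, onConnect }) => {
    const socket = ssl
      ? tls.connect(
          Object.assign({ host, port }, net.isIP(host) ? {} : { servername: host }, ssl),
          onConnect
        )
      : net.connect({ host, port }, onConnect)
    sockets.add(socket)
    socket.once('close', () => sockets.delete(socket))
    return socket
  }

  // Emit the 'timeout' event on every tracked socket, as an idle socket would.
  const emitTimeout = () => sockets.forEach(s => s.emit('timeout'))

  // Destroy every tracked socket with an ETIMEDOUT-style error.
  const failWithEtimedout = () =>
    sockets.forEach(s =>
      s.destroy(Object.assign(new Error('read ETIMEDOUT'), { code: 'ETIMEDOUT', syscall: 'read' }))
    )

  return { socketFactory, emitTimeout, failWithEtimedout, sockets }
}
```

Passed to the client as `new Kafka({ ..., socketFactory })`, this would let a test call `failWithEtimedout()` in the middle of a send and then check whether the producer reconnects.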

@caseycrites
Author

I'll try to reproduce with toxiproxy and/or a custom socket factory today, since I'd like to figure this out.

I think if I'm not able to reproduce in a timely fashion, I'm going to turn off kafkajs retries altogether and just catch and retry myself.

@caseycrites
Author

Based on my testing yesterday, I'm not sure there's a way to work around this without shutting off retries altogether. The code as currently written closes down the socket connection to a single broker, not the entire connection, so even if I implement a custom socket factory and listen for ETIMEDOUT events, I'm not sure how best to get things reconnected: calling producer.connect would likely bail early because it already has connected brokers. And if I disconnect and reconnect the producer, I'd presumably still need to call producer.send again because the request queue will have been drained, in which case I may as well turn off retries and retry failures myself.
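A minimal sketch of the "turn off retries and retry myself" approach might look like the following. `sendWithReconnect` and its `attempts`/`delayMs` options are hypothetical names; the only thing assumed about `producer` is that it exposes `connect()`, `disconnect()` and `send()` the way a KafkaJS producer does.

```javascript
// Hypothetical helper: retry a send by cycling the whole producer connection.
// `producer` is anything with connect(), disconnect() and send(), for example
// a KafkaJS producer created with its built-in retries disabled.
async function sendWithReconnect(producer, payload, { attempts = 3, delayMs = 1000 } = {}) {
  let lastError
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await producer.send(payload)
    } catch (err) {
      lastError = err
      if (attempt === attempts) break
      // Drop every broker connection so the next connect() starts fresh.
      await producer.disconnect().catch(() => {})
      // Simple linear backoff before reconnecting.
      await new Promise(resolve => setTimeout(resolve, delayMs * attempt))
      await producer.connect()
    }
  }
  throw lastError
}
```

With KafkaJS this would presumably be paired with something like `kafka.producer({ retry: { retries: 0 } })`. Note that a send which timed out may still have reached the broker, so resending this way can produce duplicates.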

@Nevon
Collaborator

Nevon commented Oct 16, 2020

The code as currently written closes down the socket connection to a single broker, not the entire connection, so even if I implement a custom socket factory and listen for ETIMEDOUT events

Can you share a link to what you're referring to? I would have expected this to come via the socket timeout event, which we listen to and throw a KafkaJSConnectionError in response to, but I guess that's not the case.
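For context, Node.js reports these two situations through different events: the `'timeout'` event fires when a socket has been idle longer than the value given to `socket.setTimeout()`, while an OS-level ETIMEDOUT arrives through the `'error'` event with `err.code === 'ETIMEDOUT'`. The sketch below only illustrates that distinction on a bare `net.Socket` by emitting both events by hand; `watchSocketFailures` is a hypothetical helper and says nothing about how KafkaJS wires its listeners.

```javascript
const net = require('net')

// Hypothetical helper: record which failure path a socket reports.
function watchSocketFailures(socket) {
  const seen = []
  socket.on('timeout', () => seen.push('timeout event'))
  socket.on('error', err => seen.push(`error event: ${err.code}`))
  return seen
}

// An unconnected socket is enough to show the two paths are separate.
const socket = new net.Socket()
const seen = watchSocketFailures(socket)

socket.emit('timeout') // what an idle socket with setTimeout() would emit
socket.emit('error', Object.assign(new Error('read ETIMEDOUT'), { code: 'ETIMEDOUT' }))

console.log(seen) // [ 'timeout event', 'error event: ETIMEDOUT' ]
```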

@vdesabou

vdesabou commented Sep 6, 2021

Hello,

I've made an attempt to reproduce the issue using docker, see below:

Test description

Environment:

KafkaJS version 1.15.0
Kafka version 2.8 (Confluent Platform 6.2.0)
NodeJS version from node:lts-alpine image

How to run

Just run the script start-repro-timeout.sh

What the script does

It starts a zookeeper + 3 brokers + control-center

The producer code is very simple.

Config used is:

const kafka = new Kafka({
  clientId: 'my-kafkajs-producer',
  brokers: ['broker1:9092','broker2:9092','broker3:9092'],
  enforceRequestTimeout: true,
  logLevel: logLevel.DEBUG,
  acks:1,
  connectionTimeout: 20000,
})

It allows only one pending request in order to make troubleshooting easier.
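For readers trying to replicate this setup, here is a sketch of where each setting would likely go. As far as I know from the KafkaJS docs, `acks` is an option of `producer.send()` rather than of the client, and limiting pending requests is done with the producer option `maxInFlightRequests`. That the test used `maxInFlightRequests: 1` is an assumption, since the post does not show the producer code. Plain objects are used so the snippet runs without the kafkajs package.

```javascript
// Client-level options, as would be passed to `new Kafka(...)`.
// `logLevel: logLevel.DEBUG` is left out to avoid requiring kafkajs here.
const clientOptions = {
  clientId: 'my-kafkajs-producer',
  brokers: ['broker1:9092', 'broker2:9092', 'broker3:9092'],
  enforceRequestTimeout: true,
  connectionTimeout: 20000,
}

// Producer-level options, as would be passed to `kafka.producer(...)`.
// Assumption: "only one pending request" means maxInFlightRequests: 1.
const producerOptions = { maxInFlightRequests: 1 }

// Per-send options, as would be passed to `producer.send(...)`.
// The message value is a placeholder.
const sendOptions = { topic: 'kafkajs', acks: 1, messages: [{ value: 'hello' }] }
```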

I'm blocking the IP address of the KafkaJS client container (using iptables) inside the broker1 container, so the KafkaJS producer does not receive any response back from broker1:

ip=$(docker inspect -f '{{.Name}} - {{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' $(docker ps -aq) | grep client-kafkajs | cut -d " " -f 3)
docker exec -e ip=$ip --privileged --user root broker1 sh -c "iptables -A OUTPUT -p tcp -d $ip -j DROP"

Let the test run for 5 minutes

sleep 300

Unblocking IP address $ip corresponding to kafkaJS client

docker exec -e ip=$ip --privileged --user root broker1 sh -c "iptables -D OUTPUT -p tcp -d $ip -j DROP"

Let the test run for 5 minutes

sleep 300

Results

Test with a 10-minute connection error

Traffic is blocked at 11:32:33:

11:32:33 ℹ️ Blocking IP address 172.18.0.6 corresponding to kafkaJS client
11:32:33 ℹ️ Grepping for WARN|ERROR|Metadata|timed out

30 seconds later, the request times out:

[[11:33:04.669]] [LOG]   Producer request timed out at 1630927984668 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":17,"createdAt":1630927954687,"sentAt":1630927954687,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
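As a quick check of the 30-second figure, the epoch-millisecond fields in the log line above can be subtracted directly. The config shown earlier does not set `requestTimeout`, so this sketch assumes the KafkaJS default of 30000 ms is in effect; the variable names are only illustrative.

```javascript
// Fields copied from the "Producer request timed out" log line above.
const timedOut = { correlationId: 17, sentAt: 1630927954687, timedOutAt: 1630927984668 }

// Time between sending the request and the client giving up on it.
const elapsedMs = timedOut.timedOutAt - timedOut.sentAt
console.log(elapsedMs) // 29981

// The epoch value lines up with the wall-clock timestamp in the log prefix.
const timedOutIso = new Date(timedOut.timedOutAt).toISOString()
console.log(timedOutIso) // 2021-09-06T11:33:04.668Z
```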

Then we see a Connection error: read ETIMEDOUT:

[[11:33:29.928]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:33:29.928Z","logger":"kafkajs","message":"[Connection] Connection error: read ETIMEDOUT","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: read ETIMEDOUT\n    at TCP.onStreamRead (internal/stream_base_commons.js:209:20)"}

That seems to trigger a disconnect:

[[11:33:29.929]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:33:29.929Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:33:29.930]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:33:29.930Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}

I do not see a re-connect, but requests are retried:

Retries:

[[11:33:34.935]] [LOG]   Producer request timed out at 1630928014935 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":18,"createdAt":1630927984955,"sentAt":1630927984955,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:34:05.382]] [LOG]   Producer request timed out at 1630928045381 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":19,"createdAt":1630928015401,"sentAt":1630928015401,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:34:36.243]] [LOG]   Producer request timed out at 1630928076243 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":20,"createdAt":1630928046263,"sentAt":1630928046263,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:07.880]] [LOG]   Producer request timed out at 1630928107880 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":21,"createdAt":1630928077900,"sentAt":1630928077900,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:41.097]] [LOG]   Producer request timed out at 1630928141097 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":22,"createdAt":1630928111116,"sentAt":1630928111116,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:41.098]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:35:41.098Z","logger":"kafkajs","message":"[Producer] Request Produce(key: 0, version: 5) timed out","retryCount":0,"retryTime":282}
[[11:35:41.099]] [LOG]   failed to send data KafkaJSRequestTimeoutError: Request Produce(key: 0, version: 5) timed out
[[11:36:11.879]] [LOG]   Producer request timed out at 1630928171879 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":23,"createdAt":1630928141899,"sentAt":1630928141899,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:36:42.178]] [LOG]   Producer request timed out at 1630928202178 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":24,"createdAt":1630928172198,"sentAt":1630928172198,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:37:12.826]] [LOG]   Producer request timed out at 1630928232826 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"createdAt":1630928202845,"sentAt":1630928202845,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:37:43.965]] [LOG]   Producer request timed out at 1630928263965 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":26,"createdAt":1630928233984,"sentAt":1630928233984,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:16.155]] [LOG]   Producer request timed out at 1630928296154 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":27,"createdAt":1630928266176,"sentAt":1630928266176,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:51.223]] [LOG]   Producer request timed out at 1630928331223 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":28,"createdAt":1630928301243,"sentAt":1630928301243,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:51.224]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:38:51.224Z","logger":"kafkajs","message":"[Producer] Request Produce(key: 0, version: 5) timed out","retryCount":0,"retryTime":320}
[[11:38:51.225]] [LOG]   failed to send data KafkaJSRequestTimeoutError: Request Produce(key: 0, version: 5) timed out

etc...
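To make the retry pattern easier to see, the gaps between one attempt timing out and the next one being created can be computed from the `createdAt` values and the "timed out at" values in the logs above. Six attempts (correlationIds 17 to 22) precede the first "failed to send data" error, which would be consistent with one initial try plus five retries if the KafkaJS default retry count applies; that reading is an assumption, not something the logs state.

```javascript
// [createdAt, timedOutAt] for correlationIds 17 to 22, copied from the logs above.
const attempts = [
  [1630927954687, 1630927984668],
  [1630927984955, 1630928014935],
  [1630928015401, 1630928045381],
  [1630928046263, 1630928076243],
  [1630928077900, 1630928107880],
  [1630928111116, 1630928141097],
]

// Wait between one attempt timing out and the next attempt being created.
const gapsMs = attempts.slice(1).map(([createdAt], i) => createdAt - attempts[i][1])
console.log(gapsMs) // [ 287, 466, 882, 1657, 3236 ]
```

The gaps roughly double each time, which looks like exponential backoff with some jitter, while every attempt still goes to broker1.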

Note: Metadata requests are going to broker3, and broker1 is still reported as OK, as expected (because the connection issue only affects traffic from broker1 to the KafkaJS client):

[[11:37:46.168]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:37:46.167Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 5)","broker":"broker3:9092","clientId":"my-kafkajs-producer","correlationId":17,"expectResponse":true,"size":47}
[[11:37:46.169]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:37:46.169Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"broker3:9092","clientId":"my-kafkajs-producer","correlationId":17,"size":255,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"broker2","port":9092,"rack":null},{"nodeId":3,"host":"broker3","port":9092,"rack":null},{"nodeId":1,"host":"broker1","port":9092,"rack":null}],"clusterId":"_jMoVOJEQiS8ez1Eo1ucpQ","controllerId":2,"topicMetadata":[{"topicErrorCode":0,"topic":"kafkajs","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":2,"replicas":[2,3,1],"isr":[2,3,1],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":1,"leader":3,"replicas":[3,1,2],"isr":[3,1,2],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":2,"leader":1,"replicas":[1,2,3],"isr":[1,2,3],"offlineReplicas":[]}]}]}}
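As I read the Metadata response above, broker1 (nodeId 1) is still listed as the leader of partition 2 and is still in every ISR, so a metadata refresh gives the client no reason to route around it. The sketch below pulls that out of a trimmed copy of the response; `partitionsLedBy` is a hypothetical helper.

```javascript
// Trimmed copy of partitionMetadata from the Metadata response above.
const partitionMetadata = [
  { partitionId: 0, leader: 2, isr: [2, 3, 1] },
  { partitionId: 1, leader: 3, isr: [3, 1, 2] },
  { partitionId: 2, leader: 1, isr: [1, 2, 3] },
]

// Hypothetical helper: partitions whose leader is the given broker node.
const partitionsLedBy = nodeId =>
  partitionMetadata.filter(p => p.leader === nodeId).map(p => p.partitionId)

console.log(partitionsLedBy(1)) // [ 2 ]
```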

At 11:42:33 the iptables rule is removed:

11:42:33 ℹ️ Unblocking IP address 172.18.0.6 corresponding to kafkaJS client

We see a disconnection, probably because the broker had dropped the client for good (due to connections.max.idle.ms, which is 10 minutes by default). This time we get "Kafka server has closed connection" followed by "Connecting":

[[11:42:34.388]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.387Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.388]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.388Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.388]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.388Z","logger":"kafkajs","message":"[Connection] Kafka server has closed connection","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.394]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:42:34.394Z","logger":"kafkajs","message":"[Connection] Connection error: write EPIPE","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: write EPIPE\n    at WriteWrap.onWriteComplete [as oncomplete] (internal/stream_base_commons.js:94:16)"}
[[11:42:34.855]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.855Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"broker1:9092","clientId":"my-kafkajs-producer","ssl":false,"sasl":false}

Full logs are here

Test with a 5-minute connection error

I re-ran the test with the iptables rule in place for 5 minutes instead of 10, to avoid the broker closing the connection due to connections.max.idle.ms.

Traffic was blocked at 12:50:45:

12:50:45 ℹ️ Blocking IP address 172.20.0.6 corresponding to kafkaJS client

Around 60 seconds later we see a disconnection:

[[12:51:44.730]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T12:51:44.730Z","logger":"kafkajs","message":"[Connection] Connection error: read ETIMEDOUT","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: read ETIMEDOUT\n    at TCP.onStreamRead (internal/stream_base_commons.js:209:20)"}
[[12:51:44.730]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T12:51:44.730Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[12:51:44.731]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T12:51:44.731Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}

When traffic is back at 12:55:45:

12:55:45 ℹ️ Unblocking IP address 172.20.0.6 corresponding to kafkaJS client

We see a retry right after:

[[12:56:00.874]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T12:56:00.874Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 5)","broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"expectResponse":true,"size":510055}

About 20 seconds later (not sure why), we see the accumulated responses that had been blocked by iptables:

[[12:56:21.021]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.021Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":15}
[[12:56:21.023]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.023Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":16}
[[12:56:21.026]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.026Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":17}
[[12:56:21.029]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.029Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":18}
[[12:56:21.032]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.032Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":19}
[[12:56:21.034]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.033Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":20}
[[12:56:21.035]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.035Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":21}
[[12:56:21.038]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.038Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":22}
[[12:56:21.040]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.040Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":23}
[[12:56:21.043]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.042Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":24}

Followed by request response:

[[12:56:21.044]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T12:56:21.044Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 5)","broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"size":55,"data":{"topics":[{"topicName":"kafkajs","partitions":[{"partition":0,"errorCode":0,"baseOffset":"83","logAppendTime":"-1","logStartOffset":"0"}]}],"throttleTime":0}}
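One possible reading of the warnings above, assuming the client forgets a request once it has timed out, is that the late responses for correlationIds 15 to 24 no longer have a pending entry to match, while the response for 25 does. The class below is a simplified model of that idea and is not KafkaJS's actual RequestQueue code.

```javascript
// Simplified model of matching responses to requests by correlationId.
// Illustration only, not the KafkaJS RequestQueue implementation.
class PendingRequests {
  constructor() {
    this.pending = new Map()
  }
  send(correlationId) {
    this.pending.set(correlationId, { sentAt: Date.now() })
  }
  timeOut(correlationId) {
    // A timed-out request is dropped, so a late response cannot match it.
    this.pending.delete(correlationId)
  }
  receive(correlationId) {
    if (!this.pending.has(correlationId)) return 'Response without match'
    this.pending.delete(correlationId)
    return 'matched'
  }
}

const queue = new PendingRequests()
for (let id = 15; id <= 24; id++) {
  queue.send(id)
  queue.timeOut(id) // these requests timed out while traffic was blocked
}
queue.send(25) // the retry sent after traffic was unblocked

const outcomes = []
for (let id = 15; id <= 25; id++) outcomes.push([id, queue.receive(id)])

const matchedIds = outcomes.filter(([, result]) => result === 'matched').map(([id]) => id)
console.log(matchedIds) // [ 25 ]
```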

So even though there was a disconnection, it seems that KafkaJS is able to send requests again once the connection is back?

Full logs are here
