
await producer.connect() hangs indefinitely without retrying on receiving an ECONNRESET from kafka #909

Closed
hughlivingstone opened this issue Oct 6, 2020 · 9 comments

Describe the bug
I have found that my producer does not retry a broker connection when it receives an ECONNRESET error from Kafka. The scenario here is that I am trying to connect a producer, but the Kafka server resets the TCP connection (ECONNRESET).

    const kafka = new Kafka({
      clientId: `wait-until-ready-${uuid()}`,
      brokers: [`${KAFKA_HOST}:${KAFKA_PORT}`],
      connectionTimeout: READY_WAIT_TIMEOUT,
      logLevel: logLevel.DEBUG
    });
    const producer = kafka.producer();
    await producer.connect();

This causes the await producer.connect() to hang indefinitely. I tracked the hang to the following place in kafkajs:
kafkajs/src/network/requestQueue/index.js

  waitForPendingRequests() {
    return new Promise(resolve => {
      if (this.pending.length === 0 && this.inflight.size === 0) {
        return resolve()
      }

      this.logger.debug('Waiting for pending requests', {
        clientId: this.clientId,
        broker: this.broker,
        currentInflightRequests: this.inflight.size,
        currentPendingQueueSize: this.pending.length,
      })

      this.once(REQUEST_QUEUE_EMPTY, () => resolve())
    })
  }

The resolve() is wrapped inside the callback passed to this.once(REQUEST_QUEUE_EMPTY, ...), so it only executes if that event is emitted. In this scenario that does not seem to happen, so the promise never resolves.
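
As a minimal standalone sketch (plain Node, not KafkaJS code) of why that hangs: a promise whose only resolve path sits inside a .once() listener never settles if the event is never emitted, so anything awaiting it waits forever.

    const { EventEmitter } = require('events')

    const emitter = new EventEmitter()
    const waitForEmpty = new Promise(resolve => {
      // resolve only runs if someone emits REQUEST_QUEUE_EMPTY
      emitter.once('REQUEST_QUEUE_EMPTY', () => resolve())
    })

    // keep the process alive so the missing resolution is observable
    setInterval(() => console.log('still waiting...'), 1000)

    waitForEmpty.then(() => console.log('never reached: nothing emits REQUEST_QUEUE_EMPTY'))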

I changed the code to resolve like the following and the producer kept trying to reconnect as expected.

  waitForPendingRequests() {
    return new Promise(resolve => {
      if (this.pending.length === 0 && this.inflight.size === 0) {
        return resolve()
      }

      this.logger.debug('Waiting for pending requests', {
        clientId: this.clientId,
        broker: this.broker,
        currentInflightRequests: this.inflight.size,
        currentPendingQueueSize: this.pending.length,
      })

      this.once(REQUEST_QUEUE_EMPTY, () => resolve())
      this.logger.debug('waitForPendingRequests end')
      resolve()
    })
  }

In 1.12.0 the producer kept trying to connect as I expected.

To Reproduce

This happened in my CI environment, where Kafka responded with the ECONNRESET error. However, I was able to reproduce a similar situation by starting a Node server with the following code to mimic a Kafka broker resetting the connection, and then trying to connect the producer. The code below causes the server to reset the TCP connection.

    var net = require("net");

    var server = net.createServer(function(socket) {
      logger.info("Server got message");
      // socket.pipe(socket);
      setTimeout(() => socket.destroy(), 0);
    });
    await server.listen(9092, "127.0.0.1");

Expected behavior
The producer should continue trying to reconnect, as it does in 1.12.0, instead of hanging on the await producer.connect() call.

Observed behavior
The producer stopped trying to reconnect as soon as it received the ECONNRESET error from Kafka. Prior to that, it was retrying.

    {"level":"DEBUG","timestamp":"2020-10-02T15:10:53.788Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","ssl":false,"sasl":false}
  console.log node_modules/kafkajs/src/loggers/console.js:19
    {"level":"DEBUG","timestamp":"2020-10-02T15:10:53.789Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 2)","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","correlationId":9,"expectResponse":true,"size":67}
  console.error node_modules/kafkajs/src/loggers/console.js:15
    {"level":"ERROR","timestamp":"2020-10-02T15:10:53.792Z","logger":"kafkajs","message":"[Connection] Connection error: read ECONNRESET","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","stack":"Error: read ECONNRESET\n    at TCP.onStreamRead (internal/stream_base_commons.js:183:27)"}
  console.log node_modules/kafkajs/src/loggers/console.js:19
    {"level":"DEBUG","timestamp":"2020-10-02T15:10:53.792Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d"}
  console.log node_modules/kafkajs/src/loggers/console.js:19

Environment:

  • OS: macOS and Alpine Linux (CI environment)
  • KafkaJS version: 1.14.0
  • Kafka version: 2.3.1
  • NodeJS version: v12.16.1


Nevon added the bug label on Oct 6, 2020
ankon (Contributor) commented Oct 7, 2020

> I changed the code to resolve like the following and the producer kept trying to reconnect as expected.

But now you might call resolve() twice: once immediately, with requests still pending, and then again later when the request queue is actually empty (which is what the .once(...) is supposed to do).
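
A plain-Node illustration of that (not KafkaJS code): the first resolve() settles the promise and the later one from the listener is ignored, so the unconditional resolve turns the wait into a no-op.

    const { EventEmitter } = require('events')

    const emitter = new EventEmitter()
    const wait = new Promise(resolve => {
      emitter.once('empty', () => resolve('queue drained'))
      resolve('resolved immediately') // unconditional resolve, as in the patched code
    })

    wait.then(value => console.log(value)) // prints "resolved immediately"
    emitter.emit('empty')                  // the second resolve() has no effect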

Given that you debugged this, though, it would be nice to see the full stack trace here, and I would really be interested in seeing which requests are in flight/pending. From reading through the connection logic again, an error in connecting (or a timeout later) should lead to a call to connection.disconnect(), which does the waiting, but we're only calling rejectRequests after waiting.

diff --git a/src/network/connection.js b/src/network/connection.js
index eed871e..c1394ee 100644
--- a/src/network/connection.js
+++ b/src/network/connection.js
@@ -155,8 +155,8 @@ module.exports = class Connection {
         })
 
         this.logError(error.message, { stack: e.stack })
-        await this.disconnect()
         this.rejectRequests(error)
+        await this.disconnect()
 
         reject(error)
       }
@@ -167,8 +167,8 @@ module.exports = class Connection {
         })
 
         this.logError(error.message)
-        await this.disconnect()
         this.rejectRequests(error)
+        await this.disconnect()
         reject(error)
       }
 

(Untested, pretty sure that we need to touch the connection state and set it to "disconnecting" to avoid new requests getting queued)

hughlivingstone (Author) commented Oct 7, 2020

I was not suggesting my hack was a fix :) I was just testing things out to understand how it works. Regarding in-flight requests, I had only called await producer.connect(); I had not tried to produce any messages. My next line of code after the await was never executed.

Regarding a full stack trace, I had the logs on debug but only got what I showed you. I can reproduce it again and put some extra logging in your package if you want?

tulios (Owner) commented Oct 8, 2020

I will look at this now; the solution is not to immediately resolve since this would release the client before all connections are depleted.

tulios (Owner) commented Oct 8, 2020

@hughlivingstone I think the client is working as expected; it retries up to the configured number of times, 5 by default, and then gives up. I used the server script you provided.

[Screenshot 2020-10-08 204319]

Here I set the logger to debug mode to see what's happening:

[connection-retry]

You can see that the client retried the connection 5 times, gradually increasing the retry time between the attempts.
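
For reference, a rough sketch of how that backoff grows (not the actual KafkaJS implementation; it assumes the retry defaults listed in the KafkaJS docs: initialRetryTime 300 ms, factor 0.2, multiplier 2, retries 5, maxRetryTime 30000 ms):

    // Rough sketch of exponential backoff with jitter, assuming the documented defaults.
    const defaults = { initialRetryTime: 300, factor: 0.2, multiplier: 2, retries: 5, maxRetryTime: 30000 }

    function retryDelays({ initialRetryTime, factor, multiplier, retries, maxRetryTime }) {
      const delays = []
      let delay = initialRetryTime
      for (let i = 0; i < retries; i++) {
        // each attempt waits roughly the current delay, randomized by +/- factor
        const jitter = 1 + factor * (Math.random() * 2 - 1)
        delays.push(Math.min(Math.round(delay * jitter), maxRetryTime))
        delay *= multiplier
      }
      return delays
    }

    console.log(retryDelays(defaults)) // e.g. [ ~300, ~600, ~1200, ~2400, ~4800 ] ms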

hughlivingstone (Author) commented Oct 9, 2020

Hmm, I am not seeing the same result with my code:

    const kafka = new Kafka({
      clientId: `wait-until-ready-${uuid()}`,
      brokers: [`${KAFKA_HOST}:${KAFKA_PORT}`],
      connectionTimeout: READY_WAIT_TIMEOUT,
      logLevel: logLevel.DEBUG,
      retry: {
        retries: 5
      }
    });
    const producer = kafka.producer();

    logger.info("Kafka connecting");
    try {
      await producer.connect();
      logger.info("Kafka is now ready for use");
    } catch (e) {
      logger.error(`Kafka not ready - unable to connect within ${READY_WAIT_TIMEOUT} ms`);
      throw e;
    }

This results in the following log with no retries:
[Screenshot 2020-10-09 at 15 55 15]

I looked at the producer.js example and I can't see any real difference in the configuration. I also tried it without supplying the retry config to the Kafka client.

tulios (Owner) commented Oct 12, 2020

@hughlivingstone I can see that this is an integration test. Any chance the environment is mocking something or changing the behavior of something we use underneath? I've seen some libraries modify the net code or even mock event emitters, etc.
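
One hypothetical example of that kind of interference (assuming Jest fake timers, which is not confirmed from this CI setup): any retry that waits via setTimeout would never fire on its own, making a retried connect() look like a hang.

    // Hypothetical Jest test, NOT taken from the reporter's setup:
    // with fake timers enabled, the setTimeout used for retry backoff never fires by itself.
    jest.useFakeTimers()

    const retryAfterDelay = () => new Promise(resolve => setTimeout(resolve, 300))

    test('backoff under fake timers', async () => {
      const pending = retryAfterDelay()  // the 300 ms timer is faked
      jest.advanceTimersByTime(300)      // without this, `await pending` never resolves
      await pending
    })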

goriunov (Contributor) commented Oct 13, 2020

I believe it could be related to #918

ankon (Contributor) commented Nov 6, 2020

Given that #944 ended up looking quite like my comment in #909 (comment)... maybe this issue can now also be closed?

Nevon (Collaborator) commented Nov 19, 2020

I believe this was fixed by #944 and #956. You can try it out with 1.15.0-beta.25 and feel free to re-open this issue if it is not solved.

Nevon closed this as completed on Nov 19, 2020