Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose KafkaJS Event Listeners #10950

Closed
1 task done
jgoldsmith613 opened this issue Jan 25, 2023 · 2 comments
Closed
1 task done

Expose KafkaJS Event Listeners #10950

jgoldsmith613 opened this issue Jan 25, 2023 · 2 comments

Comments

@jgoldsmith613
Copy link

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

There is an error within kafkaJS that when the client has a generation ID error the consumer does not restart or consume anymore messages.

Here is a github issue describing the issue: tulios/kafkajs#1466
And here is a potential PR to fix it: tulios/kafkajs#1474

One of the mitigations to deal with the error until fixed require something like this:
kafkaClient.consumer.on("consumer.crash", (event) => {
if (event.payload.error.name === "KafkaJSNonRetriableError") {

     process.exit(1);  // will initiate a k8s restart

    // ... or do something else like reconnecting and starting run again ... 
}

});

But I don't believe Nest exposes access to the kafka client like that to be able to do this.

Describe the solution you'd like

Add the ability register event listeners on the kafka client.

the cause of this issue KafkaJS, but even if they fix this current issue where there client, this could still be a useful feature.

Teachability, documentation, adoption, migration strategy

Modify nestjs kafka docs to include example of how to add listener

What is the motivation / use case for changing the behavior?

Initially its because of KafkaJS issue, but could be useful elsewhere too.

@jgoldsmith613 jgoldsmith613 added needs triage This issue has not been looked into type: enhancement 🐺 labels Jan 25, 2023
@kamilmysliwiec
Copy link
Member

Would you like to create a PR for this issue?

@micalevisk micalevisk removed the needs triage This issue has not been looked into label Feb 1, 2023
@jgoldsmith613
Copy link
Author

jgoldsmith613 commented Feb 1, 2023

Would you like to create a PR for this issue?

Sorry for the delay here, we figured out a way to get access to the kafka client by doing the following.

app
    .getMicroservices()
    .filter((service: any) => service.server.transportId === Transport.KAFKA)
    .forEach((service: any) => {
      const consumer: kafkajs.Consumer = service.server.consumer

      consumer.on(consumer.events.CRASH, event => {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants