Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka client consumer handler() not receive message after exception #178

Open
fangwohung opened this issue Jul 14, 2020 · 2 comments
Open
Labels

Comments

@fangwohung
Copy link

I'm using vertx-kafka-client 3.9.1.
When i meant to send a message ("abc") to consumer, i got an exception in handler function. After that, i tried to send another message ("xyz"), but my handler function didnt receive the message.
Is it a bug ? It should have received a new message despite exception in handler function.

consumer.handler(record -> {
            consumer.rxCommit()
                    .subscribe(() -> logger.debug("Committed successfully!"),
                            throwable -> logger.error("Kafka commitment error", throwable));
            System.out.println(record.value());
            Integer.parseInt(record.value());
});
@fangwohung fangwohung added the bug label Jul 14, 2020
@ngnhatdinh2
Copy link

I got the same issue with the Rxified version.
I was using a Json Deserializer for consumer, when I produce a non-Json value, I got a SerializationException. Even though, I caught the exception, the next values are nowhere to find.

consumer
        .subscribe(kafkaTopic)
        .toObservable()
        .flatMap(someOerartor)
        .doOnError(
            error -> {
               // do some error Handling
              })
        .doOnError(error -> {})
        .subscribe(logger::info, logger::warn);

@aesteve
Copy link
Contributor

aesteve commented Mar 18, 2023

You may be facing the same problem as: #220

You have a "poison pill" in your topic, and in that case (be it with the standard Kafka client or the Vert.x Kafka client) you have head-of-the-line-blocking.

One strategy is, in the error handling function to check the class of the Exception, and if it's a RecordDeserializationException , get the topic, the partition and the offset, and manually commit it. This way, future polling made by the consumer will retrieve more records.

#220 has code samples (not Rx, but this is translatable) to do so.

Hope this helps.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

No branches or pull requests

3 participants