
Consumer goes into infinite loop when SerializationException is thrown #220

Open
cyhii opened this issue Jun 1, 2022 · 6 comments

@cyhii

cyhii commented Jun 1, 2022

Questions

I wrote a Kafka consumer to consume JSON messages, so I use JsonObjectDeserializer. My configuration is:

        val kafkaConfig: Map<String, String> = mapOf(
            "bootstrap.servers" to config.getString("kafka.bootstrap.servers"),
            "key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" to "io.vertx.kafka.client.serialization.JsonObjectDeserializer",
            "group.id" to "my_group",
            "auto.offset.reset" to "latest",
            "enable.auto.commit" to "true",
        )

But sometimes, when a non-JSON message is produced to Kafka, the consumer goes into an infinite loop at the position of that message.

I read the source code and found these lines in io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.java:

    private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
      if (this.polling.compareAndSet(false, true)) {
        this.worker.submit(() -> {
          boolean submitted = false;
          try {
            if (!this.closed.get()) {
              try {
                ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
                if (records != null && records.count() > 0) {
                  submitted = true; // sets false only when the iterator is overwritten
                  this.context.runOnContext(v -> {
                    this.polling.set(false);
                    handler.handle(records);
                  });
                }
              } catch (WakeupException ignore) {
              } catch (Exception e) {
                if (exceptionHandler != null) {
                  exceptionHandler.handle(e);
                }
              }
            }
          } finally {
            if (!submitted) {
              this.context.runOnContext(v -> {
                this.polling.set(false);
                schedule(0);
              });
            }
          }
        });
      }
    }

A SerializationException is thrown by this.consumer.poll(), and then the exceptionHandler is called in the catch block. The bad record is not skipped, so the next poll throws the SerializationException again. I think that's the infinite loop.

Also, there is not enough information in the exception, so I cannot seek or take other actions in the exceptionHandler.

Maybe it should auto-skip the record and log a warning? Or throw a more meaningful exception so the caller can do something in the exceptionHandler?

Need help, thanks.

Version

4.2.6

@cyhii cyhii added the bug label Jun 1, 2022
@vietj
Contributor

vietj commented Jun 1, 2022

Can you provide a reproducer?

@vietj vietj added this to the 4.3.2 milestone Jun 1, 2022
@cyhii
Author

cyhii commented Jun 2, 2022

> Can you provide a reproducer?

Sure, I've made a commit to my demo project to reproduce this issue.

Steps:

  1. Start a Kafka server on localhost (follow the quickstart)
  2. Start the application above
  3. Send a non-JSON message, and the issue happens.
$ bin/kafka-console-producer.sh --topic my-topic --bootstrap-server 127.0.0.1:9092
>send a plaintext message

Uncomment the code in the exceptionHandler to see the log output...

    consumer.exceptionHandler {
      // WARN: uncomment this line below then you will get log storm
      // log.warn("failed to consume message : {}", it.message, it)
    }

It will print tons of logs like this:

12:14:16.099 [vert.x-kafka-consumer-thread-0] WARN  com.example.starter.MyKafkaConsumer - failed to consume message : Error deserializing key/value for partition my-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition my-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: io.vertx.core.json.DecodeException: Failed to decode:Unrecognized token 'send': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 6]
	at io.vertx.core.json.jackson.DatabindCodec.fromParser(DatabindCodec.java:129)
	at io.vertx.core.json.jackson.DatabindCodec.fromBuffer(DatabindCodec.java:99)
	at io.vertx.core.json.JsonObject.fromBuffer(JsonObject.java:948)
	at io.vertx.core.json.JsonObject.<init>(JsonObject.java:85)
	at io.vertx.core.buffer.impl.BufferImpl.toJsonObject(BufferImpl.java:110)
	at io.vertx.kafka.client.serialization.JsonObjectDeserializer.deserialize(JsonObjectDeserializer.java:39)
	at io.vertx.kafka.client.serialization.JsonObjectDeserializer.deserialize(JsonObjectDeserializer.java:28)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$pollRecords$6(KafkaReadStreamImpl.java:154)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'send': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 6]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3588)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2683)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:865)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4664)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4484)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2730)
	at io.vertx.core.json.jackson.DatabindCodec.fromParser(DatabindCodec.java:126)
	... 22 common frames omitted

@vietj
Contributor

vietj commented Jun 8, 2022

@ppatierno can you have a look?

@ppatierno
Member

Not sure it's something I can take a look at right now. @cyhii, any chance of a contribution I can review and help with?

@vietj vietj modified the milestones: 4.3.2, 4.3.3 Jul 6, 2022
@vietj vietj modified the milestones: 4.3.3, 4.3.4 Aug 9, 2022
@vietj vietj modified the milestones: 4.3.4, 4.3.5 Oct 1, 2022
@vietj vietj modified the milestones: 4.3.5, 4.4.0 Nov 18, 2022
@vietj vietj modified the milestones: 4.4.0, 4.4.1 Mar 2, 2023
@aesteve
Contributor

aesteve commented Mar 15, 2023

What you have here @cyhii is what is called a "poison pill": a corrupted (or just invalid) record that blocks consumption.

This would also happen with the default Kafka consumer.

I think what we are missing here is an equivalent of Spring's ErrorHandlingDeserializer or Kafka Streams' DeserializationExceptionHandler (a rough sketch of such a wrapper follows the list below).

Meaning: a class, callback, or any mechanism we could configure to get the appropriate behaviour:

  • by default: keep as-is: crash / loop forever
    • but change the exception type to get more information (for instance the record's offset, so that a user can seek)
  • in addition (later maybe?) provide different ExceptionHandlers out of the box, like LogAndContinueExceptionHandler from Kafka Streams or the DeadLetterPublishingRecoverer, which are the most common strategies
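
To make the idea concrete, here is a minimal sketch of what such a wrapping deserializer could look like, assuming a log-and-skip policy in the spirit of LogAndContinueExceptionHandler. The class name and policy are purely illustrative; nothing like this exists in the client today:

    import io.vertx.core.json.JsonObject
    import io.vertx.kafka.client.serialization.JsonObjectDeserializer
    import org.apache.kafka.common.serialization.Deserializer
    import org.slf4j.LoggerFactory

    // Hypothetical wrapper, not part of the client: it swallows decode failures
    // instead of letting poll() fail, so the consumer keeps advancing.
    class LogAndNullJsonObjectDeserializer : Deserializer<JsonObject?> {
      private val delegate = JsonObjectDeserializer()
      private val log = LoggerFactory.getLogger(LogAndNullJsonObjectDeserializer::class.java)

      override fun deserialize(topic: String, data: ByteArray?): JsonObject? =
        try {
          delegate.deserialize(topic, data)
        } catch (e: Exception) {
          // poison pill: log it and hand back null instead of failing the whole poll
          log.warn("Dropping non-JSON record on topic {}: {}", topic, e.message)
          null
        }
    }

You would then point value.deserializer at this class and have the record handler treat null values as skipped records.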

@aesteve
Contributor

aesteve commented Mar 18, 2023

Hello again @cyhii.

I gave this issue a try to see how to improve error management, and started with a test.

It may take a while before we have a more elaborate design (as in Spring), but if you want to work around the issue and deal with the poison pill, you can do this:
https://github.com/aesteve/vertx-kafka-client/blob/handle-serialization-exceptions/src/test/java/io/vertx/kafka/client/tests/ConsumerTestBase.java#L1378-L1396

So, correcting myself:

> but change the exception type to get more information (for instance the record's offset, so that a user can seek)

This is actually not necessary; only a cast to RecordDeserializationException is required (after an instanceof check).
And this is the way the standard Kafka Consumer works, too.

> Also, there is not enough information in the exception, so I cannot seek or take other actions in the exceptionHandler.

With such a cast, you should be able to seek where needed 🙂
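
For reference, a minimal sketch of that workaround, in the style of the reproducer above (the consumer and log variables are assumed from the earlier snippets):

    import io.vertx.kafka.client.common.TopicPartition
    import org.apache.kafka.common.errors.RecordDeserializationException

    consumer.exceptionHandler { err ->
      if (err is RecordDeserializationException) {
        // the exception carries the partition and offset of the poison pill
        val tp = TopicPartition(err.topicPartition().topic(), err.topicPartition().partition())
        // seek past the bad record so the next poll can make progress
        consumer.seek(tp, err.offset() + 1)
      } else {
        log.warn("failed to consume message : {}", err.message, err)
      }
    }

Whether you skip, dead-letter, or stop on the bad record is up to the application; the point is that RecordDeserializationException gives you the coordinates to act on.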

Still, having more elaborate exception handlers could be interesting, but with this you'd deal with the poison pill the same way you would with the standard Kafka consumer client.

Hope this helps.

@vietj vietj modified the milestones: 4.4.1, 4.4.2 Mar 31, 2023
@vietj vietj modified the milestones: 4.4.2, 4.4.3 May 12, 2023
@vietj vietj modified the milestones: 4.4.3, 4.4.4-SNAPSHOT, 4.4.4 Jun 7, 2023
@vietj vietj modified the milestones: 4.4.4, 4.4.5 Jun 22, 2023
@vietj vietj removed this from the 4.4.5 milestone Aug 30, 2023
@vietj vietj added this to the 4.4.6 milestone Aug 30, 2023
@vietj vietj modified the milestones: 4.4.6, 4.5.0 Sep 12, 2023
@vietj vietj modified the milestones: 4.5.0, 4.5.1 Nov 15, 2023
@vietj vietj modified the milestones: 4.5.1, 4.5.2 Dec 13, 2023
@vietj vietj modified the milestones: 4.5.2, 4.5.3 Jan 30, 2024
@vietj vietj modified the milestones: 4.5.3, 4.5.4 Feb 6, 2024
@vietj vietj modified the milestones: 4.5.4, 4.5.5 Feb 22, 2024
@vietj vietj modified the milestones: 4.5.5, 4.5.6 Mar 14, 2024
@vietj vietj modified the milestones: 4.5.6, 4.5.7, 4.5.8 Mar 21, 2024
@vietj vietj modified the milestones: 4.5.8, 4.5.9 May 24, 2024