NullPointerException when using consumer batchHandler #264

Open
afloarea opened this issue Apr 5, 2024 · 0 comments

afloarea commented Apr 5, 2024

Version

4.5.7

Context

I see exceptions thrown when using the Kafka consumer with a batch handler.

Do you have a reproducer?

// Imports needed by the reproducer.
// Assumption: the logger is SLF4J; the original report does not show its imports.
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@ExtendWith(VertxExtension.class)
class KafkaBatchConsumerTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchConsumerTest.class);
    private static final int MESSAGE_COUNT = 3;

    @Container
    private KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @BeforeEach
    void prepare(Vertx vertx, VertxTestContext testContext) {
        Properties config = new Properties();
        config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

        IntStream.range(0, MESSAGE_COUNT)
                .mapToObj(index ->
                        KafkaProducerRecord.<String, String>create("test.topic", null, "Message " + index, 0))
                .map(producer::write)
                .collect(Collectors.collectingAndThen(Collectors.toList(), Future::all))
                .onComplete(testContext.succeedingThenComplete());
    }

    @Test
    void testBatchConsumer(Vertx vertx, VertxTestContext testContext) {

        Properties config = new Properties();
        config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "test.group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

        consumer.batchHandler(kafkaRecords -> {
            LOG.info("Got {} records", kafkaRecords.size());
            IntStream.range(0, kafkaRecords.size())
                    .mapToObj(kafkaRecords::recordAt)
                    .forEach(kafkaRecord -> LOG.info("Got record: {}", kafkaRecord));
            testContext.verify(() -> {
                Assertions.assertEquals(MESSAGE_COUNT, kafkaRecords.size());
                testContext.completeNow();
            });
        });

        consumer.subscribe("test.topic");

    }

}

The test passes and consuming works, but the logs show a NullPointerException for each message:

java.lang.NullPointerException: Cannot invoke "io.vertx.core.Handler.handle(Object)" because the return value of "io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.tracedHandler(io.vertx.core.Context, io.vertx.core.Handler)" is null
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$run$10(KafkaReadStreamImpl.java:240)
	at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:328)
	at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:166)
	at io.vertx.core.impl.ContextInternal.emit(ContextInternal.java:209)
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.run(KafkaReadStreamImpl.java:240)
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$schedule$8(KafkaReadStreamImpl.java:194)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
	at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
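
The exception message says that the value returned by tracedHandler is null at the point where handle is invoked. One plausible reading, which is an assumption and not confirmed against the library source, is that the per-record handler is passed through unchanged and is simply absent when only a batch handler is registered. The stand-alone sketch below reproduces that failure mode with plain Java types. All names in it (NullHandlerSketch, traced, dispatchUnguarded, dispatchGuarded) are hypothetical and are not part of the Vert.x Kafka client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class NullHandlerSketch {

    // Hypothetical stand-in for a wrapper that returns the handler unchanged
    // (assumed behavior; it returns null when no handler was registered).
    static <T> Consumer<T> traced(Consumer<T> handler) {
        return handler;
    }

    // Unguarded dispatch: throws NullPointerException when recordHandler is null.
    static <T> void dispatchUnguarded(Consumer<T> recordHandler, T record) {
        traced(recordHandler).accept(record);
    }

    // Guarded dispatch: skips the record when no per-record handler is set.
    // Returns true only if the handler was actually invoked.
    static <T> boolean dispatchGuarded(Consumer<T> recordHandler, T record) {
        Consumer<T> handler = traced(recordHandler);
        if (handler == null) {
            return false;
        }
        handler.accept(record);
        return true;
    }

    public static void main(String[] args) {
        try {
            dispatchUnguarded(null, "Message 0");
        } catch (NullPointerException e) {
            System.out.println("unguarded dispatch threw NullPointerException");
        }

        List<String> seen = new ArrayList<>();
        Consumer<String> collector = seen::add;

        System.out.println("guarded, no handler: " + dispatchGuarded(null, "Message 0"));
        System.out.println("guarded, with handler: " + dispatchGuarded(collector, "Message 1"));
        System.out.println("seen = " + seen);
    }
}
```

If this reading is right, a null check around the per-record dispatch would avoid the exception while leaving batch delivery untouched, which would match the observation that the batch handler still receives all records.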

Steps to reproduce

Create a Kafka consumer with a batch handler (and no per-record handler), then subscribe to a topic that contains messages.

afloarea added the bug label Apr 5, 2024