Skip to content

KafkaMessageListenerContainer tries to send offsets to tx when record is filtered out by early record interceptor #4088

@baurceanu

Description

@baurceanu

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3.7

Describe the bug

KafkaMessageListenerContainer tries to send offsets to a transaction when a record is filtered out by an early record interceptor and fails because there is no tx in progress (the interceptor is called before tx).

o.s.k.l.KafkaMessageListenerContainer 149 - Consumer exception
java.lang.IllegalStateException: Cannot send offsets if a transaction is not in progress (currentState= READY)
	at org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:367)
	at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:785)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.sendOffsetsToTransaction(DefaultKafkaProducerFactory.java:1169)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doSendOffsets(KafkaMessageListenerContainer.java:3080)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.sendOffsetsToTransaction(KafkaMessageListenerContainer.java:3073)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.ackCurrent(KafkaMessageListenerContainer.java:3062)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.ackCurrent(KafkaMessageListenerContainer.java:3051)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkEarlyIntercept(KafkaMessageListenerContainer.java:2673)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2517)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2500)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2152)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1528)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1466)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1335)

To Reproduce

  1. Set up a transactional kafka listener
  2. Add an early (interceptBeforeTx is true) filtering RecordInterceptor
  3. Send a record that is not filtered out by the interceptor (this step sets KafkaMessageListenerContainer.ListenerConsumer#producer)
  4. Send a record that is filtered out by the interceptor

Expected behavior

No offsets sending to a transaction.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions