-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.7
Describe the bug
-
When I declare a consumer using
@KafkaListenerand mixtopicsandtopicPartitions, no error occurs. The consumer ends up consuming messages from the topics specified intopicPartitions. I think it would be more user-friendly if a validation exception were thrown in this case. Would it make sense to add such validation? -
When both
topicsandtopicPartitionsare declared in@KafkaListenerand the@RetryableTopicannotation is used, the retry topic is managed based ontopics. However, as explained in point 1, the consumer actually receives messages from the topics intopicPartitions. As a result, if an error occurs while processing messages from topicPartitions, the retry mechanism does not work correctly.
To Reproduce
- Create a consumer with @KafkaListener that uses both topics and topicPartitions.
- Add the @RetryableTopic annotation.
- Send a message to a topicPartition and trigger an exception in the consume
Expected behavior
Sample
public class MyListener {
@RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2), topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(id = "foo",
topics = "myTopic1",
clientIdPrefix = "myClientId", topicPartitions =
{
@TopicPartition(topic = "topic1", partitions = {"0"}),
@TopicPartition(topic = "topic2", partitions = {"0"})
})
public void listen(String data) {
throw new RuntimeException("a");
}
}2025-11-17T18:32:05.694+09:00 ERROR 40508 --- [ foo-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Failed to determine if this record (topic1-0@9) should be recovered, including in seeks
java.lang.NullPointerException: No destination found for topic: topic1
at java.base/java.util.Objects.requireNonNull(Objects.java:340) ~[na:na]
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.doGetDestinationFor(DefaultDestinationTopicResolver.java:230) ~[spring-kafka-3.3.7.jar:3.3.7]
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.getDestinationTopicSynchronized(DefaultDestinationTopicResolver.java:220) ~[spring-kafka-3.3.7.jar:3.3.7]
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.getDestinationHolderFor(DefaultDestinationTopicResolver.java:214) ~[spring-kafka-3.3.7.jar:3.3.7]
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.resolveDestinationTopic(DefaultDestinationTopicResolver.java:99) ~[spring-kafka-3.3.7.jar:3.3.7]
at org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory.lambda$destinationResolver$11(DeadLetterPublishingRecovererFactory.java:289) ~[spring-kafka-3.3.7.jar:3.3.7]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:504) ~[spring-kafka-3.3.7.jar:3.3.7]
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:228) ~[spring-kafka-3.3.7.jar:3.3.7]