-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue? : 2.8.6
I have 2 listeners (2 groupIds), service-1 and service-2, both listening to the same topic: myTopic
When service-1 fails, I expect the message to be pushed to its retry topic: myTopic-service_1_retry-0
Similarly, when service-2 fails, I expect the message to be pushed to its retry topic: myTopic-service_2_retry-0
Here is an example of my listeners.
/**
 * Declares two Kafka listeners on the same main topic ({@code myTopic}), one per
 * consumer group ({@code service-1} and {@code service-2}). Each listener carries its
 * own {@code @RetryableTopic} configuration with a distinct retry suffix, DLT suffix,
 * {@code KafkaTemplate} bean name and listener container factory bean name.
 *
 * <p>NOTE(review): both listeners subscribe to the same main topic while using
 * different retry/DLT suffixes. Confirm that the spring-kafka version in use supports
 * several {@code @RetryableTopic} listeners on one topic.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class MyListeners {

    /**
     * Listener for consumer group {@code service-1}.
     *
     * <p>Always fails with an {@link IllegalStateException} so that the retry path
     * configured by {@code @RetryableTopic} is exercised.
     *
     * @param message the consumed record
     * @throws IllegalStateException always
     */
    @RetryableTopic(
            attempts = "2",
            numPartitions = "9",
            kafkaTemplate = "kafkaTemplate1",
            backoff = @Backoff(delay = 30000),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            retryTopicSuffix = "-service_1_retry",
            listenerContainerFactory = "kafkaListenerContainerFactory1",
            dltTopicSuffix = "-service_1_dlt")
    @KafkaListener(
            // Must name the same factory bean as listenerContainerFactory above.
            containerFactory = "kafkaListenerContainerFactory1",
            groupId = "service-1",
            concurrency = "9",
            topics = "myTopic",
            errorHandler = "logExceptionErrorHandler")
    public void processMessageService1(ConsumerRecord<String, Event> message) {
        throw new IllegalStateException("test");
    }

    /**
     * Listener for consumer group {@code service-2}.
     *
     * <p>Accepts every record without doing any work, so it never triggers a retry.
     *
     * @param message the consumed record (unused)
     */
    @RetryableTopic(
            attempts = "2",
            kafkaTemplate = "kafkaTemplate2",
            numPartitions = "9",
            backoff = @Backoff(delay = 30000),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            retryTopicSuffix = "-service_2_retry",
            listenerContainerFactory = "kafkaListenerContainerFactory2",
            dltTopicSuffix = "-service_2_dlt")
    @KafkaListener(
            containerFactory = "kafkaListenerContainerFactory2",
            groupId = "service-2",
            concurrency = "9",
            topics = "myTopic",
            errorHandler = "logExceptionErrorHandler")
    public void processMessageService2(ConsumerRecord<String, Event> message) {
        // Intentionally empty: this listener always succeeds.
    }
}
When I push a message to the myTopic topic and it fails for service-1, the message is pushed to myTopic-service_2_retry-0, which is not what I expect.
For more details, here is the KafkaConfig file I use:
/**
 * Kafka configuration: replaces the value deserializer of the injected consumer
 * factory and declares the error handler, the {@code KafkaTemplate} beans and the
 * listener container factories referenced by name from the listeners.
 */
@EnableKafka
@Configuration
@Slf4j
public class KafkaConfigExample {

    /** Consumer factory whose value deserializer is replaced after injection. */
    @Autowired
    private ConsumerFactory<String, Event> consumerFactory;

    /**
     * Installs a JSON value deserializer for {@code Event}, wrapped in an
     * {@code ErrorHandlingDeserializer}, on the injected consumer factory.
     *
     * @throws ClassCastException if the injected factory is not a
     *         {@code DefaultKafkaConsumerFactory}
     */
    @PostConstruct
    public void updateDefaultConfiguratiion() {
        JsonDeserializer<Event> jsonEventDeserializer = new JsonDeserializer<>(Event.class);
        jsonEventDeserializer.setRemoveTypeHeaders(false);
        jsonEventDeserializer.addTrustedPackages("*");
        jsonEventDeserializer.setUseTypeMapperForKey(true);

        // Parameterized cast instead of a raw type, so the setter call stays type-checked.
        DefaultKafkaConsumerFactory<String, Event> defaultFactory =
                (DefaultKafkaConsumerFactory<String, Event>) consumerFactory;
        defaultFactory.setValueDeserializer(new ErrorHandlingDeserializer<>(jsonEventDeserializer));
    }

    /**
     * Error handler that logs the failure and rethrows it unchanged.
     *
     * <p>Failures whose cause is exactly {@code InputException} are logged at WARN;
     * everything else, including a failure without a cause, is logged at ERROR.
     *
     * @return the listener error handler bean
     */
    @Bean
    KafkaListenerErrorHandler logExceptionErrorHandler() {
        return (Message<?> msg, ListenerExecutionFailedException ex) -> {
            // The cause may be absent; guard against it instead of failing with an NPE here.
            Throwable cause = ex.getCause();
            if (cause != null && cause.getClass().equals(InputException.class)) {
                log.warn("Unable to process message: {} with error: {}", msg, ex);
            } else {
                log.error("Unable to process message: {} with error: {}", msg, ex);
            }
            throw ex;
        };
    }

    /**
     * @param producerFactory producer factory backing the template
     * @return the template registered under the bean name {@code kafkaTemplate1}
     */
    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate1(ProducerFactory<String, Event> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * @param producerFactory producer factory backing the template
     * @return the template registered under the bean name {@code kafkaTemplate2}
     */
    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate2(ProducerFactory<String, Event> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * @param consumerFactory consumer factory used by the created containers
     * @return the container factory registered as {@code kafkaListenerContainerFactory1}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory1(ConsumerFactory<String, Event> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    /**
     * @param producerFactory producer factory backing the template
     * @return the {@code Command} template registered as {@code kafkaTemplateCommand}
     */
    @Bean
    public KafkaTemplate<String, Command> kafkaTemplateCommand(ProducerFactory<String, Command> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * @param consumerFactory consumer factory used by the created containers
     * @return the container factory registered as {@code kafkaListenerContainerFactory2}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory2(ConsumerFactory<String, Event> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}