Expected Behavior
Add ackMode attribute to @KafkaListener, similar to existing concurrency and autoStartup:
    @Service
    public class MyListener {

        @KafkaListener(topics = "critical", ackMode = "MANUAL")
        public void handleCritical(String msg, Acknowledgment ack) {
            ack.acknowledge();
        }

        @KafkaListener(topics = "normal", ackMode = "BATCH")
        public void handleNormal(String msg) { }
    }
Configuration simplified to:
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(...) {
        // Single factory with default ack mode
        factory.getContainerProperties().setAckMode(AckMode.BATCH);
        return factory;
    }
API Design
    public @interface KafkaListener {

        /**
         * Override the container factory's ackMode for this listener.
         * Valid values: RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE
         * Supports SpEL #{...} and property placeholders ${...}
         */
        String ackMode() default "";
    }
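To illustrate the intended semantics of the string attribute, here is a minimal sketch of how a resolved ackMode value could be mapped to an enum constant. It is not taken from Spring Kafka: AckModeResolver and its resolve method are hypothetical names, the AckMode enum is a local stand-in for ContainerProperties.AckMode so that the snippet compiles on its own, and it assumes any SpEL expression or property placeholder has already been evaluated to a plain string. Trimming and case-insensitive matching are likewise assumptions of this sketch.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;

// Local stand-in for ContainerProperties.AckMode (assumption: lets the sketch compile without Spring Kafka).
enum AckMode { RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE }

// Hypothetical helper; the name and shape are illustrative only.
final class AckModeResolver {

    private AckModeResolver() { }

    /**
     * Maps the already-resolved attribute value to an AckMode.
     * An empty or blank value means "no override": the factory default applies.
     */
    static Optional<AckMode> resolve(String attributeValue) {
        if (attributeValue == null || attributeValue.trim().isEmpty()) {
            return Optional.empty();
        }
        String name = attributeValue.trim().toUpperCase(Locale.ROOT);
        try {
            return Optional.of(AckMode.valueOf(name));
        } catch (IllegalArgumentException ex) {
            // Fail fast with the list of valid values instead of silently using the default.
            throw new IllegalArgumentException(
                    "Invalid ackMode '" + attributeValue + "'; expected one of "
                            + Arrays.toString(AckMode.values()), ex);
        }
    }
}
```

With a helper like this, the effective mode for a listener would be resolve(value).orElse(factoryDefault), so listeners that omit the attribute keep the factory's configured ack mode.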
Current Behavior
Currently, using different acknowledgment modes for different listeners requires creating multiple container factory beans:
    @Configuration
    public class KafkaConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(...) {
            factory.getContainerProperties().setAckMode(AckMode.MANUAL);
            return factory;
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> batchAckFactory(...) {
            factory.getContainerProperties().setAckMode(AckMode.BATCH);
            return factory;
        }
    }

    @Service
    public class MyListener {

        @KafkaListener(topics = "critical", containerFactory = "manualAckFactory")
        public void handleCritical(String msg, Acknowledgment ack) {
            ack.acknowledge();
        }

        @KafkaListener(topics = "normal", containerFactory = "batchAckFactory")
        public void handleNormal(String msg) { }
    }
This creates unnecessary boilerplate and maintenance overhead.
Context
Our application handles different message types with different reliability requirements:
- Critical messages (orders, payments): MANUAL ack for precise control
- Normal messages (notifications): BATCH ack for performance
- Analytics events: RECORD ack for real-time processing
We currently maintain three separate container factory beans that differ only in ack mode; every other setting is identical.
Alternatives considered:
- Single factory with one ack mode - doesn't meet our requirements
- Current workaround - multiple factory beans (verbose and hard to maintain)
This feature would be consistent with existing @KafkaListener attributes such as concurrency and autoStartup, and would bring Spring Kafka to parity with Spring AMQP's @RabbitListener, which already supports a per-listener ackMode.