From 5fc8220530211f385d71b089a5ca1220b5c2492b Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 17 Oct 2025 15:01:36 -0400 Subject: [PATCH] Add factory-level container properties support to ShareKafkaListenerContainerFactory Enables configuration of container properties at the factory level, following the established pattern in other Spring Kafka container factories. This provides a more flexible and Spring-friendly way to configure share consumer containers, particularly for settings like explicit acknowledgment mode, without requiring configuration through Kafka client properties. - Add `getContainerProperties()` method to allow configuration at factory level - Copy factory-level properties to each listener container instance during creation - Update acknowledgment mode determination to respect factory-level settings with proper precedence - Add integration test for factory-level explicit acknowledgment configuration - Update documentation with factory-level configuration example Signed-off-by: Soby Chacko --- .../ROOT/pages/kafka/kafka-queues.adoc | 23 ++++---- .../ShareKafkaListenerContainerFactory.java | 30 +++++++++- .../ShareKafkaListenerIntegrationTests.java | 55 ++++++++++++++++++- 3 files changed, 91 insertions(+), 17 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc index b1ed7084f9..4baf290cdb 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc @@ -441,21 +441,18 @@ public ShareConsumerFactory explicitShareConsumerFactory() { [source,java] ---- -@Bean -public ShareConsumerFactory explicitShareConsumerFactory() { - Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); - return new DefaultShareConsumerFactory<>(props); -} - @Bean public ShareKafkaListenerContainerFactory explicitShareKafkaListenerContainerFactory( - ShareConsumerFactory explicitShareConsumerFactory) { - // The factory will detect the explicit acknowledgment mode from the consumer factory configuration - return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory); + ShareConsumerFactory shareConsumerFactory) { + + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + + // Configure acknowledgment mode at container factory level + // true means explicit acknowledgment is required + factory.getContainerProperties().setExplicitShareAcknowledgment(true); + + return factory; } ---- diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index 10a36fdc94..b59300d513 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode; +import org.springframework.beans.BeanUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationEventPublisher; @@ -59,6 +60,8 @@ public class ShareKafkaListenerContainerFactory private final ShareConsumerFactory shareConsumerFactory; + private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); + private boolean autoStartup = true; private int phase = 0; @@ -116,6 +119,15 @@ public void setConcurrency(int concurrency) { this.concurrency = concurrency; } + /** + * Obtain the factory-level container properties - set properties as needed + * and they will be copied to each listener container instance created by this factory. + * @return the properties. + */ + public ContainerProperties getContainerProperties() { + return this.containerProperties; + } + @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; @@ -152,7 +164,12 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer inst // Validate share group configuration validateShareConfiguration(endpoint); + // Copy factory-level properties to container + BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern", + "messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties"); + // Determine acknowledgment mode following Spring Kafka's configuration precedence patterns + // Check factory-level properties first, then consumer factory config boolean explicitAck = determineExplicitAcknowledgment(properties); properties.setExplicitShareAcknowledgment(explicitAck); @@ -180,7 +197,7 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer inst *

* Configuration precedence (highest to lowest): *

    - *
  1. Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set)
  2. + *
  3. Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set via factory-level properties)
  4. *
  5. Consumer Config: {@code ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG}
  6. *
  7. Default: {@code false} (implicit acknowledgment)
  8. *
@@ -189,7 +206,13 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer inst * @throws IllegalArgumentException if an invalid acknowledgment mode is configured */ private boolean determineExplicitAcknowledgment(ContainerProperties containerProperties) { - // Check Kafka client configuration + // Check factory-level properties first + // If explicitly set to true (non-default), use it with highest precedence + if (this.containerProperties.isExplicitShareAcknowledgment()) { + return true; + } + + // Check Kafka client configuration as fallback Object clientAckMode = this.shareConsumerFactory.getConfigurationProperties() .get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG); @@ -197,8 +220,9 @@ private boolean determineExplicitAcknowledgment(ContainerProperties containerPro ShareAcknowledgementMode mode = ShareAcknowledgementMode.fromString(clientAckMode.toString()); return mode == ShareAcknowledgementMode.EXPLICIT; } + // Default to implicit acknowledgment (false) - return containerProperties.isExplicitShareAcknowledgment(); + return false; } private static void validateShareConfiguration(KafkaListenerEndpoint endpoint) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java index 13768e1ab9..9334bbd0c1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java @@ -72,7 +72,8 @@ "share-listener-consumer-aware-test", "share-listener-ack-consumer-aware-test", "share-listener-mixed-ack-test", - "share-listener-error-handling-test" + "share-listener-error-handling-test", + "share-listener-factory-props-test" }, brokerProperties = { "share.coordinator.state.topic.replication.factor=1", @@ -197,6 +198,22 @@ void shouldHandleProcessingErrorsCorrectly() throws Exception { assertThat(ErrorHandlingTestListener.errorCount.get()).isEqualTo(1); } + @Test + void shouldSupportExplicitAcknowledgmentViaFactoryContainerProperties() throws Exception { + final String topic = "share-listener-factory-props-test"; + final String groupId = "share-factory-props-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test message + kafkaTemplate.send(topic, "factory-test", "factory-props-message"); + + // Wait for processing + assertThat(FactoryPropsTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(FactoryPropsTestListener.received.get()).isEqualTo("factory-props-message"); + assertThat(FactoryPropsTestListener.acknowledgmentReceived.get()).isNotNull(); + assertThat(isAcknowledgedInternal(FactoryPropsTestListener.acknowledgmentReceived.get())).isTrue(); + } + /** * Sets the share.auto.offset.reset group config to earliest for the given groupId. */ @@ -261,6 +278,16 @@ public ShareKafkaListenerContainerFactory explicitShareKafkaList return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory); } + @Bean + public ShareKafkaListenerContainerFactory factoryPropsShareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + // Configure explicit acknowledgment via factory's container properties + factory.getContainerProperties().setExplicitShareAcknowledgment(true); + return factory; + } + @Bean public ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { Map props = new HashMap<>(); @@ -305,6 +332,11 @@ public MixedAckTestListener mixedAckTestListener() { public ErrorHandlingTestListener errorHandlingTestListener() { return new ErrorHandlingTestListener(); } + + @Bean + public FactoryPropsTestListener factoryPropsTestListener() { + return new FactoryPropsTestListener(); + } } // Test listener classes @@ -480,4 +512,25 @@ public void listen(ConsumerRecord record, ShareAcknowledgment ac } } + static class FactoryPropsTestListener { + + static final CountDownLatch latch = new CountDownLatch(1); + + static final AtomicReference received = new AtomicReference<>(); + + static final AtomicReference acknowledgmentReceived = new AtomicReference<>(); + + @KafkaListener(topics = "share-listener-factory-props-test", + groupId = "share-factory-props-group", + containerFactory = "factoryPropsShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, @Nullable ShareAcknowledgment acknowledgment) { + received.set(record.value()); + acknowledgmentReceived.set(acknowledgment); + if (acknowledgment != null) { + acknowledgment.acknowledge(); // ACCEPT + } + latch.countDown(); + } + } + }