Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,21 +441,18 @@ public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {

[source,java]
----
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");
return new DefaultShareConsumerFactory<>(props);
}

@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> explicitShareConsumerFactory) {
// The factory will detect the explicit acknowledgment mode from the consumer factory configuration
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
ShareConsumerFactory<String, String> shareConsumerFactory) {

ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

// Configure acknowledgment mode at container factory level
// true means explicit acknowledgment is required
factory.getContainerProperties().setExplicitShareAcknowledgment(true);

return factory;
}
----

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;

import org.springframework.beans.BeanUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
Expand Down Expand Up @@ -59,6 +60,8 @@ public class ShareKafkaListenerContainerFactory<K, V>

private final ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;

private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);

private boolean autoStartup = true;

private int phase = 0;
Expand Down Expand Up @@ -116,6 +119,15 @@ public void setConcurrency(int concurrency) {
this.concurrency = concurrency;
}

/**
* Obtain the factory-level container properties - set properties as needed
* and they will be copied to each listener container instance created by this factory.
* @return the properties.
*/
public ContainerProperties getContainerProperties() {
return this.containerProperties;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -152,7 +164,12 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
// Validate share group configuration
validateShareConfiguration(endpoint);

// Copy factory-level properties to container
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
"messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties");

// Determine acknowledgment mode following Spring Kafka's configuration precedence patterns
// Check factory-level properties first, then consumer factory config
boolean explicitAck = determineExplicitAcknowledgment(properties);
properties.setExplicitShareAcknowledgment(explicitAck);

Expand Down Expand Up @@ -180,7 +197,7 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
* <p>
* Configuration precedence (highest to lowest):
* <ol>
* <li>Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set)</li>
* <li>Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set via factory-level properties)</li>
* <li>Consumer Config: {@code ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG}</li>
* <li>Default: {@code false} (implicit acknowledgment)</li>
* </ol>
Expand All @@ -189,16 +206,23 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
* @throws IllegalArgumentException if an invalid acknowledgment mode is configured
*/
private boolean determineExplicitAcknowledgment(ContainerProperties containerProperties) {
// Check Kafka client configuration
// Check factory-level properties first
// If explicitly set to true (non-default), use it with highest precedence
if (this.containerProperties.isExplicitShareAcknowledgment()) {
return true;
}

// Check Kafka client configuration as fallback
Object clientAckMode = this.shareConsumerFactory.getConfigurationProperties()
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);

if (clientAckMode != null) {
ShareAcknowledgementMode mode = ShareAcknowledgementMode.fromString(clientAckMode.toString());
return mode == ShareAcknowledgementMode.EXPLICIT;
}

// Default to implicit acknowledgment (false)
return containerProperties.isExplicitShareAcknowledgment();
return false;
}

private static void validateShareConfiguration(KafkaListenerEndpoint endpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
"share-listener-consumer-aware-test",
"share-listener-ack-consumer-aware-test",
"share-listener-mixed-ack-test",
"share-listener-error-handling-test"
"share-listener-error-handling-test",
"share-listener-factory-props-test"
},
brokerProperties = {
"share.coordinator.state.topic.replication.factor=1",
Expand Down Expand Up @@ -197,6 +198,22 @@ void shouldHandleProcessingErrorsCorrectly() throws Exception {
assertThat(ErrorHandlingTestListener.errorCount.get()).isEqualTo(1);
}

@Test
void shouldSupportExplicitAcknowledgmentViaFactoryContainerProperties() throws Exception {
final String topic = "share-listener-factory-props-test";
final String groupId = "share-factory-props-group";
setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId);

// Send test message
kafkaTemplate.send(topic, "factory-test", "factory-props-message");

// Wait for processing
assertThat(FactoryPropsTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(FactoryPropsTestListener.received.get()).isEqualTo("factory-props-message");
assertThat(FactoryPropsTestListener.acknowledgmentReceived.get()).isNotNull();
assertThat(isAcknowledgedInternal(FactoryPropsTestListener.acknowledgmentReceived.get())).isTrue();
}

/**
* Sets the share.auto.offset.reset group config to earliest for the given groupId.
*/
Expand Down Expand Up @@ -261,6 +278,16 @@ public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaList
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
}

@Bean
public ShareKafkaListenerContainerFactory<String, String> factoryPropsShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Configure explicit acknowledgment via factory's container properties
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
return factory;
}

@Bean
public ProducerFactory<String, String> producerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> props = new HashMap<>();
Expand Down Expand Up @@ -305,6 +332,11 @@ public MixedAckTestListener mixedAckTestListener() {
public ErrorHandlingTestListener errorHandlingTestListener() {
return new ErrorHandlingTestListener();
}

@Bean
public FactoryPropsTestListener factoryPropsTestListener() {
return new FactoryPropsTestListener();
}
}

// Test listener classes
Expand Down Expand Up @@ -480,4 +512,25 @@ public void listen(ConsumerRecord<String, String> record, ShareAcknowledgment ac
}
}

static class FactoryPropsTestListener {

static final CountDownLatch latch = new CountDownLatch(1);

static final AtomicReference<String> received = new AtomicReference<>();

static final AtomicReference<ShareAcknowledgment> acknowledgmentReceived = new AtomicReference<>();

@KafkaListener(topics = "share-listener-factory-props-test",
groupId = "share-factory-props-group",
containerFactory = "factoryPropsShareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record, @Nullable ShareAcknowledgment acknowledgment) {
received.set(record.value());
acknowledgmentReceived.set(acknowledgment);
if (acknowledgment != null) {
acknowledgment.acknowledge(); // ACCEPT
}
latch.countDown();
}
}

}