diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc index 020ac1a0bf..7f59a727fb 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc @@ -201,25 +201,6 @@ public class ShareMessageListener { } ---- -[[share-group-configuration]] -== Share Group Configuration - -Share groups require specific broker configuration to function properly. -For testing with embedded Kafka, use: - -[source,java] ----- -@EmbeddedKafka( - topics = {"my-queue-topic"}, - brokerProperties = { - "unstable.api.versions.enable=true", - "group.coordinator.rebalance.protocols=classic,share", - "share.coordinator.state.topic.replication.factor=1", - "share.coordinator.state.topic.min.isr=1" - } -) ----- - [[share-group-offset-reset]] === Share Group Offset Reset @@ -248,8 +229,277 @@ private void configureShareGroup(String bootstrapServers, String groupId) throws [[share-record-acknowledgment]] == Record Acknowledgment -Currently, share consumers automatically acknowledge records with `AcknowledgeType.ACCEPT` after successful processing. -More sophisticated acknowledgment patterns will be added in future versions. +Share consumers support two acknowledgment modes that control how records are acknowledged after processing. + +[[share-implicit-acknowledgment]] +=== Implicit Acknowledgment (Default) + +In implicit mode, records are automatically acknowledged based on processing outcome: + +Successful processing: Records are acknowledged as `ACCEPT` +Processing errors: Records are acknowledged as `REJECT` + +[source,java] +---- +@Bean +public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + // Implicit mode is the default (false means implicit acknowledgment) + factory.getContainerProperties().setExplicitShareAcknowledgment(false); + + return factory; +} +---- + +[[share-explicit-acknowledgment]] +=== Explicit Acknowledgment + +In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment. + +There are two ways to configure explicit acknowledgment mode: + +==== Option 1: Using Kafka Client Configuration + +[source,java] +---- +@Bean +public ShareConsumerFactory explicitShareConsumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config + return new DefaultShareConsumerFactory<>(props); +} +---- + +==== Option 2: Using Spring Container Configuration + +[source,java] +---- +@Bean +public ShareKafkaListenerContainerFactory explicitShareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + + // Configure acknowledgment mode at container factory level + // true means explicit acknowledgment is required + factory.getContainerProperties().setExplicitShareAcknowledgment(true); + + return factory; +} +---- + +==== Configuration Precedence + +When both configuration methods are used, Spring Kafka follows this precedence order (highest to lowest): + +1. **Container Properties**: `containerProperties.setExplicitShareAcknowledgment(true/false)` +2. **Consumer Config**: `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG` ("implicit" or "explicit") +3. **Default**: `false` (implicit acknowledgment) + +[[share-acknowledgment-types]] +=== Acknowledgment Types + +Share consumers support three acknowledgment types: + + ACCEPT: Record processed successfully, mark as completed + RELEASE: Temporary failure, make record available for redelivery + REJECT: Permanent failure, do not retry + +[[share-acknowledgment-api]] +=== ShareAcknowledgment API + +The `ShareAcknowledgment` interface provides methods for explicit acknowledgment: + +[source,java] +---- +public interface ShareAcknowledgment { + void acknowledge(); + void release(); + void reject(); +} +---- + +[[share-listener-interfaces]] +=== Listener Interfaces + +Share consumers support specialized listener interfaces for different use cases: + +[[share-basic-listener]] +==== Basic Message Listener + +Use the standard MessageListener for simple cases: +[source,java] +---- +@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory") +public void listen(ConsumerRecord record) { + System.out.println("Received: " + record.value()); + // Automatically acknowledged in implicit mode +} +---- + +[[share-acknowledging-listener]] +==== AcknowledgingShareConsumerAwareMessageListener + +This interface provides access to the `ShareConsumer` instance with optional acknowledgment support. +The acknowledgment parameter is nullable and depends on the container's acknowledgment mode: + +===== Implicit Mode Example (acknowledgment is null) + +[source,java] +---- +@KafkaListener( + topics = "my-topic", + containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default +) +public void listen(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, + ShareConsumer consumer) { + + // In implicit mode, acknowledgment is null + System.out.println("Received: " + record.value()); + + // Access consumer metrics if needed + Map metrics = consumer.metrics(); + + // Record is auto-acknowledged as ACCEPT on success, REJECT on error +} +---- + +===== Explicit Mode Example (acknowledgment is non-null) + +[source,java] +---- +@Component +public class ExplicitAckListener { + @KafkaListener( + topics = "my-topic", + containerFactory = "explicitShareKafkaListenerContainerFactory" + ) + public void listen(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, + ShareConsumer consumer) { + + // In explicit mode, acknowledgment is non-null + try { + processRecord(record); + acknowledgment.acknowledge(); // ACCEPT + } + catch (RetryableException e) { + acknowledgment.release(); // Will be redelivered + } + catch (Exception e) { + acknowledgment.reject(); // Permanent failure + } + } + + private void processRecord(ConsumerRecord record) { + // Business logic here + } +} +---- + +[[share-acknowledgment-constraints]] +=== Acknowledgment Constraints + +In explicit acknowledgment mode, the container enforces important constraints: + + Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged. + One-time Acknowledgment: Each record can only be acknowledged once. + Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`. + +[WARNING] +In explicit mode, failing to acknowledge records will block further message processing. +Always ensure records are acknowledged in all code paths. + +[[share-acknowledgment-timeout]] +==== Acknowledgment Timeout Detection + +To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection. +When a record is not acknowledged within the specified timeout, a warning is logged with details about the unacknowledged record. + +[source,java] +---- +@Bean +public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + + // Set acknowledgment timeout (default is 30 seconds) + factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30)); + + return factory; +} +---- + +When a record exceeds the timeout, you'll see a warning like: +---- +WARN: Record not acknowledged within timeout (30 seconds). +In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), +or ack.reject() for every record. +---- + +This feature helps developers quickly identify when acknowledgment calls are missing from their code, preventing the common issue of "Spring Kafka does not consume new records any more" due to forgotten acknowledgments. + +[[share-acknowledgment-examples]] +=== Acknowledgment Examples + +[[share-mixed-acknowledgment-example]] +==== Mixed Acknowledgment Patterns + +[source,java] +---- +@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory") + public void processOrder(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + String orderId = record.key(); + String orderData = record.value(); + try { + if (isValidOrder(orderData)) { + if (processOrder(orderData)) { + acknowledgment.acknowledge(); // Success - ACCEPT + } + else { + acknowledgment.release(); // Temporary failure - retry later + } + } + else { + acknowledgment.reject(); // Invalid order - don't retry + } + } + catch (Exception e) { + // Exception automatically triggers REJECT + throw e; + } +} +---- + +[[share-conditional-acknowledgment-example]] +==== Conditional Acknowledgment + +[source,java] +---- +@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory") +public void validateData(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + ValidationResult result = validator.validate(record.value()); + switch (result.getStatus()) { + case VALID: + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + break; + case INVALID_RETRYABLE: + acknowledgment.acknowledge(AcknowledgeType.RELEASE); + break; + case INVALID_PERMANENT: + acknowledgment.acknowledge(AcknowledgeType.REJECT); + break; + } +} +---- [[share-differences-from-regular-consumers]] == Differences from Regular Consumers @@ -259,8 +509,9 @@ Share consumers differ from regular consumers in several key ways: 1. **No Partition Assignment**: Share consumers cannot be assigned specific partitions 2. **No Topic Patterns**: Share consumers do not support subscribing to topic patterns 3. **Cooperative Consumption**: Multiple consumers in the same share group can consume from the same partitions simultaneously -4. **Automatic Acknowledgment**: Records are automatically acknowledged after processing +4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types 5. **Different Group Management**: Share groups use different coordinator protocols +6. **No Batch Processing**: Share consumers process records individually, not in batches [[share-limitations-and-considerations]] == Limitations and Considerations @@ -268,7 +519,9 @@ Share consumers differ from regular consumers in several key ways: [[share-current-limitations]] === Current Limitations -* **Early Access**: This feature is in early access and may change in future versions -* **Limited Acknowledgment Options**: Only automatic `ACCEPT` acknowledgment is currently supported -* **No Message Converters**: Message converters are not yet supported for share consumers +* **In preview**: This feature is in preview mode and may change in future versions * **Single-Threaded**: Share consumer containers currently run in single-threaded mode +* **No Message Converters**: Message converters are not yet supported for share consumers +* **No Batch Listeners**: Batch processing is not supported with share consumers +* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 6a1095a326..5e016c2366 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -89,6 +89,7 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; +import org.springframework.kafka.config.ShareKafkaListenerContainerFactory; import org.springframework.kafka.listener.ContainerGroupSequencer; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.KafkaListenerErrorHandler; @@ -651,6 +652,10 @@ protected void processListener(MethodKafkaListenerEndpoint endpoint, Kafka KafkaListenerContainerFactory listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName); + if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory) { + endpoint.setShareConsumer(true); + } + this.registrar.registerEndpoint(endpoint, listenerContainerFactory); } @@ -685,6 +690,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint en if (StringUtils.hasText(kafkaListener.batch())) { endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch())); } + endpoint.setBeanFactory(this.beanFactory); resolveErrorHandler(endpoint, kafkaListener); resolveContentTypeConverter(endpoint, kafkaListener); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 834b18de4b..76b201300a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -98,6 +98,8 @@ public abstract class AbstractKafkaListenerEndpoint private @Nullable Boolean batchListener; + private boolean shareConsumer; + private @Nullable KafkaTemplate replyTemplate; private @Nullable String clientIdPrefix; @@ -291,6 +293,19 @@ public void setBatchListener(boolean batchListener) { this.batchListener = batchListener; } + public void setShareConsumer(boolean shareConsumer) { + this.shareConsumer = shareConsumer; + } + + /** + * Return true if this endpoint is for a share consumer. + * @return true for a share consumer endpoint. + * @since 4.0 + */ + public boolean isShareConsumer() { + return this.shareConsumer; + } + /** * Set the {@link KafkaTemplate} to use to send replies. * @param replyTemplate the template. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 3f08435831..67bfa94aad 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -36,6 +36,7 @@ import org.springframework.kafka.listener.adapter.HandlerAdapter; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.ShareRecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; @@ -210,7 +211,15 @@ protected MessagingMessageListenerAdapter createMessageListenerInstance( @Nullable MessageConverter messageConverter) { MessagingMessageListenerAdapter listener; - if (isBatchListener()) { + if (isShareConsumer()) { + ShareRecordMessagingMessageListenerAdapter messageListener = new ShareRecordMessagingMessageListenerAdapter<>( + this.bean, this.method, this.errorHandler); + if (messageConverter instanceof RecordMessageConverter recordMessageConverter) { + messageListener.setMessageConverter(recordMessageConverter); + } + listener = messageListener; + } + else if (isBatchListener()) { BatchMessagingMessageListenerAdapter messageListener = new BatchMessagingMessageListenerAdapter<>( this.bean, this.method, this.errorHandler); BatchToRecordAdapter batchToRecordAdapter = getBatchToRecordAdapter(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index fdfa68d6bb..cac3cd2c66 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -20,7 +20,8 @@ import java.util.Collection; import java.util.regex.Pattern; -import org.jspecify.annotations.Nullable; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -49,20 +50,24 @@ * @param the value type * * @author Soby Chacko + * * @since 4.0 */ public class ShareKafkaListenerContainerFactory - implements KafkaListenerContainerFactory>, ApplicationEventPublisherAware, ApplicationContextAware { + implements KafkaListenerContainerFactory>, + ApplicationEventPublisherAware, ApplicationContextAware { private final ShareConsumerFactory shareConsumerFactory; - private @Nullable Boolean autoStartup; + private boolean autoStartup = true; - private @Nullable Integer phase; + private int phase = 0; - private @Nullable ApplicationEventPublisher applicationEventPublisher; + @SuppressWarnings("NullAway.Init") + private ApplicationEventPublisher applicationEventPublisher; - private @Nullable ApplicationContext applicationContext; + @SuppressWarnings("NullAway.Init") + private ApplicationContext applicationContext; /** * Construct an instance with the provided consumer factory. @@ -81,7 +86,7 @@ public void setApplicationContext(ApplicationContext applicationContext) { * Set whether containers created by this factory should auto-start. * @param autoStartup true to auto-start */ - public void setAutoStartup(Boolean autoStartup) { + public void setAutoStartup(boolean autoStartup) { this.autoStartup = autoStartup; } @@ -89,7 +94,7 @@ public void setAutoStartup(Boolean autoStartup) { * Set the phase in which containers created by this factory should start and stop. * @param phase the phase */ - public void setPhase(Integer phase) { + public void setPhase(int phase) { this.phase = phase; } @@ -124,15 +129,59 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint endpoint) { */ protected void initializeContainer(ShareKafkaMessageListenerContainer instance, KafkaListenerEndpoint endpoint) { ContainerProperties properties = instance.getContainerProperties(); - Boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup; + boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup; + + // Validate share group configuration + validateShareConfiguration(endpoint); + + // Determine acknowledgment mode following Spring Kafka's configuration precedence patterns + boolean explicitAck = determineExplicitAcknowledgment(properties); + properties.setExplicitShareAcknowledgment(explicitAck); + + instance.setAutoStartup(effectiveAutoStartup); + instance.setPhase(this.phase); + instance.setApplicationContext(this.applicationContext); + instance.setApplicationEventPublisher(this.applicationEventPublisher); + JavaUtils.INSTANCE - .acceptIfNotNull(effectiveAutoStartup, instance::setAutoStartup) - .acceptIfNotNull(this.phase, instance::setPhase) - .acceptIfNotNull(this.applicationContext, instance::setApplicationContext) - .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher) .acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId) - .acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId) - .acceptIfNotNull(endpoint.getConsumerProperties(), properties::setKafkaConsumerProperties); + .acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId); + } + + /** + * Determine whether explicit acknowledgment is required following Spring Kafka's configuration precedence patterns. + *

+ * Configuration precedence (highest to lowest): + *

    + *
  1. Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set)
  2. + *
  3. Consumer Config: {@code ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG}
  4. + *
  5. Default: {@code false} (implicit acknowledgment)
  6. + *
+ * @param containerProperties the container properties to check + * @return true if explicit acknowledgment is required, false for implicit + * @throws IllegalArgumentException if an invalid acknowledgment mode is configured + */ + private boolean determineExplicitAcknowledgment(ContainerProperties containerProperties) { + // Check Kafka client configuration + Object clientAckMode = this.shareConsumerFactory.getConfigurationProperties() + .get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG); + + if (clientAckMode != null) { + ShareAcknowledgementMode mode = ShareAcknowledgementMode.fromString(clientAckMode.toString()); + return mode == ShareAcknowledgementMode.EXPLICIT; + } + // Default to implicit acknowledgment (false) + return containerProperties.isExplicitShareAcknowledgment(); + } + + private static void validateShareConfiguration(KafkaListenerEndpoint endpoint) { + // Validate that batch listeners aren't used with share consumers + if (Boolean.TRUE.equals(endpoint.getBatchListener())) { + throw new IllegalArgumentException( + "Batch listeners are not supported with share consumers. " + + "Share groups operate at the record level."); + } + } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java new file mode 100644 index 0000000000..8f34a68e74 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java @@ -0,0 +1,69 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.jspecify.annotations.Nullable; + +import org.springframework.kafka.support.ShareAcknowledgment; + +/** + * A message listener for share consumer containers with acknowledgment support. + *

+ * This interface provides access to both the {@link ShareConsumer} instance and acknowledgment + * capabilities. The acknowledgment parameter behavior depends on the container's + * acknowledgment mode: + *

    + *
  • Explicit mode: The acknowledgment parameter is non-null and must + * be used to acknowledge each record
  • + *
  • Implicit mode: The acknowledgment parameter is null and records + * are automatically acknowledged
  • + *
+ *

+ * This is the primary listener interface for share consumers when you need access + * to the ShareConsumer instance or need explicit acknowledgment control. + * + * @param the key type + * @param the value type + * + * @author Soby Chacko + * + * @since 4.0 + * + * @see ShareAcknowledgment + * @see ShareConsumer + */ +@FunctionalInterface +public interface AcknowledgingShareConsumerAwareMessageListener extends GenericMessageListener> { + + /** + * Invoked with data from kafka, an acknowledgment, and provides access to the consumer. + * When explicit acknowledgment mode is used, the acknowledgment parameter will be non-null + * and must be used to acknowledge the record. When implicit acknowledgment mode is used, + * the acknowledgment parameter will be null. + * @param data the data to be processed. + * @param acknowledgment the acknowledgment (nullable in implicit mode). + * @param consumer the consumer. + */ + void onShareRecord(ConsumerRecord data, @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer); + + @Override + default void onMessage(ConsumerRecord data) { + throw new UnsupportedOperationException("Container should never call this"); + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 201ab2298a..8135517f66 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -35,6 +35,7 @@ import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.support.AopUtils; import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; @@ -313,6 +314,10 @@ public enum EOSMode { private boolean recordObservationsInBatch; + private boolean explicitShareAcknowledgment = false; + + private Duration shareAcknowledgmentTimeout = Duration.ofSeconds(30); // Align with Kafka's share.record.lock.duration.ms default + /** * Create properties for a container that will subscribe to the specified topics. * @param topics the topics. @@ -1115,6 +1120,57 @@ public void setRecordObservationsInBatch(boolean recordObservationsInBatch) { this.recordObservationsInBatch = recordObservationsInBatch; } + /** + * Set whether explicit acknowledgment is required for share consumer containers. + *

+ * This setting only applies to share consumer containers and is ignored + * by regular consumer containers. + *

+ * When set to {@code false} (default), records are automatically acknowledged + * as ACCEPT when the next poll occurs or when commitSync/commitAsync is called. + *

+ * When set to {@code true}, the application must explicitly acknowledge each + * record using the provided {@link ShareAcknowledgment}. + * @param explicitShareAcknowledgment true for explicit acknowledgment, false for implicit + * @since 4.0 + * @see ShareAcknowledgment + */ + public void setExplicitShareAcknowledgment(boolean explicitShareAcknowledgment) { + this.explicitShareAcknowledgment = explicitShareAcknowledgment; + } + + /** + * Check whether explicit acknowledgment is required for share consumer containers. + * @return true if explicit acknowledgment is required, false for implicit acknowledgment + */ + public boolean isExplicitShareAcknowledgment() { + return this.explicitShareAcknowledgment; + } + + /** + * Set the timeout for share acknowledgments in explicit mode. + *

+ * When a record is not acknowledged within this timeout, a warning + * will be logged to help identify missing acknowledgment calls. + * This only applies when using explicit acknowledgment mode. + *

+ * Default is 30 seconds. + * @param shareAcknowledgmentTimeout the timeout duration + * @since 4.0 + */ + public void setShareAcknowledgmentTimeout(Duration shareAcknowledgmentTimeout) { + this.shareAcknowledgmentTimeout = shareAcknowledgmentTimeout; + } + + /** + * Get the timeout for share acknowledgments in explicit mode. + * @return the acknowledgment timeout + * @since 4.0 + */ + public Duration getShareAcknowledgmentTimeout() { + return this.shareAcknowledgmentTimeout; + } + @Override public String toString() { StringBuilder sb = new StringBuilder("ContainerProperties ["); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index f94c78c099..e4216a102a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -20,10 +20,14 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -31,25 +35,48 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; +import org.springframework.core.log.LogMessage; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.core.ShareConsumerFactory; import org.springframework.kafka.event.ConsumerStartedEvent; import org.springframework.kafka.event.ConsumerStartingEvent; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.util.Assert; /** - * {@code ShareKafkaMessageListenerContainer} is a message listener container for Kafka's share consumer model. + * Single-threaded share consumer container using the Java {@link ShareConsumer}. *

- * This container manages a single-threaded consumer loop using a {@link org.springframework.kafka.core.ShareConsumerFactory}. - * It is designed for use cases where Kafka's cooperative sharing protocol is desired, and provides a simple polling loop - * with per-record dispatch and acknowledgement. + * This container provides support for Kafka share groups, enabling cooperative + * consumption where multiple consumers can process records from the same partitions. + * Unlike traditional consumer groups with exclusive partition assignment, share groups + * allow load balancing at the record level. + *

+ * Key features: + *

    + *
  • Explicit and implicit acknowledgment modes
  • + *
  • Automatic error handling with REJECT acknowledgments
  • + *
  • Poll-level acknowledgment constraints in explicit mode
  • + *
  • Integration with Spring's {@code @KafkaListener} annotation
  • + *
+ *

+ * Acknowledgment Modes: + *

    + *
  • Implicit: Records are automatically acknowledged as ACCEPT + * after successful processing or REJECT on errors
  • + *
  • Explicit: Application must manually acknowledge each record; + * subsequent polls are blocked until all records from the previous poll are acknowledged
  • + *
* * @param the key type * @param the value type * * @author Soby Chacko + * * @since 4.0 + * + * @see ShareConsumer + * @see ShareAcknowledgment */ public class ShareKafkaMessageListenerContainer extends AbstractShareKafkaMessageListenerContainer { @@ -124,6 +151,16 @@ protected void doStart() { } GenericMessageListener listener = (GenericMessageListener) messageListener; Assert.state(listener != null, "'messageListener' cannot be null"); + + // Validate listener type for explicit acknowledgment mode + if (containerProperties.isExplicitShareAcknowledgment()) { + boolean isAcknowledgingListener = listener instanceof AcknowledgingShareConsumerAwareMessageListener; + Assert.state(isAcknowledgingListener, + "Explicit acknowledgment mode requires an AcknowledgingShareConsumerAwareMessageListener. " + + "Current listener type: " + listener.getClass().getName() + ". " + + "Either use implicit acknowledgment mode or provide a listener that can handle acknowledgments."); + } + this.listenerConsumer = new ShareListenerConsumer(listener); setRunning(true); this.listenerConsumerFuture = CompletableFuture.runAsync(this.listenerConsumer, consumerExecutor); @@ -150,6 +187,21 @@ private void publishConsumerStartedEvent() { } } + /** + * Represents a pending acknowledgment to be processed on the consumer thread. + */ + private static class PendingAcknowledgment { + + private final ConsumerRecord record; + + private final AcknowledgeType type; + + PendingAcknowledgment(ConsumerRecord record, AcknowledgeType type) { + this.record = record; + this.type = type; + } + } + /** * The inner share consumer thread that polls for records and dispatches to the listener. */ @@ -165,6 +217,22 @@ private class ShareListenerConsumer implements Runnable { private final @Nullable String clientId; + // Acknowledgment tracking for explicit mode + private final Map, ShareConsumerAcknowledgment> pendingAcknowledgments = new ConcurrentHashMap<>(); + + // Tracking for missed acknowledgment detection + private final Map, Long> acknowledgmentTimestamps = new ConcurrentHashMap<>(); + + // Lock for coordinating acknowledgment completion + private final Object acknowledgmentLock = new Object(); + + // Queue for acknowledgments from other threads + private final ConcurrentLinkedQueue> acknowledgmentQueue = new ConcurrentLinkedQueue<>(); + + private final boolean isExplicitMode; + + private final long ackTimeoutMs; + ShareListenerConsumer(GenericMessageListener listener) { this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( ShareKafkaMessageListenerContainer.this.getGroupId(), @@ -173,6 +241,18 @@ private class ShareListenerConsumer implements Runnable { this.genericListener = listener; this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); ContainerProperties containerProperties = getContainerProperties(); + + // Configure acknowledgment mode + this.isExplicitMode = containerProperties.isExplicitShareAcknowledgment(); + this.ackTimeoutMs = containerProperties.getShareAcknowledgmentTimeout().toMillis(); + + // Configure consumer properties based on acknowledgment mode + if (this.isExplicitMode) { + // Apply explicit mode configuration to consumer + // Note: This should ideally be done during consumer creation in the factory + this.logger.info("Share consumer configured for explicit acknowledgment mode"); + } + this.consumer.subscribe(Arrays.asList(containerProperties.getTopics())); } @@ -188,22 +268,38 @@ public void run() { Throwable exitThrowable = null; while (isRunning()) { try { - var records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT)); - if (records != null && records.count() > 0) { - for (var record : records) { - if (this.genericListener instanceof AcknowledgingConsumerAwareMessageListener ackListener) { - ackListener.onMessage(record, null, null); + // Process any pending acknowledgments from other threads + processQueuedAcknowledgments(); + + // In explicit mode, check for acknowledgment timeouts before polling + if (this.isExplicitMode && !this.pendingAcknowledgments.isEmpty()) { + checkAcknowledgmentTimeouts(); + } + + ConsumerRecords records; + try { + records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT)); + } + catch (IllegalStateException e) { + // KIP-932: In explicit mode, poll() throws if unacknowledged records exist + if (this.isExplicitMode && !this.pendingAcknowledgments.isEmpty()) { + this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() + + " acknowledgments"); + // Small delay to prevent tight loop while maintaining reasonable responsiveness + try { + Thread.sleep(10); } - else { - GenericMessageListener> listener = - (GenericMessageListener>) this.genericListener; - listener.onMessage(record); + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; } - // Temporarily auto-acknowledge and commit. - // We will refactor it later on to support more production-like scenarios. - this.consumer.acknowledge(record, AcknowledgeType.ACCEPT); - this.consumer.commitSync(); + continue; } + throw e; // Re-throw if not related to acknowledgments + } + + if (records != null && records.count() > 0) { + processRecords(records); } } catch (Error e) { @@ -226,6 +322,139 @@ public void run() { wrapUp(); } + private void processRecords(ConsumerRecords records) { + for (var record : records) { + ShareConsumerAcknowledgment acknowledgment = null; + + try { + if (this.isExplicitMode) { + // Create acknowledgment using inner class + acknowledgment = new ShareConsumerAcknowledgment(record); + this.pendingAcknowledgments.put(record, acknowledgment); + // Track when the record was dispatched for acknowledgment timeout detection + this.acknowledgmentTimestamps.put(record, System.currentTimeMillis()); + } + + // Dispatch to listener + if (this.genericListener instanceof AcknowledgingShareConsumerAwareMessageListener ackListener) { + @SuppressWarnings("unchecked") + AcknowledgingShareConsumerAwareMessageListener typedAckListener = + (AcknowledgingShareConsumerAwareMessageListener) ackListener; + typedAckListener.onShareRecord(record, acknowledgment, this.consumer); + } + else { + // Basic listener remains the same + @SuppressWarnings("unchecked") + GenericMessageListener> listener = + (GenericMessageListener>) this.genericListener; + listener.onMessage(record); + } + } + catch (Exception e) { + handleProcessingError(record, acknowledgment, e); + } + } + // Commit acknowledgments + commitAcknowledgments(); + } + + private void handleProcessingError(ConsumerRecord record, + @Nullable ShareConsumerAcknowledgment acknowledgment, Exception e) { + this.logger.error(e, "Error processing record: " + record); + + if (this.isExplicitMode && acknowledgment != null) { + // Remove from pending and auto-reject on error + this.pendingAcknowledgments.remove(record); + try { + acknowledgment.reject(); + } + catch (Exception ackEx) { + this.logger.error(ackEx, "Failed to reject record after processing error"); + } + } + else { + // In implicit mode, auto-reject on error + try { + this.consumer.acknowledge(record, AcknowledgeType.REJECT); + } + catch (Exception ackEx) { + this.logger.error(ackEx, "Failed to reject record after processing error"); + } + } + } + + private void commitAcknowledgments() { + try { + this.consumer.commitSync(); + } + catch (Exception e) { + this.logger.error(e, "Failed to commit acknowledgments"); + } + } + + /** + * Called by ShareConsumerAcknowledgment when a record is acknowledged in explicit mode. + * + * @param record the record that was acknowledged + */ + void onRecordAcknowledged(ConsumerRecord record) { + if (this.isExplicitMode) { + this.pendingAcknowledgments.remove(record); + this.acknowledgmentTimestamps.remove(record); + this.logger.debug(() -> "Record acknowledged, " + this.pendingAcknowledgments.size() + " still pending"); + } + } + + /** + * Process acknowledgments queued from other threads. + * This ensures all consumer access happens on the consumer thread. + */ + private void processQueuedAcknowledgments() { + PendingAcknowledgment pendingAck; + while ((pendingAck = this.acknowledgmentQueue.poll()) != null) { + final PendingAcknowledgment ack = pendingAck; + try { + this.consumer.acknowledge(ack.record, ack.type); + // Find and notify the acknowledgment object + ShareConsumerAcknowledgment acknowledgment = this.pendingAcknowledgments.get(ack.record); + if (acknowledgment != null) { + acknowledgment.notifyAcknowledged(ack.type); + onRecordAcknowledged(ack.record); + } + } + catch (Exception e) { + this.logger.error(e, () -> "Failed to process queued acknowledgment for record: " + ack.record); + } + } + } + + /** + * Check if any records have exceeded the acknowledgment timeout and log a warning. + */ + private void checkAcknowledgmentTimeouts() { + if (!this.isExplicitMode || this.acknowledgmentTimestamps.isEmpty()) { + return; + } + + long currentTime = System.currentTimeMillis(); + for (Map.Entry, Long> entry : this.acknowledgmentTimestamps.entrySet()) { + long recordAge = currentTime - entry.getValue(); + if (recordAge > this.ackTimeoutMs) { + ConsumerRecord record = entry.getKey(); + this.logger.warn(LogMessage.format( + "Record not acknowledged within timeout (%d seconds). " + + "In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), " + + "or ack.reject() for every record. " + + "Unacknowledged record: topic='%s', partition=%d, offset=%d", + this.ackTimeoutMs / 1000, + record.topic(), record.partition(), record.offset() + )); + // Only warn once per record + this.acknowledgmentTimestamps.put(record, currentTime); + } + } + } + protected void initialize() { publishConsumerStartingEvent(); publishConsumerStartedEvent(); @@ -244,6 +473,79 @@ public String toString() { + "]"; } + /** + * Inner acknowledgment class that integrates directly with the container. + */ + private class ShareConsumerAcknowledgment implements ShareAcknowledgment { + + private final ConsumerRecord record; + + private final AtomicReference acknowledgmentType = new AtomicReference<>(); + + ShareConsumerAcknowledgment(ConsumerRecord record) { + this.record = record; + } + + @Override + public void acknowledge() { + acknowledgeInternal(AcknowledgeType.ACCEPT); + } + + @Override + public void release() { + acknowledgeInternal(AcknowledgeType.RELEASE); + } + + @Override + public void reject() { + acknowledgeInternal(AcknowledgeType.REJECT); + } + + private void acknowledgeInternal(AcknowledgeType type) { + if (!this.acknowledgmentType.compareAndSet(null, type)) { + throw new IllegalStateException( + String.format("Record at offset %d has already been acknowledged with type %s", + this.record.offset(), this.acknowledgmentType.get())); + } + + // Queue the acknowledgment to be processed on the consumer thread + ShareKafkaMessageListenerContainer.this.listenerConsumer.acknowledgmentQueue.offer( + new PendingAcknowledgment<>(this.record, type)); + } + + /** + * Called by the consumer thread after successful acknowledgment. + * @param type the type of acknowledgment + */ + void notifyAcknowledged(AcknowledgeType type) { + this.acknowledgmentType.set(type); + } + + boolean isAcknowledged() { + return this.acknowledgmentType.get() != null; + } + + @Nullable + AcknowledgeType getAcknowledgmentType() { + return this.acknowledgmentType.get(); + } + + ConsumerRecord getRecord() { + return this.record; + } + + @Override + public String toString() { + return "ShareConsumerAcknowledgment{" + + "topic=" + this.record.topic() + + ", partition=" + this.record.partition() + + ", offset=" + this.record.offset() + + ", acknowledged=" + isAcknowledged() + + ", type=" + getAcknowledgmentType() + + '}'; + } + } + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 386503f321..2e597d01b1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -38,6 +38,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.common.TopicPartition; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @@ -61,6 +62,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.KafkaUtils; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.Message; @@ -407,13 +409,13 @@ public void onIdleContainer(Map assignments, ConsumerSeekC } } - protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable Acknowledgment acknowledgment, - @Nullable Consumer consumer) { + protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable Object acknowledgment, + @Nullable Object consumer) { return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType()); } - protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, + protected void invoke(Object records, @Nullable Object acknowledgment, @Nullable Object consumer, final Message message) { Throwable listenerError = null; @@ -421,14 +423,22 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, @ Observation currentObservation = getCurrentObservation(); try { result = invokeHandler(records, acknowledgment, message, consumer); - if (result != null) { - handleResult(result, records, acknowledgment, consumer, message); + // For share consumers, we don't handle results yet (TODO: Handle results with queues) + // For regular consumers, handle results regardless of acknowledgment type/null status + if (result != null && !(consumer instanceof ShareConsumer)) { + handleResult(result, records, (Acknowledgment) acknowledgment, (Consumer) consumer, message); } } catch (ListenerExecutionFailedException e) { listenerError = e; currentObservation.error(e.getCause() != null ? e.getCause() : e); - handleException(records, acknowledgment, consumer, message, e); + // For share consumers, throw the error back to the container + if (consumer instanceof ShareConsumer) { + throw e; + } + else { + handleException(records, (Acknowledgment) acknowledgment, (Consumer) consumer, message, e); + } } catch (Error e) { listenerError = e; @@ -453,15 +463,16 @@ private Observation getCurrentObservation() { * @param data the data to process during invocation. * @param acknowledgment the acknowledgment to use if any. * @param message the message to process. - * @param consumer the consumer. + * @param consumer the consumer (can be Consumer or ShareConsumer). * @return the result of invocation. */ @Nullable - protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message message, - @Nullable Consumer consumer) { + protected final Object invokeHandler(Object data, @Nullable Object acknowledgment, Message message, + @Nullable Object consumer) { - Acknowledgment ack = acknowledgment; - if (ack == null && this.noOpAck) { + Object ack = acknowledgment; + // For regular Acknowledgment, check if we need to use NO_OP_ACK + if (ack == null && this.noOpAck && !(consumer instanceof ShareConsumer)) { ack = NO_OP_ACK; } Assert.notNull(this.handlerMethod, "the 'handlerMethod' must not be null"); @@ -478,10 +489,20 @@ else if (this.hasMetadataParameter) { } } catch (MessageConversionException ex) { - throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex)); + if (ack instanceof ShareAcknowledgment) { + throw checkAckArg((ShareAcknowledgment) ack, message, new MessageConversionException("Cannot handle message", ex)); + } + else { + throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex)); + } } catch (MethodArgumentNotValidException ex) { - throw checkAckArg(ack, message, ex); + if (ack instanceof ShareAcknowledgment) { + throw checkAckArg((ShareAcknowledgment) ack, message, ex); + } + else { + throw checkAckArg(ack, message, ex); + } } catch (MessagingException ex) { throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " + @@ -493,11 +514,21 @@ else if (this.hasMetadataParameter) { } } - private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Message message, Exception ex) { + private RuntimeException checkAckArg(@Nullable Object acknowledgment, Message message, Exception ex) { + if (this.hasAckParameter && acknowledgment == null) { + return new ListenerExecutionFailedException("invokeHandler Failed", + new IllegalStateException("No Acknowledgment available as an argument, " + + "the listener container must have a MANUAL AckMode to populate the Acknowledgment.")); + } + return new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " + + "be invoked with the incoming message", message.getPayload()), ex); + } + + private RuntimeException checkAckArg(@Nullable ShareAcknowledgment acknowledgment, Message message, Exception ex) { if (this.hasAckParameter && acknowledgment == null) { return new ListenerExecutionFailedException("invokeHandler Failed", - new IllegalStateException("No Acknowledgment available as an argument, " - + "the listener container must have a MANUAL AckMode to populate the Acknowledgment.")); + new IllegalStateException("No ShareAcknowledgment available as an argument, " + + "the listener container must have an explicit acknowledgement mode to populate the Acknowledgment.")); } return new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " + "be invoked with the incoming message", message.getPayload()), ex); @@ -847,6 +878,9 @@ protected Type determineInferredType(@Nullable Method method) { // NOSONAR compl Type parameterType = methodParameter.getGenericParameterType(); boolean isNotConvertible = parameterIsType(parameterType, ConsumerRecord.class); boolean isAck = parameterIsType(parameterType, Acknowledgment.class); + if (!isAck) { + isAck = parameterIsType(parameterType, ShareAcknowledgment.class); + } this.hasAckParameter |= isAck; if (isAck) { this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null; @@ -961,4 +995,20 @@ public void acknowledge() { } + static class NoOpShareAck implements ShareAcknowledgment { + + @Override + public void acknowledge() { + } + + @Override + public void release() { + } + + @Override + public void reject() { + } + + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java new file mode 100644 index 0000000000..698d655745 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java @@ -0,0 +1,87 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import java.lang.reflect.Method; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.jspecify.annotations.Nullable; + +import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; +import org.springframework.kafka.listener.AcknowledgingShareConsumerAwareMessageListener; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.ShareAcknowledgment; +import org.springframework.kafka.support.converter.JacksonProjectingMessageConverter; +import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.messaging.Message; + +/** + * A {@link org.springframework.kafka.listener.MessageListener MessageListener} + * adapter that invokes a configurable {@link HandlerAdapter}; used when the factory is + * configured for the listener to receive individual messages from share groups. + * + *

Wraps the incoming Kafka Message to Spring's {@link Message} abstraction. + * + *

The original {@link ConsumerRecord} and + * the {@link ShareAcknowledgment} are provided as additional arguments so that these can + * be injected as method arguments if necessary. + * + * @param the key type. + * @param the value type. + * @author Soby Chacko + * @since 4.0 + */ +public class ShareRecordMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter + implements AcknowledgingShareConsumerAwareMessageListener { + + public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method, + @Nullable KafkaListenerErrorHandler errorHandler) { + super(bean, method, errorHandler); + } + + /** + * Kafka {@link AcknowledgingConsumerAwareMessageListener} entry point. + *

Delegate the message to the target listener method, + * with appropriate conversion of the message argument. + * @param record the incoming Kafka {@link ConsumerRecord}. + * @param acknowledgment the acknowledgment. + * @param consumer the consumer. + */ + @Override + @SuppressWarnings("removal") + public void onShareRecord(ConsumerRecord record, @Nullable ShareAcknowledgment acknowledgment, + @Nullable ShareConsumer consumer) { + + Message message; + if (isConversionNeeded()) { + message = toMessagingMessage(record, acknowledgment, consumer); + } + else { + message = NULL_MESSAGE; + } + if (logger.isDebugEnabled()) { + RecordMessageConverter messageConverter = getMessageConverter(); + if (!(messageConverter instanceof JacksonProjectingMessageConverter + || messageConverter instanceof org.springframework.kafka.support.converter.ProjectingMessageConverter)) { + this.logger.debug("Processing [" + message + "]"); + } + } + invoke(record, acknowledgment, consumer, message); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgment.java new file mode 100644 index 0000000000..79bd210d28 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgment.java @@ -0,0 +1,92 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +/** + * A handle for acknowledging the delivery of a record when using share groups. + *

+ * Share groups enable cooperative consumption where multiple consumers can process + * records from the same partitions. Each record must be explicitly acknowledged + * to indicate the result of processing. + *

+ * Acknowledgment types: + *

    + *
  • {@code ACCEPT} - Record processed successfully
  • + *
  • {@code RELEASE} - Temporary failure, make available for retry
  • + *
  • {@code REJECT} - Permanent failure, do not retry
  • + *
+ *

+ * This interface is only applicable when using explicit acknowledgment mode + * ({@code share.acknowledgement.mode=explicit}). In implicit mode, records are + * automatically acknowledged as {@code ACCEPT}. + *

+ * Note: Acknowledgment is separate from commit operations. After acknowledging + * records, use {@code commitSync()} or {@code commitAsync()} to persist the + * acknowledgments to the broker. + * + * @author Soby Chacko + * @since 4.0 + */ +public interface ShareAcknowledgment { + + /** + * Acknowledge the record as successfully processed. + *

+ * The record will be marked as completed and will not be redelivered. + * The acknowledgment will be committed when: + *

    + *
  • The next {@code poll()} is called (batched with fetch)
  • + *
  • {@code commitSync()} or {@code commitAsync()} is explicitly called
  • + *
  • The consumer is closed
  • + *
+ * + * @throws IllegalStateException if the record has already been acknowledged + * @since 4.0 + */ + void acknowledge(); + + /** + * Release the record for redelivery due to a transient failure. + *

+ * The record will be made available for another delivery attempt. + * The acknowledgment will be committed when: + *

    + *
  • The next {@code poll()} is called (batched with fetch)
  • + *
  • {@code commitSync()} or {@code commitAsync()} is explicitly called
  • + *
  • The consumer is closed
  • + *
+ * + * @throws IllegalStateException if the record has already been acknowledged + */ + void release(); + + /** + * Reject the record due to a permanent failure. + *

+ * The record will not be delivered again and will be archived. + * The acknowledgment will be committed when: + *

    + *
  • The next {@code poll()} is called (batched with fetch)
  • + *
  • {@code commitSync()} or {@code commitAsync()} is explicitly called
  • + *
  • The consumer is closed
  • + *
+ * + * @throws IllegalStateException if the record has already been acknowledged + */ + void reject(); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java index 381062cd2c..0e61a338bb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java @@ -18,10 +18,8 @@ import java.util.Map; -import org.apache.kafka.clients.consumer.Consumer; import org.jspecify.annotations.Nullable; -import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -46,8 +44,8 @@ static String getGroupId() { /** * Set up the common headers. - * @param acknowledgment the acknowledgment. - * @param consumer the consumer. + * @param acknowledgment the acknowledgment (can be Acknowledgment or ShareAcknowledgment). + * @param consumer the consumer (can be Consumer or ShareConsumer). * @param rawHeaders the raw headers map. * @param theKey the key. * @param topic the topic. @@ -57,7 +55,7 @@ static String getGroupId() { * @param timestamp the timestamp. */ @SuppressWarnings("NullAway") // Dataflow analysis limitation - default void commonHeaders(@Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, Map rawHeaders, + default void commonHeaders(@Nullable Object acknowledgment, @Nullable Object consumer, Map rawHeaders, @Nullable Object theKey, Object topic, Object partition, Object offset, @Nullable Object timestampType, Object timestamp) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index e1c2445cef..2d1572d72f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -23,7 +23,6 @@ import java.util.function.Function; import org.apache.commons.logging.LogFactory; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; @@ -33,7 +32,6 @@ import org.springframework.core.log.LogAccessor; import org.springframework.kafka.support.AbstractKafkaHeaderMapper; -import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; import org.springframework.kafka.support.JacksonPresent; import org.springframework.kafka.support.JsonKafkaHeaderMapper; @@ -156,7 +154,7 @@ protected org.springframework.messaging.converter.MessageConverter getMessagingC * IMPORTANT: This converter's {@link #fromMessage(Message, String)} method is called * for outbound conversion to a {@link ProducerRecord} with the message payload in the * {@link ProducerRecord#value()} property. - * {@link #toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)} is called for + * {@link #toMessage(ConsumerRecord, Object, Object, Type)} is called for * inbound conversion from {@link ConsumerRecord} with the payload being the * {@link ConsumerRecord#value()} property. *

@@ -181,7 +179,7 @@ public void setMessagingConverter(@Nullable SmartMessageConverter messagingConve } @Override - public Message toMessage(ConsumerRecord record, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, + public Message toMessage(ConsumerRecord record, @Nullable Object acknowledgment, @Nullable Object consumer, @Nullable Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java index fc641570e1..59864a50b7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java @@ -18,13 +18,11 @@ import java.lang.reflect.Type; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; -import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.Message; /** @@ -38,13 +36,13 @@ public interface RecordMessageConverter extends MessageConverter { /** * Convert a {@link ConsumerRecord} to a {@link Message}. * @param record the record. - * @param acknowledgment the acknowledgment. - * @param consumer the consumer + * @param acknowledgment the acknowledgment (can be Acknowledgment or ShareAcknowledgment). + * @param consumer the consumer (can be Consumer or ShareConsumer). * @param payloadType the required payload type. * @return the message. */ @NonNull - Message toMessage(ConsumerRecord record, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, + Message toMessage(ConsumerRecord record, @Nullable Object acknowledgment, @Nullable Object consumer, @Nullable Type payloadType); /** diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index aa8f67da07..aeb0cb1858 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -1298,8 +1298,8 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, factory.setRecordMessageConverter(new RecordMessageConverter() { @Override - public Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment, - Consumer consumer, Type payloadType) { + public Message toMessage(ConsumerRecord record, @Nullable Object acknowledgment, + @Nullable Object consumer, @Nullable Type payloadType) { throw new UnsupportedOperationException(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java index 8ddadbcedb..13768e1ab9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java @@ -18,21 +18,25 @@ import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AlterConfigOp; -import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -46,6 +50,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.ShareConsumerFactory; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; @@ -54,48 +59,146 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * Integration tests for share Kafka listener. + * Enhanced integration tests for @KafkaListener with share consumer acknowledgment features. * * @author Soby Chacko * @since 4.0 */ @SpringJUnitConfig @DirtiesContext -@EmbeddedKafka(topics = "share-listener-integration-test", +@EmbeddedKafka(topics = { + "share-listener-basic-test", + "share-listener-explicit-ack-test", + "share-listener-consumer-aware-test", + "share-listener-ack-consumer-aware-test", + "share-listener-mixed-ack-test", + "share-listener-error-handling-test" +}, brokerProperties = { "share.coordinator.state.topic.replication.factor=1", "share.coordinator.state.topic.min.isr=1" }) class ShareKafkaListenerIntegrationTests { - private static final CountDownLatch latch = new CountDownLatch(1); - - private static final AtomicReference received = new AtomicReference<>(); - @Autowired EmbeddedKafkaBroker broker; + @Autowired + KafkaTemplate kafkaTemplate; + + @Test + void shouldSupportBasicShareKafkaListener() throws Exception { + final String topic = "share-listener-basic-test"; + final String groupId = "share-listener-basic-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test message + kafkaTemplate.send(topic, "basic-test-message"); + + // Wait for processing + assertThat(BasicTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(BasicTestListener.received.get()).isEqualTo("basic-test-message"); + } + + @Test + void shouldSupportExplicitAcknowledgmentWithShareAcknowledgment() throws Exception { + final String topic = "share-listener-explicit-ack-test"; + final String groupId = "share-explicit-ack-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test messages + kafkaTemplate.send(topic, "accept", "accept-message"); + kafkaTemplate.send(topic, "release", "release-message"); + kafkaTemplate.send(topic, "reject", "reject-message"); + + // Wait for processing + assertThat(ExplicitAckTestListener.latch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(ExplicitAckTestListener.redeliveryLatch.await(15, TimeUnit.SECONDS)).isTrue(); + + // Verify acknowledgment types were used correctly + assertThat(ExplicitAckTestListener.acknowledgmentTypes).containsKey("accept"); + assertThat(ExplicitAckTestListener.acknowledgmentTypes).containsKey("reject"); + assertThat(ExplicitAckTestListener.acknowledgmentTypes.get("accept")).isEqualTo(AcknowledgeType.ACCEPT); + assertThat(ExplicitAckTestListener.acknowledgmentTypes.get("reject")).isEqualTo(AcknowledgeType.REJECT); + + // The release message should have been redelivered and then accepted + assertThat(ExplicitAckTestListener.redeliveredAndAccepted.get()).isTrue(); + } + + @Test + void shouldSupportShareConsumerAwareListener() throws Exception { + final String topic = "share-listener-consumer-aware-test"; + final String groupId = "share-consumer-aware-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test message + kafkaTemplate.send(topic, "consumer-aware-message"); + + // Wait for processing + assertThat(ShareConsumerAwareTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(ShareConsumerAwareTestListener.received.get()).isEqualTo("consumer-aware-message"); + assertThat(ShareConsumerAwareTestListener.consumerReceived.get()).isNotNull(); + } + + @Test + void shouldSupportAcknowledgingShareConsumerAwareListener() throws Exception { + final String topic = "share-listener-ack-consumer-aware-test"; + final String groupId = "share-ack-consumer-aware-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test message + kafkaTemplate.send(topic, "ack-consumer-aware-message"); + + // Wait for processing + assertThat(AckShareConsumerAwareTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(AckShareConsumerAwareTestListener.received.get()).isEqualTo("ack-consumer-aware-message"); + assertThat(AckShareConsumerAwareTestListener.consumerReceived.get()).isNotNull(); + assertThat(AckShareConsumerAwareTestListener.acknowledgmentReceived.get()).isNotNull(); + assertThat(isAcknowledgedInternal(AckShareConsumerAwareTestListener.acknowledgmentReceived.get())).isTrue(); + } + + @Test + void shouldHandleMixedAcknowledgmentScenarios() throws Exception { + final String topic = "share-listener-mixed-ack-test"; + final String groupId = "share-mixed-ack-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send multiple test messages with different scenarios + kafkaTemplate.send(topic, "success1", "success-message-1"); + kafkaTemplate.send(topic, "success2", "success-message-2"); + kafkaTemplate.send(topic, "retry", "retry-message"); + + // Wait for processing + assertThat(MixedAckTestListener.processedLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(MixedAckTestListener.retryLatch.await(15, TimeUnit.SECONDS)).isTrue(); + + // Verify correct processing + assertThat(MixedAckTestListener.processedCount.get()).isEqualTo(4); // 3 original + 1 retry + assertThat(MixedAckTestListener.successCount.get()).isEqualTo(3); // 2 success + 1 retry success + assertThat(MixedAckTestListener.retryCount.get()).isEqualTo(1); + } + @Test - void integrationTestShareKafkaListener() throws Exception { - final String topic = "share-listener-integration-test"; - final String groupId = "share-listener-test-group"; + void shouldHandleProcessingErrorsCorrectly() throws Exception { + final String topic = "share-listener-error-handling-test"; + final String groupId = "share-error-handling-group"; setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); - Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - ProducerFactory pf = new DefaultKafkaProducerFactory<>(props); - KafkaTemplate template = new KafkaTemplate<>(pf); - template.setDefaultTopic(topic); - template.sendDefault("foo"); - assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(received.get()).isEqualTo("foo"); + // Send messages that will trigger errors + kafkaTemplate.send(topic, "success", "success-message"); + kafkaTemplate.send(topic, "error", "error-message"); + kafkaTemplate.send(topic, "success2", "success-message-2"); + + // Wait for processing + assertThat(ErrorHandlingTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + + // Verify error handling + assertThat(ErrorHandlingTestListener.successCount.get()).isEqualTo(2); + assertThat(ErrorHandlingTestListener.errorCount.get()).isEqualTo(1); } /** - * Sets the share.auto.offset.reset group config to earliest for the given groupId, - * using the provided bootstrapServers. + * Sets the share.auto.offset.reset group config to earliest for the given groupId. */ private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception { Map adminProperties = new HashMap<>(); @@ -103,12 +206,24 @@ private static void setShareAutoOffsetResetEarliest(String bootstrapServers, Str try (Admin admin = Admin.create(adminProperties)) { ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest"); - Map> configs = java.util.Collections.singletonMap(configResource, - java.util.Collections.singleton(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))); - AlterConfigsResult alterConfigsResult = admin.incrementalAlterConfigs(configs); - alterConfigsResult.all().get(); + Map> configs = Map.of(configResource, + List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))); + admin.incrementalAlterConfigs(configs).all().get(); } + } + /** + * Helper method to access internal acknowledgment state for testing. + */ + private boolean isAcknowledgedInternal(ShareAcknowledgment ack) { + try { + java.lang.reflect.Method method = ack.getClass().getDeclaredMethod("isAcknowledged"); + method.setAccessible(true); + return (Boolean) method.invoke(ack); + } + catch (Exception e) { + throw new RuntimeException("Failed to access internal acknowledgment state", e); + } } @Configuration @@ -119,33 +234,250 @@ static class TestConfig { public ShareConsumerFactory shareConsumerFactory(EmbeddedKafkaBroker broker) { Map configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); - configs.put(ConsumerConfig.GROUP_ID_CONFIG, "share-listener-test-group"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultShareConsumerFactory<>(configs); } + @Bean + public ShareConsumerFactory explicitShareConsumerFactory(EmbeddedKafkaBroker broker) { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put("share.acknowledgement.mode", "explicit"); + return new DefaultShareConsumerFactory<>(configs); + } + @Bean public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( ShareConsumerFactory shareConsumerFactory) { - return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); } @Bean - public TestListener listener() { - return new TestListener(); + public ShareKafkaListenerContainerFactory explicitShareKafkaListenerContainerFactory( + ShareConsumerFactory explicitShareConsumerFactory) { + return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory); + } + + @Bean + public ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { + return new KafkaTemplate<>(producerFactory); + } + + // Test listeners as beans + @Bean + public BasicTestListener basicTestListener() { + return new BasicTestListener(); } + @Bean + public ExplicitAckTestListener explicitAckTestListener() { + return new ExplicitAckTestListener(); + } + + @Bean + public ShareConsumerAwareTestListener shareConsumerAwareTestListener() { + return new ShareConsumerAwareTestListener(); + } + + @Bean + public AckShareConsumerAwareTestListener ackShareConsumerAwareTestListener() { + return new AckShareConsumerAwareTestListener(); + } + + @Bean + public MixedAckTestListener mixedAckTestListener() { + return new MixedAckTestListener(); + } + + @Bean + public ErrorHandlingTestListener errorHandlingTestListener() { + return new ErrorHandlingTestListener(); + } } - static class TestListener { + // Test listener classes + static class BasicTestListener { + + static final CountDownLatch latch = new CountDownLatch(1); + + static final AtomicReference received = new AtomicReference<>(); - @KafkaListener(topics = "share-listener-integration-test", containerFactory = "shareKafkaListenerContainerFactory") + @KafkaListener(topics = "share-listener-basic-test", + groupId = "share-listener-basic-group", + containerFactory = "shareKafkaListenerContainerFactory") public void listen(ConsumerRecord record) { received.set(record.value()); latch.countDown(); } + + } + + static class ExplicitAckTestListener { + + static final CountDownLatch latch = new CountDownLatch(3); + + static final CountDownLatch redeliveryLatch = new CountDownLatch(1); + + static final Map acknowledgmentTypes = new HashMap<>(); + + static final AtomicReference redeliveredAndAccepted = new AtomicReference<>(false); + + @KafkaListener(topics = "share-listener-explicit-ack-test", + groupId = "share-explicit-ack-group", + containerFactory = "explicitShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, ShareAcknowledgment acknowledgment, + ShareConsumer consumer) { + String key = record.key(); + + if ("accept".equals(key)) { + acknowledgment.acknowledge(); + acknowledgmentTypes.put(key, AcknowledgeType.ACCEPT); + latch.countDown(); + } + else if ("release".equals(key)) { + if (!acknowledgmentTypes.containsKey("release-attempted")) { + // First attempt - release it + acknowledgment.release(); + acknowledgmentTypes.put("release-attempted", AcknowledgeType.RELEASE); + latch.countDown(); + } + else { + // Redelivered - accept it + acknowledgment.acknowledge(); + redeliveredAndAccepted.set(true); + redeliveryLatch.countDown(); + } + } + else if ("reject".equals(key)) { + acknowledgment.reject(); + acknowledgmentTypes.put(key, AcknowledgeType.REJECT); + latch.countDown(); + } + } + } + + static class ShareConsumerAwareTestListener { + + static final CountDownLatch latch = new CountDownLatch(1); + + static final AtomicReference received = new AtomicReference<>(); + + static final AtomicReference> consumerReceived = new AtomicReference<>(); + + @KafkaListener(topics = "share-listener-consumer-aware-test", + groupId = "share-consumer-aware-group", + containerFactory = "shareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, ShareConsumer consumer) { + received.set(record.value()); + consumerReceived.set(consumer); + latch.countDown(); + } + } + + static class AckShareConsumerAwareTestListener { + + static final CountDownLatch latch = new CountDownLatch(1); + + static final AtomicReference received = new AtomicReference<>(); + + static final AtomicReference> consumerReceived = new AtomicReference<>(); + + static final AtomicReference acknowledgmentReceived = new AtomicReference<>(); + + @KafkaListener(topics = "share-listener-ack-consumer-aware-test", + groupId = "share-ack-consumer-aware-group", + containerFactory = "explicitShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + received.set(record.value()); + consumerReceived.set(consumer); + acknowledgmentReceived.set(acknowledgment); + if (acknowledgment != null) { + acknowledgment.acknowledge(); // ACCEPT + } + latch.countDown(); + } + } + + static class MixedAckTestListener { + + static final CountDownLatch processedLatch = new CountDownLatch(3); + + static final CountDownLatch retryLatch = new CountDownLatch(1); + + static final AtomicInteger processedCount = new AtomicInteger(); + + static final AtomicInteger successCount = new AtomicInteger(); + + static final AtomicInteger retryCount = new AtomicInteger(); + + @KafkaListener(topics = "share-listener-mixed-ack-test", + groupId = "share-mixed-ack-group", + containerFactory = "explicitShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + String key = record.key(); + int count = processedCount.incrementAndGet(); + + if ("retry".equals(key)) { + if (retryCount.get() == 0) { + // First attempt - release for retry + retryCount.incrementAndGet(); + acknowledgment.release(); + processedLatch.countDown(); + } + else { + // Retry attempt - accept + successCount.incrementAndGet(); + acknowledgment.acknowledge(); + retryLatch.countDown(); + } + } + else { + // Success messages + successCount.incrementAndGet(); + acknowledgment.acknowledge(); + processedLatch.countDown(); + } + } + } + + static class ErrorHandlingTestListener { + + static final CountDownLatch latch = new CountDownLatch(3); + + static final AtomicInteger successCount = new AtomicInteger(); + + static final AtomicInteger errorCount = new AtomicInteger(); + + @KafkaListener(topics = "share-listener-error-handling-test", + groupId = "share-error-handling-group", + containerFactory = "explicitShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + String key = record.key(); + if ("error".equals(key)) { + errorCount.incrementAndGet(); + latch.countDown(); + // Let the error propagate - container should auto-reject + throw new RuntimeException("Simulated processing error"); + } + else { + successCount.incrementAndGet(); + acknowledgment.acknowledge(); + latch.countDown(); + } + } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java index 7aff9fa602..eac63b415c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java @@ -16,43 +16,70 @@ package org.springframework.kafka.listener; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.core.log.LogAccessor; import org.springframework.kafka.core.DefaultShareConsumerFactory; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; -/** - * Basic tests for {@link ShareKafkaMessageListenerContainer}. - * - * @author Soby Chacko - * @since 4.0 - */ @EmbeddedKafka( - topics = {"share-listener-integration-test"}, partitions = 1, - brokerProperties = { - "share.coordinator.state.topic.replication.factor=1", - "share.coordinator.state.topic.min.isr=1" - } + topics = { + "share-listener-integration-test", + "share-container-explicit-test", + "share-container-implicit-test", + "share-container-constraint-test", + "share-container-partial-test", + "share-container-concurrent-test", + "share-container-error-test", + "share-container-mixed-ack-test", + "share-container-lifecycle-test" + }, + partitions = 1, + brokerProperties = { + "share.coordinator.state.topic.replication.factor=1", + "share.coordinator.state.topic.min.isr=1" + } ) class ShareKafkaMessageListenerContainerIntegrationTests { @@ -96,28 +123,709 @@ void integrationTestShareKafkaMessageListenerContainer(EmbeddedKafkaBroker broke try { assertThat(latch.await(10, java.util.concurrent.TimeUnit.SECONDS) && "integration-test-value".equals(received.get())) - .as("Message should be received and have expected value") - .isTrue(); + .as("Message should be received and have expected value") + .isTrue(); } finally { container.stop(); } } - /** - * Sets the share.auto.offset.reset group config to earliest for the given groupId, - * using the provided bootstrapServers. - */ + @Test + void shouldSupportExplicitAcknowledgmentMode(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-explicit-test"; + String groupId = "share-container-explicit-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 3); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch latch = new CountDownLatch(3); + List received = Collections.synchronizedList(new ArrayList<>()); + List acknowledgments = Collections.synchronizedList(new ArrayList<>()); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + received.add(record.value()); + acknowledgments.add(acknowledgment); + + // Explicitly acknowledge the record + if (acknowledgment != null) { + acknowledgment.acknowledge(); // ACCEPT + } + + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("explicitAckTestContainer"); + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(received).hasSize(3); + assertThat(acknowledgments).hasSize(3) + .allMatch(Objects::nonNull) + .allMatch(ShareKafkaMessageListenerContainerIntegrationTests::isAcknowledgedInternal) + .allMatch(ack -> AcknowledgeType.ACCEPT.equals(getAcknowledgmentTypeInternal(ack))); + } + finally { + container.stop(); + } + } + + @Test + void shouldSupportImplicitAcknowledgmentMode(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-implicit-test"; + String groupId = "share-container-implicit-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 3); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + // Default is implicit mode + + CountDownLatch latch = new CountDownLatch(3); + List received = Collections.synchronizedList(new ArrayList<>()); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + received.add(record.value()); + + // In implicit mode, acknowledgment should be null + assertThat(acknowledgment).isNull(); + + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("implicitAckTestContainer"); + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(received).hasSize(3); + } + finally { + container.stop(); + } + } + + @Test + void shouldEnforceExplicitAcknowledgmentConstraints(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-constraint-test"; + String groupId = "share-container-constraint-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 3); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch firstBatchLatch = new CountDownLatch(3); + CountDownLatch secondBatchLatch = new CountDownLatch(3); + AtomicInteger processedCount = new AtomicInteger(); + List pendingAcks = Collections.synchronizedList(new ArrayList<>()); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + + int count = processedCount.incrementAndGet(); + + if (count <= 3) { + // First batch - collect acknowledgments but don't acknowledge yet + pendingAcks.add(acknowledgment); + firstBatchLatch.countDown(); + } + else { + // Second batch - should only happen after first batch is acknowledged + acknowledgment.acknowledge(); + secondBatchLatch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("constraintTestContainer"); + container.start(); + + LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", + LogAccessor.class)); + + DirectFieldAccessor accessor = new DirectFieldAccessor(container); + accessor.setPropertyValue("listenerConsumer.logger", logAccessor); + + try { + // Wait for first batch to be processed + assertThat(firstBatchLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(pendingAcks).hasSize(3); + + // Produce more records for second batch while first is pending + produceTestRecords(bootstrapServers, topic, 3); + + // Wait for the next poll to be blocked since no explicit acknowledgment has been made yet. + // this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() + + // " acknowledgments"); + Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> + verify(logAccessor, atLeastOnce()).trace(ArgumentMatchers.>any()) + ); + + assertThat(processedCount.get()).isEqualTo(3); + + // Acknowledge first batch + for (ShareAcknowledgment ack : pendingAcks) { + ack.acknowledge(); + } + + // Now second batch should be processed + assertThat(secondBatchLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(processedCount.get()).isEqualTo(6); + + } + finally { + container.stop(); + } + } + + @Test + void shouldHandlePartialAcknowledgmentCorrectly(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-partial-test"; + String groupId = "share-container-partial-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch batchLatch = new CountDownLatch(4); + CountDownLatch nextPollLatch = new CountDownLatch(1); + List batchAcks = Collections.synchronizedList(new ArrayList<>()); + AtomicInteger totalProcessed = new AtomicInteger(); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + + int count = totalProcessed.incrementAndGet(); + + if (count <= 4) { + batchAcks.add(acknowledgment); + batchLatch.countDown(); + } + else { + // This should only happen after all previous records acknowledged + acknowledgment.acknowledge(); + nextPollLatch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("partialAckTestContainer"); + container.start(); + + LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", + LogAccessor.class)); + + DirectFieldAccessor accessor = new DirectFieldAccessor(container); + accessor.setPropertyValue("listenerConsumer.logger", logAccessor); + + produceTestRecords(bootstrapServers, topic, 4); + + try { + // Wait for batch to be processed + assertThat(batchLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(batchAcks).hasSize(4); + + // Acknowledge only first 3 records + for (int i = 0; i < 3; i++) { + batchAcks.get(i).acknowledge(); + } + + // Produce more records + produceTestRecords(bootstrapServers, topic, 1); + + // Wait for the next poll to be blocked since one acknowledgment is still pending. + // this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() + + // " acknowledgments"); + Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> + verify(logAccessor, atLeastOnce()).trace(ArgumentMatchers.>any()) + ); + + assertThat(totalProcessed.get()).isEqualTo(4); + + // Acknowledge the last pending record + batchAcks.get(3).acknowledge(); + + // Now should process new records + assertThat(nextPollLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(totalProcessed.get()).isEqualTo(5); + + } + finally { + container.stop(); + } + } + + @Test + void shouldHandleProcessingErrorsInExplicitMode(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-error-test"; + String groupId = "share-container-error-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 5); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch latch = new CountDownLatch(5); + AtomicInteger errorCount = new AtomicInteger(); + AtomicInteger successCount = new AtomicInteger(); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + + // Simulate error for every 3rd record + if (record.value().endsWith("2")) { // value2 + errorCount.incrementAndGet(); + latch.countDown(); + throw new RuntimeException("Simulated processing error"); + } + else { + successCount.incrementAndGet(); + acknowledgment.acknowledge(); + } + + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("errorTestContainer"); + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(errorCount.get()).isEqualTo(1); + assertThat(successCount.get()).isEqualTo(4); + } + finally { + container.stop(); + } + } + + @Test + void shouldSupportMixedAcknowledgmentTypes(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-mixed-ack-test"; + String groupId = "share-container-mixed-ack-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + // Produce test records with different keys to identify them + try (var producer = createProducer(bootstrapServers)) { + producer.send(new ProducerRecord<>(topic, "accept", "accept-value")).get(); + producer.send(new ProducerRecord<>(topic, "release", "release-value")).get(); + producer.send(new ProducerRecord<>(topic, "reject", "reject-value")).get(); + } + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch firstRoundLatch = new CountDownLatch(3); + CountDownLatch redeliveryLatch = new CountDownLatch(1); + Map ackTypes = new ConcurrentHashMap<>(); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + + String key = record.key(); + + if ("accept".equals(key)) { + acknowledgment.acknowledge(); + ackTypes.put(key, AcknowledgeType.ACCEPT); + firstRoundLatch.countDown(); + } + else if ("release".equals(key)) { + if (!ackTypes.containsKey("release-redelivered")) { + // First delivery - release it + acknowledgment.release(); + ackTypes.put("release-redelivered", AcknowledgeType.RELEASE); + firstRoundLatch.countDown(); + } + else { + // Redelivered - accept it + acknowledgment.acknowledge(); + ackTypes.put(key, AcknowledgeType.ACCEPT); + redeliveryLatch.countDown(); + } + } + else if ("reject".equals(key)) { + acknowledgment.reject(); + ackTypes.put(key, AcknowledgeType.REJECT); + firstRoundLatch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("mixedAckTestContainer"); + container.start(); + + try { + // Wait for first round of processing + assertThat(firstRoundLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(ackTypes.get("accept")).isEqualTo(AcknowledgeType.ACCEPT); + assertThat(ackTypes.get("reject")).isEqualTo(AcknowledgeType.REJECT); + + // Wait for redelivery of released record + assertThat(redeliveryLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(ackTypes.get("release")).isEqualTo(AcknowledgeType.ACCEPT); // what got released, was accepted eventually. + assertThat(ackTypes.get("release-redelivered")).isEqualTo(AcknowledgeType.RELEASE); + + } + finally { + container.stop(); + } + } + + @Test + void shouldSupportDifferentListenerTypes(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-implicit-test"; + String groupId = "share-container-listener-types-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + // Test 1: Basic MessageListener + testBasicMessageListener(factory, topic, bootstrapServers, groupId + "-basic"); + + // Test 2: ShareConsumerAwareMessageListener + testShareConsumerAwareListener(factory, topic, bootstrapServers, groupId + "-aware"); + + // Test 3: AcknowledgingShareConsumerAwareMessageListener in implicit mode + testAckListenerInImplicitMode(factory, topic, bootstrapServers, groupId + "-ack-implicit"); + } + + @Test + void shouldHandleContainerLifecycle(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-lifecycle-test"; + String groupId = "share-container-lifecycle-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch firstProcessingLatch = new CountDownLatch(1); + CountDownLatch secondProcessingLatch = new CountDownLatch(1); + AtomicInteger callCount = new AtomicInteger(0); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + int count = callCount.incrementAndGet(); + if (count == 1) { + firstProcessingLatch.countDown(); + } + else if (count == 2) { + secondProcessingLatch.countDown(); + } + acknowledgment.acknowledge(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("lifecycleTestContainer"); + + assertThat(container.isRunning()).isFalse(); + + container.start(); + assertThat(container.isRunning()).isTrue(); + + produceTestRecords(bootstrapServers, topic, 1); + assertThat(firstProcessingLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + container.stop(); + assertThat(container.isRunning()).isFalse(); + + container.start(); + assertThat(container.isRunning()).isTrue(); + + produceTestRecords(bootstrapServers, topic, 1); + assertThat(secondProcessingLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + container.stop(); + } + + @Test + void shouldHandleConcurrentAcknowledgmentAttempts(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrent-test"; + String groupId = "share-container-concurrent-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch processedLatch = new CountDownLatch(1); + AtomicReference ackRef = new AtomicReference<>(); + AtomicInteger successfulAcks = new AtomicInteger(); + AtomicInteger failedAcks = new AtomicInteger(); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + ackRef.set(acknowledgment); + processedLatch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrentAckTestContainer"); + container.start(); + + try { + // Wait for record to be processed + assertThat(processedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + ShareAcknowledgment ack = ackRef.get(); + assertThat(ack).isNotNull(); + + // Try to acknowledge the same record concurrently from multiple threads + int numThreads = 10; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch threadLatch = new CountDownLatch(numThreads); + + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { + try { + ack.acknowledge(); + successfulAcks.incrementAndGet(); + } + catch (IllegalStateException e) { + failedAcks.incrementAndGet(); + } + finally { + threadLatch.countDown(); + } + }); + } + + assertThat(threadLatch.await(10, TimeUnit.SECONDS)).isTrue(); + executor.shutdown(); + + // Only one acknowledgment should succeed + assertThat(successfulAcks.get()).isEqualTo(1); + assertThat(failedAcks.get()).isEqualTo(numThreads - 1); + // Check internal state through reflection since isAcknowledged() is no longer public + assertThat(isAcknowledgedInternal(ack)).isTrue(); + + } + finally { + container.stop(); + } + } + + private static void testBasicMessageListener(DefaultShareConsumerFactory factory, + String topic, String bootstrapServers, String groupId) throws Exception { + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + ContainerProperties containerProps = new ContainerProperties(topic); + CountDownLatch latch = new CountDownLatch(1); + + containerProps.setMessageListener((MessageListener) record -> { + assertThat(record).isNotNull(); + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("basicListenerTest"); + container.start(); + + try { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + finally { + container.stop(); + } + } + + private static void testShareConsumerAwareListener(DefaultShareConsumerFactory factory, + String topic, String bootstrapServers, String groupId) throws Exception { + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + ContainerProperties containerProps = new ContainerProperties(topic); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference> consumerRef = new AtomicReference<>(); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + assertThat(record).isNotNull(); + assertThat(consumer).isNotNull(); + // In implicit mode, acknowledgment should be null + assertThat(acknowledgment).isNull(); + consumerRef.set(consumer); + latch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("consumerAwareListenerTest"); + container.start(); + + try { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(consumerRef.get()).isNotNull(); + } + finally { + container.stop(); + } + } + + private static void testAckListenerInImplicitMode(DefaultShareConsumerFactory factory, + String topic, String bootstrapServers, String groupId) throws Exception { + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + ContainerProperties containerProps = new ContainerProperties(topic); + // Implicit mode (default) + CountDownLatch latch = new CountDownLatch(1); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + assertThat(record).isNotNull(); + assertThat(consumer).isNotNull(); + // In implicit mode, acknowledgment should be null + assertThat(acknowledgment).isNull(); + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("ackListenerImplicitTest"); + container.start(); + + try { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + finally { + container.stop(); + } + } + + // Utility methods + private static Map createConsumerProps(String bootstrapServers, String groupId, boolean explicit) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + if (explicit) { + props.put("share.acknowledgement.mode", "explicit"); + } + return props; + } + + private static void produceTestRecords(String bootstrapServers, String topic, int count) throws Exception { + try (var producer = createProducer(bootstrapServers)) { + for (int i = 0; i < count; i++) { + producer.send(new ProducerRecord<>(topic, "key" + i, "value" + i)).get(); + } + } + } + + private static KafkaProducer createProducer(String bootstrapServers) { + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new KafkaProducer<>(producerProps); + } + private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception { Map adminProperties = new HashMap<>(); - adminProperties.put("bootstrap.servers", bootstrapServers); + adminProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); Map> configs = Map.of( new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op)); - try (Admin admin = AdminClient.create(adminProperties)) { + try (Admin admin = Admin.create(adminProperties)) { admin.incrementalAlterConfigs(configs).all().get(); } } + /** + * Helper method to access internal acknowledgment state for testing. + */ + private static boolean isAcknowledgedInternal(ShareAcknowledgment ack) { + try { + java.lang.reflect.Method method = ack.getClass().getDeclaredMethod("isAcknowledged"); + method.setAccessible(true); + return (Boolean) method.invoke(ack); + } + catch (Exception e) { + throw new RuntimeException("Failed to access internal acknowledgment state", e); + } + } + + /** + * Helper method to access internal acknowledgment type for testing. + */ + private static AcknowledgeType getAcknowledgmentTypeInternal(ShareAcknowledgment ack) { + try { + java.lang.reflect.Method method = ack.getClass().getDeclaredMethod("getAcknowledgmentType"); + method.setAccessible(true); + return (AcknowledgeType) method.invoke(ack); + } + catch (Exception e) { + throw new RuntimeException("Failed to access internal acknowledgment type", e); + } + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java new file mode 100644 index 0000000000..ff1f14104d --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java @@ -0,0 +1,145 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.springframework.kafka.core.ShareConsumerFactory; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** + * Unit tests for {@link ShareKafkaMessageListenerContainer}. + * These tests focus on configuration validation and setup logic. + * Message processing, acknowledgment behavior, and error handling are covered + * by integration tests in {@link ShareKafkaMessageListenerContainerIntegrationTests}. + * + * @author Soby Chacko + * @since 4.0 + */ +@ExtendWith(MockitoExtension.class) +public class ShareKafkaMessageListenerContainerUnitTests { + + @Mock + private ShareConsumerFactory shareConsumerFactory; + + @Mock + private MessageListener messageListener; + + @Mock + private AcknowledgingShareConsumerAwareMessageListener ackListener; + + @Test + void shouldConfigureExplicitModeCorrectly() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setExplicitShareAcknowledgment(true); + containerProperties.setMessageListener(ackListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThat(container.getContainerProperties().isExplicitShareAcknowledgment()) + .isTrue(); + } + + @Test + void shouldConfigureImplicitModeByDefault() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThat(container.getContainerProperties().isExplicitShareAcknowledgment()) + .isFalse(); + } + + @Test + void shouldFailWhenExplicitModeUsedWithNonAcknowledgingListener() { + // Given: A container with explicit acknowledgment mode + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setExplicitShareAcknowledgment(true); + // Using a non-acknowledging listener (just GenericMessageListener) + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + // Starting the container should fail with validation error + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(container::start) + .withMessageContaining("Explicit acknowledgment mode requires an AcknowledgingShareConsumerAwareMessageListener"); + } + + @Test + void shouldValidateListenerTypeOnStartup() { + // Given: A container with explicit acknowledgment mode and proper listener + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setExplicitShareAcknowledgment(true); + // Using an acknowledging listener should not throw during construction + containerProperties.setMessageListener(ackListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + // Validation occurs during startup, but we don't need to actually start for this test + assertThat(container.getContainerProperties().isExplicitShareAcknowledgment()).isTrue(); + } + + @Test + void shouldSupportBeanNameSetting() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.setBeanName("testContainer"); + assertThat(container.getBeanName()).isEqualTo("testContainer"); + assertThat(container.getListenerId()).isEqualTo("testContainer"); + } + + @Test + void shouldReportRunningStateBeforeStart() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + // Should not be running before start + assertThat(container.isRunning()).isFalse(); + } + + @Test + void shouldSupportContainerProperties() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(ackListener); + containerProperties.setExplicitShareAcknowledgment(true); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThat(container.getContainerProperties().isExplicitShareAcknowledgment()) + .isTrue(); + } + +}