From 83aaa6a331b516d527b431cfcf39c51fd6c14949 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 6 Oct 2025 20:22:39 -0400 Subject: [PATCH 1/2] Add concurrency support to `ShareKafkaMessageListenerContainer` Share consumers (KIP-932) enable record-level load balancing where multiple consumers can cooperatively process from the same partitions. Unlike traditional consumer groups with exclusive partition ownership, share groups distribute work at the broker level via the share group coordinator. This commit adds native concurrency support to the existing `ShareKafkaMessageListenerContainer` rather than creating a separate `ConcurrentShareKafkaMessageListenerContainer`. This design choice avoids the parent/child container complexity that exists in the regular consumer model, since share consumers fundamentally operate differently: - Work distribution happens at the broker level, not at the Spring layer - Multiple threads simply provide more capacity for the broker to distribute records across - No partition ownership model to coordinate between child containers This approach provides: - Simpler architecture with a single container managing multiple threads - No parent/child context propagation concerns - Better alignment with share consumer semantics (record-level vs partition-level distribution) - Increased throughput for high-volume workloads - Better resource utilization across consumer threads Users can configure concurrency at three levels: 1. Per-listener via `@KafkaListener(concurrency = N)` 2. Factory-level default via `factory.setConcurrency(N)` 3. Programmatically via `container.setConcurrency(N)` The feature works seamlessly with both implicit (auto-acknowledge) and explicit (manual acknowledge/release/reject) acknowledgment modes, with each consumer thread independently managing its own acknowledgments. Signed-off-by: Soby Chacko --- .../ROOT/pages/kafka/kafka-queues.adoc | 144 ++++++++- .../ShareKafkaListenerContainerFactory.java | 28 ++ .../ShareKafkaMessageListenerContainer.java | 117 ++++++- ...sageListenerContainerIntegrationTests.java | 299 +++++++++++++++++- ...afkaMessageListenerContainerUnitTests.java | 31 ++ 5 files changed, 595 insertions(+), 24 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc index 7f59a727fb..2aeceefd50 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc @@ -107,7 +107,7 @@ factory.addListener(new ShareConsumerFactory.Listener() { [[share-kafka-message-listener-container]] === ShareKafkaMessageListenerContainer -The `ShareKafkaMessageListenerContainer` provides a simple, single-threaded container for share consumers: +The `ShareKafkaMessageListenerContainer` provides a container for share consumers with support for concurrent processing: [source,java] ---- @@ -151,6 +151,145 @@ Share consumers do not support: * Manual offset management ==== +[[share-container-concurrency]] +=== Concurrency + +The `ShareKafkaMessageListenerContainer` supports concurrent processing by creating multiple consumer threads within a single container. +Each thread runs its own `ShareConsumer` instance that participates in the same share group. + +Unlike traditional consumer groups where concurrency involves partition distribution, share consumers leverage Kafka's record-level distribution at the broker. +This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances. + +==== Configuring Concurrency Programmatically + +[source,java] +---- +@Bean +public ShareKafkaMessageListenerContainer concurrentContainer( + ShareConsumerFactory shareConsumerFactory) { + + ContainerProperties containerProps = new ContainerProperties("my-topic"); + containerProps.setGroupId("my-share-group"); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps); + + // Set concurrency to create 5 consumer threads + container.setConcurrency(5); + + container.setupMessageListener(new MessageListener() { + @Override + public void onMessage(ConsumerRecord record) { + System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value()); + } + }); + + return container; +} +---- + +==== Configuring Concurrency via Factory + +You can set default concurrency at the factory level, which applies to all containers created by that factory: + +[source,java] +---- +@Bean +public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + + // Set default concurrency for all containers created by this factory + factory.setConcurrency(3); + + return factory; +} +---- + +==== Per-Listener Concurrency + +The concurrency setting can be overridden per listener using the `concurrency` attribute: + +[source,java] +---- +@Component +public class ConcurrentShareListener { + + @KafkaListener( + topics = "high-throughput-topic", + containerFactory = "shareKafkaListenerContainerFactory", + groupId = "my-share-group", + concurrency = "10" // Override factory default + ) + public void listen(ConsumerRecord record) { + // This listener will use 10 consumer threads + System.out.println("Processing: " + record.value()); + } +} +---- + +==== Concurrency Considerations + +* **Thread Safety**: Each consumer thread has its own `ShareConsumer` instance and manages its own acknowledgments independently +* **Client IDs**: Each consumer thread receives a unique client ID with a numeric suffix (e.g., `myContainer-0`, `myContainer-1`, etc.) +* **Metrics**: Metrics from all consumer threads are aggregated and accessible via `container.metrics()` +* **Lifecycle**: All consumer threads start and stop together as a unit +* **Work Distribution**: The Kafka broker handles record distribution across all consumer instances in the share group +* **Explicit Acknowledgment**: Each thread independently manages acknowledgments for its records; unacknowledged records in one thread don't block other threads + +==== Concurrency with Explicit Acknowledgment + +Concurrency works seamlessly with explicit acknowledgment mode. +Each consumer thread independently tracks and acknowledges its own records: + +[source,java] +---- +@KafkaListener( + topics = "order-queue", + containerFactory = "explicitShareKafkaListenerContainerFactory", + groupId = "order-processors", + concurrency = "5" +) +public void processOrder(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + try { + // Process the order + processOrderLogic(record.value()); + acknowledgment.acknowledge(); // ACCEPT + } + catch (RetryableException e) { + acknowledgment.release(); // Will be redelivered + } + catch (Exception e) { + acknowledgment.reject(); // Permanent failure + } +} +---- + +[NOTE] +==== +**Work Distribution Behavior:** + +With share consumers, record distribution is controlled by Kafka's share group coordinator at the broker level, not by Spring for Apache Kafka. +The broker may assign all records to a single consumer thread at any given time, especially when: + +* The topic has a single partition +* There's low message volume +* The broker's distribution algorithm favors certain consumers + +This is normal behavior. The key benefit of concurrency is having multiple consumer threads *available* to the share group coordinator for distribution. +As message volume increases or over time, you should see distribution across multiple threads. + +This differs from traditional `ConcurrentMessageListenerContainer` where Spring explicitly distributes partitions across threads. + +When using concurrency with share consumers: + +* Each thread polls and processes records independently +* Acknowledgment constraints apply per-thread (one thread's unacknowledged records don't block other threads) +* Concurrency setting must be greater than 0 and cannot be changed while the container is running +==== + [[share-annotation-driven-listeners]] == Annotation-Driven Listeners @@ -520,8 +659,7 @@ Share consumers differ from regular consumers in several key ways: === Current Limitations * **In preview**: This feature is in preview mode and may change in future versions -* **Single-Threaded**: Share consumer containers currently run in single-threaded mode * **No Message Converters**: Message converters are not yet supported for share consumers * **No Batch Listeners**: Batch processing is not supported with share consumers -* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls +* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls within each consumer thread diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index cac3cd2c66..7e46187c48 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode; +import org.jspecify.annotations.Nullable; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -63,6 +64,8 @@ public class ShareKafkaListenerContainerFactory private int phase = 0; + private @Nullable Integer concurrency; + @SuppressWarnings("NullAway.Init") private ApplicationEventPublisher applicationEventPublisher; @@ -98,6 +101,22 @@ public void setPhase(int phase) { this.phase = phase; } + /** + * Set the concurrency for containers created by this factory. + *

+ * This specifies the number of consumer threads to create within each container. + * Each thread creates its own {@link org.apache.kafka.clients.consumer.ShareConsumer} + * instance and participates in the same share group. The Kafka broker distributes + * records across all consumer instances, providing record-level load balancing. + *

+ * This can be overridden per listener endpoint using the {@code concurrency} + * attribute on {@code @KafkaListener}. + * @param concurrency the number of consumer threads (must be greater than 0) + */ + public void setConcurrency(Integer concurrency) { + this.concurrency = concurrency; + } + @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; @@ -138,6 +157,15 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer inst boolean explicitAck = determineExplicitAcknowledgment(properties); properties.setExplicitShareAcknowledgment(explicitAck); + // Set concurrency - endpoint setting takes precedence over factory setting + Integer conc = endpoint.getConcurrency(); + if (conc != null) { + instance.setConcurrency(conc); + } + else if (this.concurrency != null) { + instance.setConcurrency(this.concurrency); + } + instance.setAutoStartup(effectiveAutoStartup); instance.setPhase(this.phase); instance.setApplicationContext(this.applicationContext); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index e4216a102a..b4d041c449 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -16,8 +16,10 @@ package org.springframework.kafka.listener; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -45,19 +47,29 @@ import org.springframework.util.Assert; /** - * Single-threaded share consumer container using the Java {@link ShareConsumer}. + * Share consumer container using the Java {@link ShareConsumer}. *

* This container provides support for Kafka share groups, enabling cooperative * consumption where multiple consumers can process records from the same partitions. * Unlike traditional consumer groups with exclusive partition assignment, share groups * allow load balancing at the record level. *

+ * Concurrency Support: + *

+ * This container supports running multiple consumer threads via the {@link #setConcurrency(int)} + * method. Each thread creates its own {@link ShareConsumer} instance and polls independently. + * Unlike traditional consumer groups where concurrency involves partition distribution, + * share consumers leverage Kafka's record-level distribution across all group members. + * This means multiple threads in the same container participate in the same share group, + * with the broker distributing records across all consumer instances. + *

* Key features: *

    *
  • Explicit and implicit acknowledgment modes
  • *
  • Automatic error handling with REJECT acknowledgments
  • *
  • Poll-level acknowledgment constraints in explicit mode
  • *
  • Integration with Spring's {@code @KafkaListener} annotation
  • + *
  • Configurable concurrency for increased throughput
  • *
*

* Acknowledgment Modes: @@ -86,11 +98,11 @@ public class ShareKafkaMessageListenerContainer @Nullable private String clientId; - @SuppressWarnings("NullAway.Init") - private volatile ShareListenerConsumer listenerConsumer; + private int concurrency = 1; - @SuppressWarnings("NullAway.Init") - private volatile CompletableFuture listenerConsumerFuture; + private final List consumers = new ArrayList<>(); + + private final List> consumerFutures = new ArrayList<>(); private volatile CountDownLatch startLatch = new CountDownLatch(1); @@ -122,6 +134,29 @@ public void setClientId(String clientId) { this.clientId = clientId; } + /** + * Get the concurrency level (number of consumer threads). + * @return the concurrency level + */ + public int getConcurrency() { + return this.concurrency; + } + + /** + * Set the level of concurrency. This will create the specified number of + * consumer threads, each with its own {@link ShareConsumer} instance. + * All consumers participate in the same share group, leveraging Kafka's + * record-level distribution for load balancing. + *

+ * Must be called before the container is started. + * @param concurrency the concurrency level (must be greater than 0) + */ + public void setConcurrency(int concurrency) { + Assert.isTrue(concurrency > 0, "concurrency must be greater than 0"); + Assert.state(!isRunning(), "Cannot change concurrency while container is running"); + this.concurrency = concurrency; + } + @Override public boolean isInExpectedState() { return isRunning(); @@ -129,12 +164,24 @@ public boolean isInExpectedState() { @Override public Map> metrics() { - ShareListenerConsumer listenerConsumerForMetrics = this.listenerConsumer; - if (listenerConsumerForMetrics != null) { - Map metrics = listenerConsumerForMetrics.consumer.metrics(); - return Collections.singletonMap(listenerConsumerForMetrics.getClientId(), metrics); + this.lifecycleLock.lock(); + try { + if (this.consumers.isEmpty()) { + return Collections.emptyMap(); + } + Map> allMetrics = new ConcurrentHashMap<>(); + for (ShareListenerConsumer consumer : this.consumers) { + Map consumerMetrics = consumer.consumer.metrics(); + String consumerId = consumer.getClientId(); + if (consumerId != null) { + allMetrics.put(consumerId, consumerMetrics); + } + } + return Collections.unmodifiableMap(allMetrics); + } + finally { + this.lifecycleLock.unlock(); } - return Collections.emptyMap(); } @Override @@ -161,15 +208,51 @@ protected void doStart() { "Either use implicit acknowledgment mode or provide a listener that can handle acknowledgments."); } - this.listenerConsumer = new ShareListenerConsumer(listener); setRunning(true); - this.listenerConsumerFuture = CompletableFuture.runAsync(this.listenerConsumer, consumerExecutor); + + // Create multiple consumer threads based on concurrency setting + for (int i = 0; i < this.concurrency; i++) { + String consumerClientId = determineClientId(i); + ShareListenerConsumer consumer = new ShareListenerConsumer(listener, consumerClientId); + this.consumers.add(consumer); + CompletableFuture future = CompletableFuture.runAsync(consumer, consumerExecutor); + this.consumerFutures.add(future); + } + } + + /** + * Determine the client ID for a consumer thread. + * @param index the consumer index + * @return the client ID to use + */ + private String determineClientId(int index) { + String baseClientId = this.clientId != null ? this.clientId : getBeanName(); + if (this.concurrency > 1) { + return baseClientId + "-" + index; + } + return baseClientId; } @Override protected void doStop() { setRunning(false); - // The consumer will exit its loop naturally when running becomes false. + // Wait for all consumer threads to complete + this.lifecycleLock.lock(); + try { + for (CompletableFuture future : this.consumerFutures) { + try { + future.join(); // Wait for consumer to finish + } + catch (Exception e) { + this.logger.error(e, "Error waiting for consumer thread to stop"); + } + } + this.consumers.clear(); + this.consumerFutures.clear(); + } + finally { + this.lifecycleLock.unlock(); + } } private void publishConsumerStartingEvent() { @@ -233,13 +316,13 @@ private class ShareListenerConsumer implements Runnable { private final long ackTimeoutMs; - ShareListenerConsumer(GenericMessageListener listener) { + ShareListenerConsumer(GenericMessageListener listener, String consumerClientId) { this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( ShareKafkaMessageListenerContainer.this.getGroupId(), - ShareKafkaMessageListenerContainer.this.getClientId()); + consumerClientId); this.genericListener = listener; - this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); + this.clientId = consumerClientId; ContainerProperties containerProperties = getContainerProperties(); // Configure acknowledgment mode @@ -509,7 +592,7 @@ private void acknowledgeInternal(AcknowledgeType type) { } // Queue the acknowledgment to be processed on the consumer thread - ShareKafkaMessageListenerContainer.this.listenerConsumer.acknowledgmentQueue.offer( + ShareListenerConsumer.this.acknowledgmentQueue.offer( new PendingAcknowledgment<>(this.record, type)); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java index eac63b415c..9ad2224efa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.ConfigEntry; @@ -83,6 +84,8 @@ ) class ShareKafkaMessageListenerContainerIntegrationTests { + private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(ShareKafkaMessageListenerContainerIntegrationTests.class)); + @Test void integrationTestShareKafkaMessageListenerContainer(EmbeddedKafkaBroker broker) throws Exception { final String topic = "share-listener-integration-test"; @@ -265,11 +268,12 @@ void shouldEnforceExplicitAcknowledgmentConstraints(EmbeddedKafkaBroker broker) container.setBeanName("constraintTestContainer"); container.start(); - LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", + // Access the first consumer from the consumers list + LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "consumers[0].logger", LogAccessor.class)); DirectFieldAccessor accessor = new DirectFieldAccessor(container); - accessor.setPropertyValue("listenerConsumer.logger", logAccessor); + accessor.setPropertyValue("consumers[0].logger", logAccessor); try { // Wait for first batch to be processed @@ -343,11 +347,12 @@ void shouldHandlePartialAcknowledgmentCorrectly(EmbeddedKafkaBroker broker) thro container.setBeanName("partialAckTestContainer"); container.start(); - LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", + // Access the first consumer from the consumers list + LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "consumers[0].logger", LogAccessor.class)); DirectFieldAccessor accessor = new DirectFieldAccessor(container); - accessor.setPropertyValue("listenerConsumer.logger", logAccessor); + accessor.setPropertyValue("consumers[0].logger", logAccessor); produceTestRecords(bootstrapServers, topic, 4); @@ -828,4 +833,290 @@ private static AcknowledgeType getAcknowledgmentTypeInternal(ShareAcknowledgment } } + // ==================== Concurrency Integration Tests ==================== + + @Test + void shouldProcessRecordsWithMultipleConsumerThreads(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrency-basic-test"; + String groupId = "share-container-concurrency-basic-group"; + String bootstrapServers = broker.getBrokersAsString(); + + // Create topic with multiple partitions to allow better distribution + broker.addTopics(topic); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + // Produce more records than consumers to ensure distribution + int numRecords = 30; + int concurrency = 3; + produceTestRecords(bootstrapServers, topic, numRecords); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + CountDownLatch latch = new CountDownLatch(numRecords); + Map threadCounts = new ConcurrentHashMap<>(); + List receivedValues = Collections.synchronizedList(new ArrayList<>()); + + containerProps.setMessageListener((MessageListener) record -> { + String threadName = Thread.currentThread().getName(); + threadCounts.computeIfAbsent(threadName, k -> new AtomicInteger()).incrementAndGet(); + receivedValues.add(record.value()); + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrencyBasicTest"); + container.setConcurrency(concurrency); + + assertThat(container.getConcurrency()).isEqualTo(concurrency); + + container.start(); + + try { + // Verify all records are processed + assertThat(latch.await(30, TimeUnit.SECONDS)) + .as("All %d records should be processed", numRecords) + .isTrue(); + + // Log thread distribution for debugging + logger.info(() -> "Thread distribution: " + threadCounts); + + // Verify all records received + assertThat(receivedValues).hasSize(numRecords); + + // Share consumers with single partition may use only one consumer at a time + // So we verify: at least 1 thread used, at most concurrency threads + assertThat(threadCounts.size()) + .as("At least one consumer thread should process records") + .isGreaterThanOrEqualTo(1) + .isLessThanOrEqualTo(concurrency); + + // Verify work was done + int totalProcessed = threadCounts.values().stream() + .mapToInt(AtomicInteger::get) + .sum(); + assertThat(totalProcessed).isEqualTo(numRecords); + } + finally { + container.stop(); + } + } + + @Test + void shouldAggregateMetricsFromMultipleConsumers(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrency-metrics-test"; + String groupId = "share-container-concurrency-metrics-group"; + String bootstrapServers = broker.getBrokersAsString(); + + broker.addTopics(topic); + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + int concurrency = 4; + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setMessageListener((MessageListener) record -> { + // Simple consumer + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrencyMetricsTest"); + container.setConcurrency(concurrency); + container.start(); + + try { + // Wait for all consumers to initialize and register metrics + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> { + var metrics = container.metrics(); + assertThat(metrics) + .as("Metrics should be available from all %d consumers", concurrency) + .hasSize(concurrency); + }); + + var metrics = container.metrics(); + + // Verify each consumer has a unique client ID + assertThat(metrics.keySet()) + .as("Each consumer should have unique client ID") + .hasSize(concurrency); + + // Verify client IDs follow the pattern: beanName-0, beanName-1, etc. + for (String clientId : metrics.keySet()) { + assertThat(clientId) + .as("Client ID should contain bean name") + .contains("concurrencyMetricsTest"); + } + + logger.info(() -> "Client IDs from metrics: " + metrics.keySet()); + } + finally { + container.stop(); + } + } + + @Test + void shouldHandleConcurrencyWithExplicitAcknowledgment(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrency-explicit-test"; + String groupId = "share-container-concurrency-explicit-group"; + String bootstrapServers = broker.getBrokersAsString(); + + broker.addTopics(topic); + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + int numRecords = 15; + int concurrency = 3; + produceTestRecords(bootstrapServers, topic, numRecords); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch latch = new CountDownLatch(numRecords); + AtomicInteger acceptCount = new AtomicInteger(); + AtomicInteger rejectCount = new AtomicInteger(); + Map threadCounts = new ConcurrentHashMap<>(); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + String threadName = Thread.currentThread().getName(); + threadCounts.computeIfAbsent(threadName, k -> new AtomicInteger()).incrementAndGet(); + + // Reject every 5th record, accept others + int recordNum = Integer.parseInt(record.value().substring(5)); // "value0" -> 0 + if (recordNum % 5 == 0) { + acknowledgment.reject(); + rejectCount.incrementAndGet(); + } + else { + acknowledgment.acknowledge(); + acceptCount.incrementAndGet(); + } + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrencyExplicitTest"); + container.setConcurrency(concurrency); + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)) + .as("All records should be processed with explicit acknowledgment") + .isTrue(); + + logger.info(() -> "Thread distribution with explicit ack: " + threadCounts); + logger.info(() -> "Accept count: " + acceptCount.get() + ", Reject count: " + rejectCount.get()); + + // Verify at least one thread processed records + assertThat(threadCounts.size()) + .as("At least one thread should process records in explicit mode") + .isGreaterThanOrEqualTo(1) + .isLessThanOrEqualTo(concurrency); + + // Verify acknowledgments were processed correctly + assertThat(acceptCount.get() + rejectCount.get()) + .as("Total acknowledgments should equal number of records") + .isEqualTo(numRecords); + + assertThat(rejectCount.get()) + .as("Expected number of rejections") + .isEqualTo(3); // Records 0, 5, 10 + } + finally { + container.stop(); + } + } + + @Test + void shouldStopAllConsumerThreadsGracefully(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrency-lifecycle-test"; + String groupId = "share-container-concurrency-lifecycle-group"; + String bootstrapServers = broker.getBrokersAsString(); + + broker.addTopics(topic); + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + int concurrency = 5; + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + AtomicInteger processedCount = new AtomicInteger(); + + containerProps.setMessageListener((MessageListener) record -> { + processedCount.incrementAndGet(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrencyLifecycleTest"); + container.setConcurrency(concurrency); + + // Verify initial state + assertThat(container.isRunning()).isFalse(); + assertThat(container.metrics()).isEmpty(); + + // Start container + container.start(); + assertThat(container.isRunning()).isTrue(); + + // Wait for consumers to initialize + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(container.metrics()).hasSize(concurrency)); + + // Produce some records + produceTestRecords(bootstrapServers, topic, 10); + + // Give some time for processing + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(processedCount.get()).isGreaterThan(0)); + + int processedBeforeStop = processedCount.get(); + logger.info(() -> "Processed " + processedBeforeStop + " records before stop"); + + // Stop the container + container.stop(); + assertThat(container.isRunning()).isFalse(); + + // Verify metrics are cleared after stop + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(container.metrics()).isEmpty()); + + // Verify container can be restarted + container.start(); + assertThat(container.isRunning()).isTrue(); + + // Verify consumers are recreated + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(container.metrics()).hasSize(concurrency)); + + // Produce more records + produceTestRecords(bootstrapServers, topic, 5); + + // Verify processing continues after restart + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(processedCount.get()).isGreaterThan(processedBeforeStop)); + + logger.info(() -> "Processed " + processedCount.get() + " records total after restart"); + + // Final stop + container.stop(); + assertThat(container.isRunning()).isFalse(); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java index ff1f14104d..3786a5d963 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java @@ -89,6 +89,37 @@ void shouldFailWhenExplicitModeUsedWithNonAcknowledgingListener() { .withMessageContaining("Explicit acknowledgment mode requires an AcknowledgingShareConsumerAwareMessageListener"); } + @Test + void shouldSetConcurrencyCorrectly() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThat(container.getConcurrency()).isEqualTo(1); // Default is 1 + + container.setConcurrency(5); + assertThat(container.getConcurrency()).isEqualTo(5); + } + + @Test + void shouldRejectInvalidConcurrency() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> container.setConcurrency(0)) + .withMessageContaining("concurrency must be greater than 0"); + + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> container.setConcurrency(-1)) + .withMessageContaining("concurrency must be greater than 0"); + } + @Test void shouldValidateListenerTypeOnStartup() { // Given: A container with explicit acknowledgment mode and proper listener From 3348103660186cb96212341f4090dbeb2650fc66 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 7 Oct 2025 16:57:11 -0400 Subject: [PATCH 2/2] Address PR feedback on concurrency implementation - Use primitive int for concurrency in factory (consistent with phase field) - Remove unnecessary `getConcurrency()` getter (only used in trivial tests) - Use `HashMap` instead of `ConcurrentHashMap` in metrics() (already inside lock) - Use `CompletableFuture.allOf()` for cleaner shutdown coordination - Remove debug logging from tests (unnecessary noise in CI/CD) - Remove thread tracking from concurrency tests (over-complicates assertions) Clarify documentation based on KIP-932 specifications: - Add explicit note that concurrency is additive across application instances - Replace high-level distribution description with precise KIP-932 details - Document pull-based model, acquisition locks, and batch behavior - Explain `max.poll.records` as soft limit with complete batch preference - Set accurate expectations about broker-controlled record distribution Signed-off-by: Soby Chacko --- .../ROOT/pages/kafka/kafka-queues.adoc | 47 +++++++++++++++---- .../ShareKafkaListenerContainerFactory.java | 7 ++- .../ShareKafkaMessageListenerContainer.java | 23 +++------ ...sageListenerContainerIntegrationTests.java | 45 ------------------ ...afkaMessageListenerContainerUnitTests.java | 14 ------ 5 files changed, 46 insertions(+), 90 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc index 2aeceefd50..07fec9f949 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc @@ -160,6 +160,23 @@ Each thread runs its own `ShareConsumer` instance that participates in the same Unlike traditional consumer groups where concurrency involves partition distribution, share consumers leverage Kafka's record-level distribution at the broker. This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances. +[IMPORTANT] +==== +**Concurrency is Additive Across Application Instances** + +From the share group's perspective, each `ShareConsumer` instance is an independent member, regardless of where it runs. +Setting `concurrency=3` in a single container creates 3 share group members. +If you run multiple application instances with the same share group ID, all their consumer threads combine into one pool. + +For example: +* Application Instance 1: `concurrency=3` → 3 share group members +* Application Instance 2: `concurrency=3` → 3 share group members +* **Total**: 6 share group members available for the broker to distribute records to + +This means setting `concurrency=5` in a single container is operationally equivalent to running 5 separate application instances with `concurrency=1` each (all using the same `group.id`). +The Kafka broker treats all consumer instances equally and distributes records across the entire pool. +==== + ==== Configuring Concurrency Programmatically [source,java] @@ -269,21 +286,31 @@ public void processOrder(ConsumerRecord record, ShareAcknowledgm [NOTE] ==== -**Work Distribution Behavior:** +**Record Acquisition and Distribution Behavior:** + +Share consumers use a pull-based model where each consumer thread calls `poll()` to fetch records from the broker. +When a consumer polls, the broker's share-partition leader: + +* Selects records in "Available" state +* Moves them to "Acquired" state with a time-limited acquisition lock (default 30 seconds, configurable via `group.share.record.lock.duration.ms`) +* Prefers to return complete record batches for efficiency +* Applies `max.poll.records` as a soft limit, meaning complete record batches will be acquired even if it exceeds this value -With share consumers, record distribution is controlled by Kafka's share group coordinator at the broker level, not by Spring for Apache Kafka. -The broker may assign all records to a single consumer thread at any given time, especially when: +While records are acquired by one consumer, they are not available to other consumers. +When the acquisition lock expires, unacknowledged records automatically return to "Available" state and can be delivered to another consumer. -* The topic has a single partition -* There's low message volume -* The broker's distribution algorithm favors certain consumers +The broker limits the number of records that can be acquired per partition using `group.share.partition.max.record.locks`. +Once this limit is reached, subsequent polls temporarily return no records until locks expire. -This is normal behavior. The key benefit of concurrency is having multiple consumer threads *available* to the share group coordinator for distribution. -As message volume increases or over time, you should see distribution across multiple threads. +**Implications for Concurrency:** -This differs from traditional `ConcurrentMessageListenerContainer` where Spring explicitly distributes partitions across threads. +* Each consumer thread independently polls and may acquire different numbers of records per poll +* Record distribution across threads depends on polling timing and batch availability +* Multiple threads increase the pool of consumers available to acquire records +* With low message volume or single partitions, records may concentrate on fewer threads +* For long-running workloads, distribution tends to be more even -When using concurrency with share consumers: +**Configuration:** * Each thread polls and processes records independently * Acknowledgment constraints apply per-thread (one thread's unacknowledged records don't block other threads) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index 7e46187c48..10a36fdc94 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode; -import org.jspecify.annotations.Nullable; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -64,7 +63,7 @@ public class ShareKafkaListenerContainerFactory private int phase = 0; - private @Nullable Integer concurrency; + private int concurrency = 1; @SuppressWarnings("NullAway.Init") private ApplicationEventPublisher applicationEventPublisher; @@ -113,7 +112,7 @@ public void setPhase(int phase) { * attribute on {@code @KafkaListener}. * @param concurrency the number of consumer threads (must be greater than 0) */ - public void setConcurrency(Integer concurrency) { + public void setConcurrency(int concurrency) { this.concurrency = concurrency; } @@ -162,7 +161,7 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer inst if (conc != null) { instance.setConcurrency(conc); } - else if (this.concurrency != null) { + else { instance.setConcurrency(this.concurrency); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index b4d041c449..b70677cc30 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -134,14 +135,6 @@ public void setClientId(String clientId) { this.clientId = clientId; } - /** - * Get the concurrency level (number of consumer threads). - * @return the concurrency level - */ - public int getConcurrency() { - return this.concurrency; - } - /** * Set the level of concurrency. This will create the specified number of * consumer threads, each with its own {@link ShareConsumer} instance. @@ -169,7 +162,7 @@ public boolean isInExpectedState() { if (this.consumers.isEmpty()) { return Collections.emptyMap(); } - Map> allMetrics = new ConcurrentHashMap<>(); + Map> allMetrics = new HashMap<>(); for (ShareListenerConsumer consumer : this.consumers) { Map consumerMetrics = consumer.consumer.metrics(); String consumerId = consumer.getClientId(); @@ -239,17 +232,13 @@ protected void doStop() { // Wait for all consumer threads to complete this.lifecycleLock.lock(); try { - for (CompletableFuture future : this.consumerFutures) { - try { - future.join(); // Wait for consumer to finish - } - catch (Exception e) { - this.logger.error(e, "Error waiting for consumer thread to stop"); - } - } + CompletableFuture.allOf(this.consumerFutures.toArray(new CompletableFuture[0])).join(); this.consumers.clear(); this.consumerFutures.clear(); } + catch (Exception e) { + this.logger.error(e, "Error waiting for consumer threads to stop"); + } finally { this.lifecycleLock.unlock(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java index 9ad2224efa..4114b155c2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.ConfigEntry; @@ -84,8 +83,6 @@ ) class ShareKafkaMessageListenerContainerIntegrationTests { - private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(ShareKafkaMessageListenerContainerIntegrationTests.class)); - @Test void integrationTestShareKafkaMessageListenerContainer(EmbeddedKafkaBroker broker) throws Exception { final String topic = "share-listener-integration-test"; @@ -856,12 +853,9 @@ void shouldProcessRecordsWithMultipleConsumerThreads(EmbeddedKafkaBroker broker) ContainerProperties containerProps = new ContainerProperties(topic); CountDownLatch latch = new CountDownLatch(numRecords); - Map threadCounts = new ConcurrentHashMap<>(); List receivedValues = Collections.synchronizedList(new ArrayList<>()); containerProps.setMessageListener((MessageListener) record -> { - String threadName = Thread.currentThread().getName(); - threadCounts.computeIfAbsent(threadName, k -> new AtomicInteger()).incrementAndGet(); receivedValues.add(record.value()); latch.countDown(); }); @@ -871,34 +865,14 @@ void shouldProcessRecordsWithMultipleConsumerThreads(EmbeddedKafkaBroker broker) container.setBeanName("concurrencyBasicTest"); container.setConcurrency(concurrency); - assertThat(container.getConcurrency()).isEqualTo(concurrency); - container.start(); try { - // Verify all records are processed assertThat(latch.await(30, TimeUnit.SECONDS)) .as("All %d records should be processed", numRecords) .isTrue(); - // Log thread distribution for debugging - logger.info(() -> "Thread distribution: " + threadCounts); - - // Verify all records received assertThat(receivedValues).hasSize(numRecords); - - // Share consumers with single partition may use only one consumer at a time - // So we verify: at least 1 thread used, at most concurrency threads - assertThat(threadCounts.size()) - .as("At least one consumer thread should process records") - .isGreaterThanOrEqualTo(1) - .isLessThanOrEqualTo(concurrency); - - // Verify work was done - int totalProcessed = threadCounts.values().stream() - .mapToInt(AtomicInteger::get) - .sum(); - assertThat(totalProcessed).isEqualTo(numRecords); } finally { container.stop(); @@ -953,8 +927,6 @@ void shouldAggregateMetricsFromMultipleConsumers(EmbeddedKafkaBroker broker) thr .as("Client ID should contain bean name") .contains("concurrencyMetricsTest"); } - - logger.info(() -> "Client IDs from metrics: " + metrics.keySet()); } finally { container.stop(); @@ -983,13 +955,9 @@ void shouldHandleConcurrencyWithExplicitAcknowledgment(EmbeddedKafkaBroker broke CountDownLatch latch = new CountDownLatch(numRecords); AtomicInteger acceptCount = new AtomicInteger(); AtomicInteger rejectCount = new AtomicInteger(); - Map threadCounts = new ConcurrentHashMap<>(); containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( record, acknowledgment, consumer) -> { - String threadName = Thread.currentThread().getName(); - threadCounts.computeIfAbsent(threadName, k -> new AtomicInteger()).incrementAndGet(); - // Reject every 5th record, accept others int recordNum = Integer.parseInt(record.value().substring(5)); // "value0" -> 0 if (recordNum % 5 == 0) { @@ -1014,16 +982,6 @@ void shouldHandleConcurrencyWithExplicitAcknowledgment(EmbeddedKafkaBroker broke .as("All records should be processed with explicit acknowledgment") .isTrue(); - logger.info(() -> "Thread distribution with explicit ack: " + threadCounts); - logger.info(() -> "Accept count: " + acceptCount.get() + ", Reject count: " + rejectCount.get()); - - // Verify at least one thread processed records - assertThat(threadCounts.size()) - .as("At least one thread should process records in explicit mode") - .isGreaterThanOrEqualTo(1) - .isLessThanOrEqualTo(concurrency); - - // Verify acknowledgments were processed correctly assertThat(acceptCount.get() + rejectCount.get()) .as("Total acknowledgments should equal number of records") .isEqualTo(numRecords); @@ -1084,7 +1042,6 @@ void shouldStopAllConsumerThreadsGracefully(EmbeddedKafkaBroker broker) throws E .untilAsserted(() -> assertThat(processedCount.get()).isGreaterThan(0)); int processedBeforeStop = processedCount.get(); - logger.info(() -> "Processed " + processedBeforeStop + " records before stop"); // Stop the container container.stop(); @@ -1112,8 +1069,6 @@ void shouldStopAllConsumerThreadsGracefully(EmbeddedKafkaBroker broker) throws E .atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertThat(processedCount.get()).isGreaterThan(processedBeforeStop)); - logger.info(() -> "Processed " + processedCount.get() + " records total after restart"); - // Final stop container.stop(); assertThat(container.isRunning()).isFalse(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java index 3786a5d963..7dad5cb0c4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java @@ -89,20 +89,6 @@ void shouldFailWhenExplicitModeUsedWithNonAcknowledgingListener() { .withMessageContaining("Explicit acknowledgment mode requires an AcknowledgingShareConsumerAwareMessageListener"); } - @Test - void shouldSetConcurrencyCorrectly() { - ContainerProperties containerProperties = new ContainerProperties("test-topic"); - containerProperties.setMessageListener(messageListener); - - ShareKafkaMessageListenerContainer container = - new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); - - assertThat(container.getConcurrency()).isEqualTo(1); // Default is 1 - - container.setConcurrency(5); - assertThat(container.getConcurrency()).isEqualTo(5); - } - @Test void shouldRejectInvalidConcurrency() { ContainerProperties containerProperties = new ContainerProperties("test-topic");