diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc index 7f59a727fb..07fec9f949 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc @@ -107,7 +107,7 @@ factory.addListener(new ShareConsumerFactory.Listener() { [[share-kafka-message-listener-container]] === ShareKafkaMessageListenerContainer -The `ShareKafkaMessageListenerContainer` provides a simple, single-threaded container for share consumers: +The `ShareKafkaMessageListenerContainer` provides a container for share consumers with support for concurrent processing: [source,java] ---- @@ -151,6 +151,172 @@ Share consumers do not support: * Manual offset management ==== +[[share-container-concurrency]] +=== Concurrency + +The `ShareKafkaMessageListenerContainer` supports concurrent processing by creating multiple consumer threads within a single container. +Each thread runs its own `ShareConsumer` instance that participates in the same share group. + +Unlike traditional consumer groups where concurrency involves partition distribution, share consumers leverage Kafka's record-level distribution at the broker. +This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances. + +[IMPORTANT] +==== +**Concurrency is Additive Across Application Instances** + +From the share group's perspective, each `ShareConsumer` instance is an independent member, regardless of where it runs. +Setting `concurrency=3` in a single container creates 3 share group members. +If you run multiple application instances with the same share group ID, all their consumer threads combine into one pool. + +For example: +* Application Instance 1: `concurrency=3` → 3 share group members +* Application Instance 2: `concurrency=3` → 3 share group members +* **Total**: 6 share group members available for the broker to distribute records to + +This means setting `concurrency=5` in a single container is operationally equivalent to running 5 separate application instances with `concurrency=1` each (all using the same `group.id`). +The Kafka broker treats all consumer instances equally and distributes records across the entire pool. +==== + +==== Configuring Concurrency Programmatically + +[source,java] +---- +@Bean +public ShareKafkaMessageListenerContainer concurrentContainer( + ShareConsumerFactory shareConsumerFactory) { + + ContainerProperties containerProps = new ContainerProperties("my-topic"); + containerProps.setGroupId("my-share-group"); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps); + + // Set concurrency to create 5 consumer threads + container.setConcurrency(5); + + container.setupMessageListener(new MessageListener() { + @Override + public void onMessage(ConsumerRecord record) { + System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value()); + } + }); + + return container; +} +---- + +==== Configuring Concurrency via Factory + +You can set default concurrency at the factory level, which applies to all containers created by that factory: + +[source,java] +---- +@Bean +public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + + // Set default concurrency for all containers created by this factory + factory.setConcurrency(3); + + return factory; +} +---- + +==== Per-Listener Concurrency + +The concurrency setting can be overridden per listener using the `concurrency` attribute: + +[source,java] +---- +@Component +public class ConcurrentShareListener { + + @KafkaListener( + topics = "high-throughput-topic", + containerFactory = "shareKafkaListenerContainerFactory", + groupId = "my-share-group", + concurrency = "10" // Override factory default + ) + public void listen(ConsumerRecord record) { + // This listener will use 10 consumer threads + System.out.println("Processing: " + record.value()); + } +} +---- + +==== Concurrency Considerations + +* **Thread Safety**: Each consumer thread has its own `ShareConsumer` instance and manages its own acknowledgments independently +* **Client IDs**: Each consumer thread receives a unique client ID with a numeric suffix (e.g., `myContainer-0`, `myContainer-1`, etc.) +* **Metrics**: Metrics from all consumer threads are aggregated and accessible via `container.metrics()` +* **Lifecycle**: All consumer threads start and stop together as a unit +* **Work Distribution**: The Kafka broker handles record distribution across all consumer instances in the share group +* **Explicit Acknowledgment**: Each thread independently manages acknowledgments for its records; unacknowledged records in one thread don't block other threads + +==== Concurrency with Explicit Acknowledgment + +Concurrency works seamlessly with explicit acknowledgment mode. +Each consumer thread independently tracks and acknowledges its own records: + +[source,java] +---- +@KafkaListener( + topics = "order-queue", + containerFactory = "explicitShareKafkaListenerContainerFactory", + groupId = "order-processors", + concurrency = "5" +) +public void processOrder(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + try { + // Process the order + processOrderLogic(record.value()); + acknowledgment.acknowledge(); // ACCEPT + } + catch (RetryableException e) { + acknowledgment.release(); // Will be redelivered + } + catch (Exception e) { + acknowledgment.reject(); // Permanent failure + } +} +---- + +[NOTE] +==== +**Record Acquisition and Distribution Behavior:** + +Share consumers use a pull-based model where each consumer thread calls `poll()` to fetch records from the broker. +When a consumer polls, the broker's share-partition leader: + +* Selects records in "Available" state +* Moves them to "Acquired" state with a time-limited acquisition lock (default 30 seconds, configurable via `group.share.record.lock.duration.ms`) +* Prefers to return complete record batches for efficiency +* Applies `max.poll.records` as a soft limit, meaning complete record batches will be acquired even if it exceeds this value + +While records are acquired by one consumer, they are not available to other consumers. +When the acquisition lock expires, unacknowledged records automatically return to "Available" state and can be delivered to another consumer. + +The broker limits the number of records that can be acquired per partition using `group.share.partition.max.record.locks`. +Once this limit is reached, subsequent polls temporarily return no records until locks expire. + +**Implications for Concurrency:** + +* Each consumer thread independently polls and may acquire different numbers of records per poll +* Record distribution across threads depends on polling timing and batch availability +* Multiple threads increase the pool of consumers available to acquire records +* With low message volume or single partitions, records may concentrate on fewer threads +* For long-running workloads, distribution tends to be more even + +**Configuration:** + +* Each thread polls and processes records independently +* Acknowledgment constraints apply per-thread (one thread's unacknowledged records don't block other threads) +* Concurrency setting must be greater than 0 and cannot be changed while the container is running +==== + [[share-annotation-driven-listeners]] == Annotation-Driven Listeners @@ -520,8 +686,7 @@ Share consumers differ from regular consumers in several key ways: === Current Limitations * **In preview**: This feature is in preview mode and may change in future versions -* **Single-Threaded**: Share consumer containers currently run in single-threaded mode * **No Message Converters**: Message converters are not yet supported for share consumers * **No Batch Listeners**: Batch processing is not supported with share consumers -* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls +* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls within each consumer thread diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index cac3cd2c66..10a36fdc94 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -63,6 +63,8 @@ public class ShareKafkaListenerContainerFactory private int phase = 0; + private int concurrency = 1; + @SuppressWarnings("NullAway.Init") private ApplicationEventPublisher applicationEventPublisher; @@ -98,6 +100,22 @@ public void setPhase(int phase) { this.phase = phase; } + /** + * Set the concurrency for containers created by this factory. + *

+ * This specifies the number of consumer threads to create within each container. + * Each thread creates its own {@link org.apache.kafka.clients.consumer.ShareConsumer} + * instance and participates in the same share group. The Kafka broker distributes + * records across all consumer instances, providing record-level load balancing. + *

+ * This can be overridden per listener endpoint using the {@code concurrency} + * attribute on {@code @KafkaListener}. + * @param concurrency the number of consumer threads (must be greater than 0) + */ + public void setConcurrency(int concurrency) { + this.concurrency = concurrency; + } + @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; @@ -138,6 +156,15 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer inst boolean explicitAck = determineExplicitAcknowledgment(properties); properties.setExplicitShareAcknowledgment(explicitAck); + // Set concurrency - endpoint setting takes precedence over factory setting + Integer conc = endpoint.getConcurrency(); + if (conc != null) { + instance.setConcurrency(conc); + } + else { + instance.setConcurrency(this.concurrency); + } + instance.setAutoStartup(effectiveAutoStartup); instance.setPhase(this.phase); instance.setApplicationContext(this.applicationContext); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index e4216a102a..b70677cc30 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -16,8 +16,11 @@ package org.springframework.kafka.listener; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -45,19 +48,29 @@ import org.springframework.util.Assert; /** - * Single-threaded share consumer container using the Java {@link ShareConsumer}. + * Share consumer container using the Java {@link ShareConsumer}. *

* This container provides support for Kafka share groups, enabling cooperative * consumption where multiple consumers can process records from the same partitions. * Unlike traditional consumer groups with exclusive partition assignment, share groups * allow load balancing at the record level. *

+ * Concurrency Support: + *

+ * This container supports running multiple consumer threads via the {@link #setConcurrency(int)} + * method. Each thread creates its own {@link ShareConsumer} instance and polls independently. + * Unlike traditional consumer groups where concurrency involves partition distribution, + * share consumers leverage Kafka's record-level distribution across all group members. + * This means multiple threads in the same container participate in the same share group, + * with the broker distributing records across all consumer instances. + *

* Key features: *

    *
  • Explicit and implicit acknowledgment modes
  • *
  • Automatic error handling with REJECT acknowledgments
  • *
  • Poll-level acknowledgment constraints in explicit mode
  • *
  • Integration with Spring's {@code @KafkaListener} annotation
  • + *
  • Configurable concurrency for increased throughput
  • *
*

* Acknowledgment Modes: @@ -86,11 +99,11 @@ public class ShareKafkaMessageListenerContainer @Nullable private String clientId; - @SuppressWarnings("NullAway.Init") - private volatile ShareListenerConsumer listenerConsumer; + private int concurrency = 1; + + private final List consumers = new ArrayList<>(); - @SuppressWarnings("NullAway.Init") - private volatile CompletableFuture listenerConsumerFuture; + private final List> consumerFutures = new ArrayList<>(); private volatile CountDownLatch startLatch = new CountDownLatch(1); @@ -122,6 +135,21 @@ public void setClientId(String clientId) { this.clientId = clientId; } + /** + * Set the level of concurrency. This will create the specified number of + * consumer threads, each with its own {@link ShareConsumer} instance. + * All consumers participate in the same share group, leveraging Kafka's + * record-level distribution for load balancing. + *

+ * Must be called before the container is started. + * @param concurrency the concurrency level (must be greater than 0) + */ + public void setConcurrency(int concurrency) { + Assert.isTrue(concurrency > 0, "concurrency must be greater than 0"); + Assert.state(!isRunning(), "Cannot change concurrency while container is running"); + this.concurrency = concurrency; + } + @Override public boolean isInExpectedState() { return isRunning(); @@ -129,12 +157,24 @@ public boolean isInExpectedState() { @Override public Map> metrics() { - ShareListenerConsumer listenerConsumerForMetrics = this.listenerConsumer; - if (listenerConsumerForMetrics != null) { - Map metrics = listenerConsumerForMetrics.consumer.metrics(); - return Collections.singletonMap(listenerConsumerForMetrics.getClientId(), metrics); + this.lifecycleLock.lock(); + try { + if (this.consumers.isEmpty()) { + return Collections.emptyMap(); + } + Map> allMetrics = new HashMap<>(); + for (ShareListenerConsumer consumer : this.consumers) { + Map consumerMetrics = consumer.consumer.metrics(); + String consumerId = consumer.getClientId(); + if (consumerId != null) { + allMetrics.put(consumerId, consumerMetrics); + } + } + return Collections.unmodifiableMap(allMetrics); + } + finally { + this.lifecycleLock.unlock(); } - return Collections.emptyMap(); } @Override @@ -161,15 +201,47 @@ protected void doStart() { "Either use implicit acknowledgment mode or provide a listener that can handle acknowledgments."); } - this.listenerConsumer = new ShareListenerConsumer(listener); setRunning(true); - this.listenerConsumerFuture = CompletableFuture.runAsync(this.listenerConsumer, consumerExecutor); + + // Create multiple consumer threads based on concurrency setting + for (int i = 0; i < this.concurrency; i++) { + String consumerClientId = determineClientId(i); + ShareListenerConsumer consumer = new ShareListenerConsumer(listener, consumerClientId); + this.consumers.add(consumer); + CompletableFuture future = CompletableFuture.runAsync(consumer, consumerExecutor); + this.consumerFutures.add(future); + } + } + + /** + * Determine the client ID for a consumer thread. + * @param index the consumer index + * @return the client ID to use + */ + private String determineClientId(int index) { + String baseClientId = this.clientId != null ? this.clientId : getBeanName(); + if (this.concurrency > 1) { + return baseClientId + "-" + index; + } + return baseClientId; } @Override protected void doStop() { setRunning(false); - // The consumer will exit its loop naturally when running becomes false. + // Wait for all consumer threads to complete + this.lifecycleLock.lock(); + try { + CompletableFuture.allOf(this.consumerFutures.toArray(new CompletableFuture[0])).join(); + this.consumers.clear(); + this.consumerFutures.clear(); + } + catch (Exception e) { + this.logger.error(e, "Error waiting for consumer threads to stop"); + } + finally { + this.lifecycleLock.unlock(); + } } private void publishConsumerStartingEvent() { @@ -233,13 +305,13 @@ private class ShareListenerConsumer implements Runnable { private final long ackTimeoutMs; - ShareListenerConsumer(GenericMessageListener listener) { + ShareListenerConsumer(GenericMessageListener listener, String consumerClientId) { this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( ShareKafkaMessageListenerContainer.this.getGroupId(), - ShareKafkaMessageListenerContainer.this.getClientId()); + consumerClientId); this.genericListener = listener; - this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); + this.clientId = consumerClientId; ContainerProperties containerProperties = getContainerProperties(); // Configure acknowledgment mode @@ -509,7 +581,7 @@ private void acknowledgeInternal(AcknowledgeType type) { } // Queue the acknowledgment to be processed on the consumer thread - ShareKafkaMessageListenerContainer.this.listenerConsumer.acknowledgmentQueue.offer( + ShareListenerConsumer.this.acknowledgmentQueue.offer( new PendingAcknowledgment<>(this.record, type)); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java index eac63b415c..4114b155c2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java @@ -265,11 +265,12 @@ void shouldEnforceExplicitAcknowledgmentConstraints(EmbeddedKafkaBroker broker) container.setBeanName("constraintTestContainer"); container.start(); - LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", + // Access the first consumer from the consumers list + LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "consumers[0].logger", LogAccessor.class)); DirectFieldAccessor accessor = new DirectFieldAccessor(container); - accessor.setPropertyValue("listenerConsumer.logger", logAccessor); + accessor.setPropertyValue("consumers[0].logger", logAccessor); try { // Wait for first batch to be processed @@ -343,11 +344,12 @@ void shouldHandlePartialAcknowledgmentCorrectly(EmbeddedKafkaBroker broker) thro container.setBeanName("partialAckTestContainer"); container.start(); - LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", + // Access the first consumer from the consumers list + LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "consumers[0].logger", LogAccessor.class)); DirectFieldAccessor accessor = new DirectFieldAccessor(container); - accessor.setPropertyValue("listenerConsumer.logger", logAccessor); + accessor.setPropertyValue("consumers[0].logger", logAccessor); produceTestRecords(bootstrapServers, topic, 4); @@ -828,4 +830,248 @@ private static AcknowledgeType getAcknowledgmentTypeInternal(ShareAcknowledgment } } + // ==================== Concurrency Integration Tests ==================== + + @Test + void shouldProcessRecordsWithMultipleConsumerThreads(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrency-basic-test"; + String groupId = "share-container-concurrency-basic-group"; + String bootstrapServers = broker.getBrokersAsString(); + + // Create topic with multiple partitions to allow better distribution + broker.addTopics(topic); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + // Produce more records than consumers to ensure distribution + int numRecords = 30; + int concurrency = 3; + produceTestRecords(bootstrapServers, topic, numRecords); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + CountDownLatch latch = new CountDownLatch(numRecords); + List receivedValues = Collections.synchronizedList(new ArrayList<>()); + + containerProps.setMessageListener((MessageListener) record -> { + receivedValues.add(record.value()); + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrencyBasicTest"); + container.setConcurrency(concurrency); + + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)) + .as("All %d records should be processed", numRecords) + .isTrue(); + + assertThat(receivedValues).hasSize(numRecords); + } + finally { + container.stop(); + } + } + + @Test + void shouldAggregateMetricsFromMultipleConsumers(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrency-metrics-test"; + String groupId = "share-container-concurrency-metrics-group"; + String bootstrapServers = broker.getBrokersAsString(); + + broker.addTopics(topic); + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + int concurrency = 4; + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setMessageListener((MessageListener) record -> { + // Simple consumer + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrencyMetricsTest"); + container.setConcurrency(concurrency); + container.start(); + + try { + // Wait for all consumers to initialize and register metrics + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> { + var metrics = container.metrics(); + assertThat(metrics) + .as("Metrics should be available from all %d consumers", concurrency) + .hasSize(concurrency); + }); + + var metrics = container.metrics(); + + // Verify each consumer has a unique client ID + assertThat(metrics.keySet()) + .as("Each consumer should have unique client ID") + .hasSize(concurrency); + + // Verify client IDs follow the pattern: beanName-0, beanName-1, etc. + for (String clientId : metrics.keySet()) { + assertThat(clientId) + .as("Client ID should contain bean name") + .contains("concurrencyMetricsTest"); + } + } + finally { + container.stop(); + } + } + + @Test + void shouldHandleConcurrencyWithExplicitAcknowledgment(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrency-explicit-test"; + String groupId = "share-container-concurrency-explicit-group"; + String bootstrapServers = broker.getBrokersAsString(); + + broker.addTopics(topic); + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + int numRecords = 15; + int concurrency = 3; + produceTestRecords(bootstrapServers, topic, numRecords); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setExplicitShareAcknowledgment(true); + + CountDownLatch latch = new CountDownLatch(numRecords); + AtomicInteger acceptCount = new AtomicInteger(); + AtomicInteger rejectCount = new AtomicInteger(); + + containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener) ( + record, acknowledgment, consumer) -> { + // Reject every 5th record, accept others + int recordNum = Integer.parseInt(record.value().substring(5)); // "value0" -> 0 + if (recordNum % 5 == 0) { + acknowledgment.reject(); + rejectCount.incrementAndGet(); + } + else { + acknowledgment.acknowledge(); + acceptCount.incrementAndGet(); + } + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrencyExplicitTest"); + container.setConcurrency(concurrency); + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)) + .as("All records should be processed with explicit acknowledgment") + .isTrue(); + + assertThat(acceptCount.get() + rejectCount.get()) + .as("Total acknowledgments should equal number of records") + .isEqualTo(numRecords); + + assertThat(rejectCount.get()) + .as("Expected number of rejections") + .isEqualTo(3); // Records 0, 5, 10 + } + finally { + container.stop(); + } + } + + @Test + void shouldStopAllConsumerThreadsGracefully(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-concurrency-lifecycle-test"; + String groupId = "share-container-concurrency-lifecycle-group"; + String bootstrapServers = broker.getBrokersAsString(); + + broker.addTopics(topic); + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + int concurrency = 5; + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + AtomicInteger processedCount = new AtomicInteger(); + + containerProps.setMessageListener((MessageListener) record -> { + processedCount.incrementAndGet(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("concurrencyLifecycleTest"); + container.setConcurrency(concurrency); + + // Verify initial state + assertThat(container.isRunning()).isFalse(); + assertThat(container.metrics()).isEmpty(); + + // Start container + container.start(); + assertThat(container.isRunning()).isTrue(); + + // Wait for consumers to initialize + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(container.metrics()).hasSize(concurrency)); + + // Produce some records + produceTestRecords(bootstrapServers, topic, 10); + + // Give some time for processing + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(processedCount.get()).isGreaterThan(0)); + + int processedBeforeStop = processedCount.get(); + + // Stop the container + container.stop(); + assertThat(container.isRunning()).isFalse(); + + // Verify metrics are cleared after stop + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(container.metrics()).isEmpty()); + + // Verify container can be restarted + container.start(); + assertThat(container.isRunning()).isTrue(); + + // Verify consumers are recreated + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(container.metrics()).hasSize(concurrency)); + + // Produce more records + produceTestRecords(bootstrapServers, topic, 5); + + // Verify processing continues after restart + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(processedCount.get()).isGreaterThan(processedBeforeStop)); + + // Final stop + container.stop(); + assertThat(container.isRunning()).isFalse(); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java index ff1f14104d..7dad5cb0c4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java @@ -89,6 +89,23 @@ void shouldFailWhenExplicitModeUsedWithNonAcknowledgingListener() { .withMessageContaining("Explicit acknowledgment mode requires an AcknowledgingShareConsumerAwareMessageListener"); } + @Test + void shouldRejectInvalidConcurrency() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> container.setConcurrency(0)) + .withMessageContaining("concurrency must be greater than 0"); + + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> container.setConcurrency(-1)) + .withMessageContaining("concurrency must be greater than 0"); + } + @Test void shouldValidateListenerTypeOnStartup() { // Given: A container with explicit acknowledgment mode and proper listener