[[share-kafka-message-listener-container]]
=== ShareKafkaMessageListenerContainer

The `ShareKafkaMessageListenerContainer` provides a container for share consumers with support for concurrent processing:

[source,java]
----
Share consumers do not support:
* Manual offset management
====

[[share-container-concurrency]]
=== Concurrency

The `ShareKafkaMessageListenerContainer` supports concurrent processing by creating multiple consumer threads within a single container.
Each thread runs its own `ShareConsumer` instance that participates in the same share group.

Unlike traditional consumer groups, where concurrency is achieved by distributing partitions among consumers, share consumers rely on the broker's record-level distribution.
Multiple consumer threads in the same container participate in the same share group, and the Kafka broker distributes individual records across all consumer instances.

[IMPORTANT]
====
**Concurrency is Additive Across Application Instances**

From the share group's perspective, each `ShareConsumer` instance is an independent member, regardless of where it runs.
Setting `concurrency=3` in a single container creates 3 share group members.
If you run multiple application instances with the same share group ID, all their consumer threads combine into one pool.

For example:

* Application Instance 1: `concurrency=3` → 3 share group members
* Application Instance 2: `concurrency=3` → 3 share group members
* **Total**: 6 share group members available for the broker to distribute records to

This means setting `concurrency=5` in a single container is operationally equivalent to running 5 separate application instances with `concurrency=1` each (all using the same `group.id`).
The Kafka broker treats all consumer instances equally and distributes records across the entire pool.
====
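
The additive arithmetic above can be sketched directly (hypothetical instance counts; a quick sanity check when sizing a share group):

[source,java]
----
public class ShareGroupSizing {

    public static void main(String[] args) {
        // Hypothetical deployment: two application instances, each running one
        // container with concurrency=3, all using the same share group id.
        int applicationInstances = 2;
        int concurrencyPerInstance = 3;

        // Every consumer thread is an independent share group member,
        // so the pool the broker distributes records over is the product.
        int totalShareGroupMembers = applicationInstances * concurrencyPerInstance;
        System.out.println("Total share group members: " + totalShareGroupMembers); // 6
    }
}
----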

==== Configuring Concurrency Programmatically

[source,java]
----
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {

ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");

ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

// Set concurrency to create 5 consumer threads
container.setConcurrency(5);

container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});

return container;
}
----

==== Configuring Concurrency via Factory

You can set default concurrency at the factory level, which applies to all containers created by that factory:

[source,java]
----
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {

ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);

return factory;
}
----

==== Per-Listener Concurrency

The concurrency setting can be overridden per listener using the `concurrency` attribute:

[source,java]
----
@Component
public class ConcurrentShareListener {

@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}
----

==== Concurrency Considerations

* **Thread Safety**: Each consumer thread has its own `ShareConsumer` instance and manages its own acknowledgments independently
* **Client IDs**: Each consumer thread receives a unique client ID with a numeric suffix (for example, `myContainer-0`, `myContainer-1`)
* **Metrics**: Metrics from all consumer threads are aggregated and accessible via `container.metrics()`
* **Lifecycle**: All consumer threads start and stop together as a unit
* **Work Distribution**: The Kafka broker handles record distribution across all consumer instances in the share group
* **Explicit Acknowledgment**: Each thread independently manages acknowledgments for its records; unacknowledged records in one thread don't block other threads
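
Since `container.metrics()` aggregates per-thread consumers keyed by client ID, the returned structure can be pictured with plain `Map`s. This is a sketch only: the real API keys the inner maps by Kafka `MetricName` objects (simplified here to strings), and `myContainer` is a hypothetical client ID prefix:

[source,java]
----
import java.util.LinkedHashMap;
import java.util.Map;

public class MetricsShapeSketch {

    public static void main(String[] args) {
        // One inner map per consumer thread, keyed by that thread's
        // unique client ID (container prefix plus numeric suffix).
        Map<String, Map<String, Object>> aggregated = new LinkedHashMap<>();
        for (int i = 0; i < 3; i++) {
            Map<String, Object> perClient = new LinkedHashMap<>();
            perClient.put("records-consumed-total", 0L); // placeholder value
            aggregated.put("myContainer-" + i, perClient);
        }
        // Prints myContainer-0, myContainer-1, myContainer-2
        aggregated.keySet().forEach(System.out::println);
    }
}
----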

==== Concurrency with Explicit Acknowledgment

Concurrency works seamlessly with explicit acknowledgment mode.
Each consumer thread independently tracks and acknowledges its own records:

[source,java]
----
@KafkaListener(
topics = "order-queue",
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
// Process the order
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
----

[NOTE]
====
**Record Acquisition and Distribution Behavior:**

Share consumers use a pull-based model where each consumer thread calls `poll()` to fetch records from the broker.
When a consumer polls, the broker's share-partition leader:

* Selects records in "Available" state
* Moves them to "Acquired" state with a time-limited acquisition lock (default 30 seconds, configurable via `group.share.record.lock.duration.ms`)
* Prefers to return complete record batches for efficiency
* Applies `max.poll.records` as a soft limit, meaning a complete record batch is acquired even if doing so exceeds this value

While records are acquired by one consumer, they are not available to other consumers.
When the acquisition lock expires, unacknowledged records automatically return to "Available" state and can be delivered to another consumer.

The broker limits the number of records that can be acquired per partition using `group.share.partition.max.record.locks`.
Once this limit is reached, subsequent polls temporarily return no records until locks expire.

**Implications for Concurrency:**

* Each consumer thread independently polls and may acquire different numbers of records per poll
* Record distribution across threads depends on polling timing and batch availability
* Multiple threads increase the pool of consumers available to acquire records
* With low message volume or single partitions, records may concentrate on fewer threads
* For long-running workloads, distribution tends to be more even

**Constraints:**

* Acknowledgment constraints apply per thread; one thread's unacknowledged records do not block other threads
* The concurrency setting must be greater than 0 and cannot be changed while the container is running
====
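
The lock and limit settings named above are ordinary Kafka configuration properties. A sketch with illustrative values (the first two are broker/group-level settings, the last is a consumer property):

[source,properties]
----
# Broker: how long an acquired record stays locked before returning to "Available"
group.share.record.lock.duration.ms=30000

# Broker: cap on the number of acquired-record locks per share partition
group.share.partition.max.record.locks=200

# Consumer: soft limit on records returned per poll (complete batches may exceed it)
max.poll.records=500
----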

[[share-annotation-driven-listeners]]
== Annotation-Driven Listeners

=== Current Limitations

* **In preview**: This feature is in preview mode and may change in future versions
* **No Message Converters**: Message converters are not yet supported for share consumers
* **No Batch Listeners**: Batch processing is not supported with share consumers
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls within each consumer thread

public class ShareKafkaListenerContainerFactory<K, V>

private int phase = 0;

private int concurrency = 1;

@SuppressWarnings("NullAway.Init")
private ApplicationEventPublisher applicationEventPublisher;

public void setPhase(int phase) {
this.phase = phase;
}

/**
* Set the concurrency for containers created by this factory.
* <p>
* This specifies the number of consumer threads to create within each container.
* Each thread creates its own {@link org.apache.kafka.clients.consumer.ShareConsumer}
* instance and participates in the same share group. The Kafka broker distributes
* records across all consumer instances, providing record-level load balancing.
* <p>
* This can be overridden per listener endpoint using the {@code concurrency}
* attribute on {@code @KafkaListener}.
* @param concurrency the number of consumer threads (must be greater than 0)
*/
public void setConcurrency(int concurrency) {
this.concurrency = concurrency;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
boolean explicitAck = determineExplicitAcknowledgment(properties);
properties.setExplicitShareAcknowledgment(explicitAck);

// Set concurrency - endpoint setting takes precedence over factory setting
Integer conc = endpoint.getConcurrency();
if (conc != null) {
instance.setConcurrency(conc);
}
else {
instance.setConcurrency(this.concurrency);
}

instance.setAutoStartup(effectiveAutoStartup);
instance.setPhase(this.phase);
instance.setApplicationContext(this.applicationContext);