Add concurrency support to ShareKafkaMessageListenerContainer
#4102
Conversation
Share consumers (KIP-932) enable record-level load balancing where multiple consumers can cooperatively process from the same partitions. Unlike traditional consumer groups with exclusive partition ownership, share groups distribute work at the broker level via the share group coordinator.

This commit adds native concurrency support to the existing `ShareKafkaMessageListenerContainer` rather than creating a separate `ConcurrentShareKafkaMessageListenerContainer`. This design choice avoids the parent/child container complexity that exists in the regular consumer model, since share consumers fundamentally operate differently:

- Work distribution happens at the broker level, not at the Spring layer
- Multiple threads simply provide more capacity for the broker to distribute records across
- No partition ownership model to coordinate between child containers

This approach provides:

- Simpler architecture with a single container managing multiple threads
- No parent/child context propagation concerns
- Better alignment with share consumer semantics (record-level vs partition-level distribution)
- Increased throughput for high-volume workloads
- Better resource utilization across consumer threads

Users can configure concurrency at three levels:

1. Per-listener via `@KafkaListener(concurrency = N)`
2. Factory-level default via `factory.setConcurrency(N)`
3. Programmatically via `container.setConcurrency(N)`

The feature works seamlessly with both implicit (auto-acknowledge) and explicit (manual acknowledge/release/reject) acknowledgment modes, with each consumer thread independently managing its own acknowledgments.

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
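A minimal sketch of the three configuration levels described in the commit message. The topic name, bean wiring, and the `ShareKafkaListenerContainerFactory`/`ShareConsumerFactory` names are illustrative assumptions based on the classes discussed in this PR, not verbatim code from it:

```java
// Level 2: factory-level default, applied to every share listener
// created by this factory unless a listener overrides it.
@Bean
ShareKafkaListenerContainerFactory<String, String> shareFactory(
        ShareConsumerFactory<String, String> shareConsumerFactory) {
    var factory = new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    factory.setConcurrency(3);
    return factory;
}

// Level 1: per-listener override via the annotation attribute
// (the attribute takes a String expression on @KafkaListener).
@KafkaListener(topics = "orders", containerFactory = "shareFactory", concurrency = "5")
void onRecord(String value) {
    // each consumer thread processes whatever records the broker assigns it
}

// Level 3: programmatic, when building the container directly.
ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setConcurrency(4);
```

Because distribution happens at the broker, raising concurrency only adds threads for the coordinator to feed; it does not partition work at the Spring layer.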
Looks good!
Just a couple of nit-picks and questions to clarify.
Thanks
spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc
* There's low message volume
* The broker's distribution algorithm favors certain consumers

This is normal behavior. The key benefit of concurrency is having multiple consumer threads *available* to the share group coordinator for distribution.
One sentence per line.
====

**Work Distribution Behavior:**

With share consumers, record distribution is controlled by Kafka's share group coordinator at the broker level, not by Spring for Apache Kafka.
Does this mean that it works the same way as in many other message broker implementations for queues: the next record is pushed down to the next available consumer?
Or is that done in batches?
Where do we control that batch size then? `min.bytes`? `max.record`, `max.wait`?
Thanks
 * attribute on {@code @KafkaListener}.
 * @param concurrency the number of consumer threads (must be greater than 0)
 */
public void setConcurrency(Integer concurrency) {
I think this could be a primitive.
 * Get the concurrency level (number of consumer threads).
 * @return the concurrency level
 */
public int getConcurrency() {
Why do we need a getter?
// Wait for all consumer threads to complete
this.lifecycleLock.lock();
try {
    for (CompletableFuture<Void> future : this.consumerFutures) {
See `CompletableFuture.allOf()` instead.
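A sketch of the shutdown coordination the reviewer suggests: combine all consumer-thread futures with `CompletableFuture.allOf()` and wait once, instead of joining each future in a loop. The class and method names here are illustrative, not from the PR:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfShutdown {

    // Wait for every consumer thread to finish with a single join:
    // allOf() yields a future that completes only when all inputs complete.
    public static void awaitAll(List<CompletableFuture<Void>> consumerFutures) {
        CompletableFuture
                .allOf(consumerFutures.toArray(new CompletableFuture[0]))
                .join();
    }
}
```

The combined future also collects failures: if any input completes exceptionally, `join()` surfaces that exception, which the per-future loop would otherwise have to handle one future at a time.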
)
class ShareKafkaMessageListenerContainerIntegrationTests {

    private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(ShareKafkaMessageListenerContainerIntegrationTests.class));
The `LogAccessor` has a ctor based on the class:

/**
 * Create a new accessor for the specified Commons Log category.
 * @see LogFactory#getLog(Class)
 */
public LogAccessor(Class<?> logCategory) {
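Applied to the quoted test field, that constructor would drop the `LogFactory` indirection; a one-line sketch of the suggestion:

```java
private static final LogAccessor logger =
        new LogAccessor(ShareKafkaMessageListenerContainerIntegrationTests.class);
```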
container.setBeanName("concurrencyBasicTest");
container.setConcurrency(concurrency);

assertThat(container.getConcurrency()).isEqualTo(concurrency);
This is something very obvious and totally redundant.
        .isTrue();

// Log thread distribution for debugging
logger.info(() -> "Thread distribution: " + threadCounts);
Why do we need a logger in this test class just for these infos?
How is this useful for the whole test suite, especially when running on CI/CD?
int totalProcessed = threadCounts.values().stream()
        .mapToInt(AtomicInteger::get)
        .sum();
assertThat(totalProcessed).isEqualTo(numRecords);
This looks totally equal to `assertThat(receivedValues).hasSize(numRecords);`.
Since we really don't care which thread has processed those records, there is just no need to track them.
Why over-complicate tests?
- Use primitive `int` for concurrency in factory (consistent with phase field)
- Remove unnecessary `getConcurrency()` getter (only used in trivial tests)
- Use `HashMap` instead of `ConcurrentHashMap` in metrics() (already inside lock)
- Use `CompletableFuture.allOf()` for cleaner shutdown coordination
- Remove debug logging from tests (unnecessary noise in CI/CD)
- Remove thread tracking from concurrency tests (over-complicates assertions)

Clarify documentation based on KIP-932 specifications:

- Add explicit note that concurrency is additive across application instances
- Replace high-level distribution description with precise KIP-932 details
- Document pull-based model, acquisition locks, and batch behavior
- Explain `max.poll.records` as soft limit with complete batch preference
- Set accurate expectations about broker-controlled record distribution

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
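As a rough illustration of the `max.poll.records` point above: it caps how many records a single poll hands to one consumer thread, but as a soft limit the broker may deliver a complete acquired batch even if that slightly exceeds the cap. A minimal sketch of setting it (broker address, group name, and class name are placeholders, not from the PR):

```java
import java.util.Properties;

public class ShareConsumerProps {

    public static Properties baseProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "orders-share-group");      // placeholder share group name
        // Soft upper bound on records returned per poll; the broker prefers
        // delivering complete batches, per KIP-932.
        props.put("max.poll.records", "500");
        return props;
    }
}
```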