Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,6 @@ public class ShareMessageListener {
}
----

[[share-group-configuration]]
== Share Group Configuration

Share groups require specific broker configuration to function properly.
For testing with embedded Kafka, use:

[source,java]
----
@EmbeddedKafka(
topics = {"my-queue-topic"},
brokerProperties = {
"unstable.api.versions.enable=true",
"group.coordinator.rebalance.protocols=classic,share",
"share.coordinator.state.topic.replication.factor=1",
"share.coordinator.state.topic.min.isr=1"
}
)
----

[[share-group-offset-reset]]
=== Share Group Offset Reset

Expand Down Expand Up @@ -248,8 +229,277 @@ private void configureShareGroup(String bootstrapServers, String groupId) throws
[[share-record-acknowledgment]]
== Record Acknowledgment

Currently, share consumers automatically acknowledge records with `AcknowledgeType.ACCEPT` after successful processing.
More sophisticated acknowledgment patterns will be added in future versions.
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.

[[share-implicit-acknowledgment]]
=== Implicit Acknowledgment (Default)

In implicit mode, records are automatically acknowledged based on processing outcome:

Successful processing: Records are acknowledged as `ACCEPT`
Processing errors: Records are acknowledged as `REJECT`

[source,java]
----
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Implicit mode is the default (false means implicit acknowledgment)
factory.getContainerProperties().setExplicitShareAcknowledgment(false);

return factory;
}
----

[[share-explicit-acknowledgment]]
=== Explicit Acknowledgment

In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment.

There are two ways to configure explicit acknowledgment mode:

==== Option 1: Using Kafka Client Configuration

[source,java]
----
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
return new DefaultShareConsumerFactory<>(props);
}
----

==== Option 2: Using Spring Container Configuration

[source,java]
----
@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {

ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

// Configure acknowledgment mode at container factory level
// true means explicit acknowledgment is required
factory.getContainerProperties().setExplicitShareAcknowledgment(true);

return factory;
}
----

==== Configuration Precedence

When both configuration methods are used, Spring Kafka follows this precedence order (highest to lowest):

1. **Container Properties**: `containerProperties.setExplicitShareAcknowledgment(true/false)`
2. **Consumer Config**: `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG` ("implicit" or "explicit")
3. **Default**: `false` (implicit acknowledgment)

[[share-acknowledgment-types]]
=== Acknowledgment Types

Share consumers support three acknowledgment types:

ACCEPT: Record processed successfully, mark as completed
RELEASE: Temporary failure, make record available for redelivery
REJECT: Permanent failure, do not retry

[[share-acknowledgment-api]]
=== ShareAcknowledgment API

The `ShareAcknowledgment` interface provides methods for explicit acknowledgment:

[source,java]
----
public interface ShareAcknowledgment {
void acknowledge();
void release();
void reject();
}
----

[[share-listener-interfaces]]
=== Listener Interfaces

Share consumers support specialized listener interfaces for different use cases:

[[share-basic-listener]]
==== Basic Message Listener

Use the standard MessageListener for simple cases:
[source,java]
----
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
// Automatically acknowledged in implicit mode
}
----

[[share-acknowledging-listener]]
==== AcknowledgingShareConsumerAwareMessageListener

This interface provides access to the `ShareConsumer` instance with optional acknowledgment support.
The acknowledgment parameter is nullable and depends on the container's acknowledgment mode:

===== Implicit Mode Example (acknowledgment is null)

[source,java]
----
@KafkaListener(
topics = "my-topic",
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {

// In implicit mode, acknowledgment is null
System.out.println("Received: " + record.value());

// Access consumer metrics if needed
Map<MetricName, ? extends Metric> metrics = consumer.metrics();

// Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
----

===== Explicit Mode Example (acknowledgment is non-null)

[source,java]
----
@Component
public class ExplicitAckListener {
@KafkaListener(
topics = "my-topic",
containerFactory = "explicitShareKafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {

// In explicit mode, acknowledgment is non-null
try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}

private void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
----

[[share-acknowledgment-constraints]]
=== Acknowledgment Constraints

In explicit acknowledgment mode, the container enforces important constraints:

Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
One-time Acknowledgment: Each record can only be acknowledged once.
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.

[WARNING]
In explicit mode, failing to acknowledge records will block further message processing.
Always ensure records are acknowledged in all code paths.

[[share-acknowledgment-timeout]]
==== Acknowledgment Timeout Detection

To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection.
When a record is not acknowledged within the specified timeout, a warning is logged with details about the unacknowledged record.

[source,java]
----
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

// Set acknowledgment timeout (default is 30 seconds)
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));

return factory;
}
----

When a record exceeds the timeout, you'll see a warning like:
----
WARN: Record not acknowledged within timeout (30 seconds).
In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(),
or ack.reject() for every record.
----

This feature helps developers quickly identify when acknowledgment calls are missing from their code, preventing the common issue of "Spring Kafka does not consume new records any more" due to forgotten acknowledgments.

[[share-acknowledgment-examples]]
=== Acknowledgment Examples

[[share-mixed-acknowledgment-example]]
==== Mixed Acknowledgment Patterns

[source,java]
----
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderId = record.key();
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // Success - ACCEPT
}
else {
acknowledgment.release(); // Temporary failure - retry later
}
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
throw e;
}
}
----

[[share-conditional-acknowledgment-example]]
==== Conditional Acknowledgment

[source,java]
----
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
ValidationResult result = validator.validate(record.value());
switch (result.getStatus()) {
case VALID:
acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
break;
case INVALID_RETRYABLE:
acknowledgment.acknowledge(AcknowledgeType.RELEASE);
break;
case INVALID_PERMANENT:
acknowledgment.acknowledge(AcknowledgeType.REJECT);
break;
}
}
----

[[share-differences-from-regular-consumers]]
== Differences from Regular Consumers
Expand All @@ -259,16 +509,19 @@ Share consumers differ from regular consumers in several key ways:
1. **No Partition Assignment**: Share consumers cannot be assigned specific partitions
2. **No Topic Patterns**: Share consumers do not support subscribing to topic patterns
3. **Cooperative Consumption**: Multiple consumers in the same share group can consume from the same partitions simultaneously
4. **Automatic Acknowledgment**: Records are automatically acknowledged after processing
4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types
5. **Different Group Management**: Share groups use different coordinator protocols
6. **No Batch Processing**: Share consumers process records individually, not in batches

[[share-limitations-and-considerations]]
== Limitations and Considerations

[[share-current-limitations]]
=== Current Limitations

* **Early Access**: This feature is in early access and may change in future versions
* **Limited Acknowledgment Options**: Only automatic `ACCEPT` acknowledgment is currently supported
* **No Message Converters**: Message converters are not yet supported for share consumers
* **In preview**: This feature is in preview mode and may change in future versions
* **Single-Threaded**: Share consumer containers currently run in single-threaded mode
* **No Message Converters**: Message converters are not yet supported for share consumers
* **No Batch Listeners**: Batch processing is not supported with share consumers
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls

Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.config.ShareKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerGroupSequencer;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
Expand Down Expand Up @@ -651,6 +652,10 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
containerFactory, beanName);

if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory<?, ?>) {
endpoint.setShareConsumer(true);
}

this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}

Expand Down Expand Up @@ -685,6 +690,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
if (StringUtils.hasText(kafkaListener.batch())) {
endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
}

endpoint.setBeanFactory(this.beanFactory);
resolveErrorHandler(endpoint, kafkaListener);
resolveContentTypeConverter(endpoint, kafkaListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private @Nullable Boolean batchListener;

private boolean shareConsumer;

private @Nullable KafkaTemplate<?, ?> replyTemplate;

private @Nullable String clientIdPrefix;
Expand Down Expand Up @@ -291,6 +293,19 @@ public void setBatchListener(boolean batchListener) {
this.batchListener = batchListener;
}

public void setShareConsumer(boolean shareConsumer) {
this.shareConsumer = shareConsumer;
}

/**
* Return true if this endpoint is for a share consumer.
* @return true for a share consumer endpoint.
* @since 4.0
*/
public boolean isShareConsumer() {
return this.shareConsumer;
}

/**
* Set the {@link KafkaTemplate} to use to send replies.
* @param replyTemplate the template.
Expand Down
Loading