}
----

[[share-poison-message-protection]]
== Poison Message Protection and Delivery Count

KIP-932 includes broker-side poison message protection to prevent unprocessable records from being endlessly redelivered.

=== How It Works

Every time a record is acquired by a consumer in a share group, the broker increments an internal delivery count.
The first acquisition sets the delivery count to 1, and each subsequent acquisition increments it.
When the delivery count reaches the configured limit (default: 5), the record moves to **Archived** state and is not eligible for additional delivery attempts.
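The lifecycle above can be sketched as a small state model. This is an illustrative simulation of the broker's bookkeeping, not actual broker code; the limit of 5 mirrors the default of `group.share.delivery.attempt.limit`:

[source,java]
----
// Illustrative model of broker-side delivery counting (not actual broker code)
class DeliveryCountModel {

    enum State { AVAILABLE, ACQUIRED, ARCHIVED }

    static final int DELIVERY_ATTEMPT_LIMIT = 5; // default of group.share.delivery.attempt.limit

    State state = State.AVAILABLE;
    int deliveryCount = 0;

    // A consumer acquires the record; the broker increments the delivery count
    boolean acquire() {
        if (state != State.AVAILABLE) {
            return false;
        }
        deliveryCount++;
        state = State.ACQUIRED;
        return true;
    }

    // RELEASE: the record becomes available again unless the limit is reached,
    // in which case it is archived and never redelivered
    void release() {
        state = (deliveryCount >= DELIVERY_ATTEMPT_LIMIT) ? State.ARCHIVED : State.AVAILABLE;
    }

    public static void main(String[] args) {
        DeliveryCountModel record = new DeliveryCountModel();
        while (record.acquire()) {
            record.release(); // simulate a consumer that always releases
        }
        // After the fifth failed delivery the record is archived
        System.out.println(record.deliveryCount + " " + record.state);
    }
}
----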

=== Configuration

The maximum delivery attempts can be configured per share group using the Admin API:

[source,java]
----
private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);

        // Default is 5; adjust based on your retry tolerance
        ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
                configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET)));

        admin.incrementalAlterConfigs(configs).all().get();
    }
}
----
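The same group configuration can also be set from the command line with the `kafka-configs.sh` tool shipped with Kafka (assuming a broker version that supports share groups; the group name `my-share-group` and the broker address are placeholders):

[source,shell]
----
# Set the delivery attempt limit for a share group (placeholder names)
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type groups --entity-name my-share-group \
  --alter --add-config group.share.delivery.attempt.limit=10

# Read the setting back to verify it
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type groups --entity-name my-share-group --describe
----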

[IMPORTANT]
====
**Delivery Count is Not Exposed to Applications**

The delivery count is maintained internally by the broker and is **not exposed to consumer applications**.
This is an intentional design decision in KIP-932.
The delivery count is approximate and serves as a poison message protection mechanism, not a precise redelivery counter.
Applications cannot query or access this value through any API.

For application-level retry logic, use the acknowledgment types:

* `RELEASE` - Make record available for redelivery (each redelivery is a new acquisition and increments the delivery count)
* `REJECT` - Mark as permanently failed (does not cause redelivery)
* `ACCEPT` - Successfully processed (does not cause redelivery)

The broker automatically prevents endless redelivery once `group.share.delivery.attempt.limit` is reached, moving the record to Archived state.
====

=== Retry Strategy Recommendations

[source,java]
----
@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
    try {
        // Attempt to process the order
        orderService.process(record.value());
        ack.acknowledge(); // ACCEPT - successfully processed
    }
    catch (TransientException e) {
        // Temporary failure (network issue, service unavailable, etc.)
        // Release the record for redelivery
        // Broker will retry up to group.share.delivery.attempt.limit times
        logger.warn("Transient error processing order, will retry: {}", e.getMessage());
        ack.release(); // RELEASE - make available for retry
    }
    catch (ValidationException e) {
        // Permanent semantic error (invalid data format, business rule violation, etc.)
        // Do not retry - this record will never succeed
        logger.error("Invalid order data, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - permanent failure, do not retry
    }
    catch (Exception e) {
        // Unknown error - typically safer to reject to avoid infinite loops,
        // but release if you suspect the failure is transient
        logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - avoid poison message loops
    }
}
----

The broker's poison message protection ensures that even if you always use `RELEASE` for errors, records are not retried endlessly: once the delivery attempt limit is reached, they are automatically archived.

Note that share groups provide no ordering guarantee.
A released record returns to the pool and may be redelivered to any available consumer at any time, so treat each record as an independent unit of work.
If ordering matters for your use case, use a traditional consumer group, which preserves order within each partition.

[[share-differences-from-regular-consumers]]
== Differences from Regular Consumers

Share consumers differ from regular consumers in several key ways:
4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types
5. **Different Group Management**: Share groups use different coordinator protocols
6. **No Batch Processing**: Share consumers process records individually, not in batches
7. **Broker-Side Retry Management**: Delivery count tracking and poison message protection are managed by the broker, not exposed to applications

[[share-limitations-and-considerations]]
== Limitations and Considerations