diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc
index 07fec9f949..8b7b33b76c 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc
@@ -667,6 +667,96 @@ public void validateData(ConsumerRecord record, ShareAcknowledgm
 }
 ----
 
+[[share-poison-message-protection]]
+== Poison Message Protection and Delivery Count
+
+KIP-932 includes broker-side poison message protection to prevent unprocessable records from being endlessly redelivered.
+
+=== How It Works
+
+Each time a record is acquired by a consumer in a share group, the broker increments an internal delivery count: the first acquisition sets the count to 1, and every subsequent acquisition increments it.
+When the delivery count reaches the configured limit (default: 5), the record moves to the **Archived** state and is no longer eligible for further delivery attempts.
+
+=== Configuration
+
+The maximum number of delivery attempts can be configured per share group by using the Admin API:
+
+[source,java]
+----
+private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
+    Map<String, Object> adminProps = new HashMap<>();
+    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+    try (Admin admin = Admin.create(adminProps)) {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
+
+        // Default is 5, adjust based on your retry tolerance
+        ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");
+
+        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
+                configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
+        );
+
+        admin.incrementalAlterConfigs(configs).all().get();
+    }
+}
+----
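+
+To confirm the value a share group is using, the configuration can be read back through the same Admin API.
+The following is a minimal sketch, assuming the `Admin` client is created as in the previous example (the helper name is illustrative only):
+
+[source,java]
+----
+private String currentDeliveryAttemptLimit(Admin admin, String groupId) throws Exception {
+    ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
+
+    // Fetch the group's configuration and look up the limit;
+    // the entry may reflect the broker default if the group has no explicit override
+    Config config = admin.describeConfigs(List.of(configResource))
+            .all()
+            .get()
+            .get(configResource);
+
+    ConfigEntry entry = config.get("group.share.delivery.attempt.limit");
+    return (entry != null) ? entry.value() : null;
+}
+----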
+
+.Delivery Count is Not Exposed to Applications
+[IMPORTANT]
+====
+The delivery count is maintained internally by the broker and is **not exposed to consumer applications**.
+This is an intentional design decision in KIP-932: the delivery count is approximate and serves as a poison message protection mechanism, not a precise redelivery counter.
+Applications cannot query or access this value through any API.
+
+For application-level retry logic, use the acknowledgment types:
+
+* `RELEASE` - makes the record available for redelivery (contributes to the delivery count)
+* `REJECT` - marks the record as permanently failed (does not cause redelivery)
+* `ACCEPT` - marks the record as successfully processed (does not cause redelivery)
+
+The broker automatically prevents endless redelivery once `group.share.delivery.attempt.limit` is reached, moving the record to the Archived state.
+====
+
+=== Retry Strategy Recommendations
+
+Choose the acknowledgment type according to the class of failure, as the following listener illustrates:
+
+[source,java]
+----
+@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
+public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
+    try {
+        // Attempt to process the order
+        orderService.process(record.value());
+        ack.acknowledge(); // ACCEPT - successfully processed
+    }
+    catch (TransientException e) {
+        // Temporary failure (network issue, service unavailable, etc.)
+        // Release the record for redelivery
+        // The broker will retry up to group.share.delivery.attempt.limit times
+        logger.warn("Transient error processing order, will retry: {}", e.getMessage());
+        ack.release(); // RELEASE - make available for retry
+    }
+    catch (ValidationException e) {
+        // Permanent semantic error (invalid data format, business rule violation, etc.)
+        // Do not retry - this record will never succeed
+        logger.error("Invalid order data, rejecting: {}", e.getMessage());
+        ack.reject(); // REJECT - permanent failure, do not retry
+    }
+    catch (Exception e) {
+        // Unknown error - typically safer to reject to avoid infinite loops,
+        // but release instead if you suspect the failure is transient
+        logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
+        ack.reject(); // REJECT - avoid poison message loops
+    }
+}
+----
+
+The broker's poison message protection ensures that, even if you always use `RELEASE` for errors, records are not redelivered endlessly.
+They are automatically archived after reaching the delivery attempt limit.
 
 [[share-differences-from-regular-consumers]]
 == Differences from Regular Consumers
 
@@ -678,6 +768,7 @@ Share consumers differ from regular consumers in several key ways:
 4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types
 5. **Different Group Management**: Share groups use different coordinator protocols
 6. **No Batch Processing**: Share consumers process records individually, not in batches
+7. **Broker-Side Retry Management**: Delivery count tracking and poison message protection are managed by the broker and are not exposed to applications
 
 [[share-limitations-and-considerations]]
 == Limitations and Considerations