
Option to commit offsets when publishing to dead letter topic. #990

Closed
miklesw opened this issue Mar 8, 2019 · 5 comments
Comments

@miklesw
miklesw commented Mar 8, 2019

Affects Version(s): latest

When max retries is reached, the DefaultAfterRollbackProcessor uses a seek to skip the failing record, but the offset is never committed. This means that the record will be processed again when the app is restarted.

To work around this I extended the DeadLetterPublishingRecoverer to also commit the offset.

Alternatively the DefaultAfterRollbackProcessor could be configured to commit the offsets after rolling back the main transaction.

https://stackoverflow.com/questions/55023117/kafkatemplate-in-transactional-kafkalistener-publishes-messages-when-transaction

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.transaction.annotation.Transactional

@Transactional(KAFKA_TRANSACTION_MANAGER)
class OffsetCommittingAndDeadLetterPublishingRecoverer(val template: KafkaTemplate<Any, Any>) :
    DeadLetterPublishingRecoverer(template) {

    override fun accept(record: ConsumerRecord<*, *>, exception: Exception) {
        super.accept(record, exception)

        val topicPartition = TopicPartition(record.topic(), record.partition())
        val offsetAndMetadata = OffsetAndMetadata(record.offset() + 1)

        template.sendOffsetsToTransaction(mapOf(topicPartition to offsetAndMetadata))
    }
}
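For reference, recent spring-kafka versions expose this behavior directly as a commitRecovered option on DefaultAfterRollbackProcessor, so the custom recoverer above is no longer needed. The following is a hedged configuration sketch only; the constructor signature is taken from the current spring-kafka API and may differ in older versions:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

// Sketch: an after-rollback processor that publishes the failed record to a
// dead letter topic and commits its offset as part of the transaction.
@Bean
public DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor(
        KafkaTemplate<Object, Object> template) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // Final 'true' argument is commitRecovered: send the recovered record's
    // offset to the transaction so it is not reprocessed after a restart.
    return new DefaultAfterRollbackProcessor<>(
            recoverer, new FixedBackOff(0L, 2L), template, true);
}
```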
@garyrussell garyrussell added this to the 2.2.5 milestone Mar 8, 2019
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 8, 2019
Resolves spring-projects#990

Provide a mechanism to start a new transaction within which to invoke
the processor, so if it recovers the failed record, its offset can
be sent to the transaction.
artembilan pushed a commit that referenced this issue Mar 8, 2019
Resolves #990

Provide a mechanism to start a new transaction within which to invoke
the processor, so if it recovers the failed record, its offset can
be sent to the transaction.

**cherry-pick to 2.2.x**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 11, 2019
Rename property to `commitRecovered`.
artembilan pushed a commit that referenced this issue Mar 15, 2019
Rename property to `commitRecovered`.
garyrussell added a commit that referenced this issue Dec 20, 2019
See #990

Property rename was not back-ported.
@anschnapp

Sorry to write on this already-closed ticket; if you think I should open a new one, please let me know.

But my problem fits the title of this issue perfectly: I want an explicit commit after a dead letter is sent, to avoid sending a duplicate to the DLQ (or several, if failures occur in a row) after a consumer restart.

I'm not sure the solution inside DefaultAfterRollbackProcessor works for me, because it seems to apply only when Kafka transactions are used, which is not the case in my scenario. I have a quite simple setup (but with manual offset commits) where I want to commit after each processed record, even ones handled via the DLQ.

@garyrussell
Contributor

garyrussell commented Sep 26, 2022

Commenting on old, closed issues is not a good idea, especially when the question is unrelated; you are not using transactions.

The default behavior in supported versions is to commit the offset after an error handler successfully "handles" the error - see CommonErrorHandler.isAckAfterHandle().

If you are seeing different behavior, open a new issue, providing a minimal, complete, reproducible example.
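To make the above concrete, here is a hedged configuration sketch of the non-transactional setup being discussed, assuming spring-kafka 2.8+ (constructor signatures per the current API): DefaultErrorHandler publishes the failed record via the recoverer and, because isAckAfterHandle() is true by default, the container commits the offset after the recoverer succeeds.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Sketch: after the configured retries are exhausted, the recoverer sends
// the record to a dead letter topic; the offset is then acked by default.
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
}
```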

@anschnapp

Thanks for answering, @garyrussell.

You are right that this is not the best place. Yes, the topic is slightly different, but the title was a complete fit, and so was most of the description (until the example appears ;)).

But if I might ask you one more question:

I use the DefaultErrorHandler with a DeadLetterPublishingRecoverer inside, and I have now added defaultErrorHandler.setAckAfterHandle(true); to my configuration.

I have an integration test, and it seems there is no commit/ack for a record "recovered" to the dead letter topic. Do you have another clue for me as to why this does not work? (I tried to look it up in the implementation, but I didn't even find a usage of isAckAfterHandle...)

@garyrussell
Contributor

ackAfterHandle is true by default, so there is no need to set it.

invokeErrorHandler(record, iterator, e);
commitOffsetsIfNeeded(record);

and

private void commitOffsetsIfNeeded(final ConsumerRecord<K, V> record) {
    if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
            || this.producer != null) {
        if (this.isManualAck) {
            this.commitRecovered = true;
        }
        if (this.remainingRecords == null
                || !record.equals(this.remainingRecords.iterator().next())) {
            ackCurrent(record);
        }
        if (this.isManualAck) {
            this.commitRecovered = false;
        }
    }
}

Bear in mind that the actual commit depends on the AckMode: if the default, BATCH, is used, the commit won't happen until the remaining records from the poll have been processed. To commit immediately, use AckMode.RECORD.

public void ackCurrent(final ConsumerRecord<K, V> record) {
    if (this.isRecordAck) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
                        createOffsetAndMetadata(record.offset() + 1));
        if (this.producer == null) {
            this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
            if (this.syncCommits) {
                commitSync(offsetsToCommit);
            }
            else {
                commitAsync(offsetsToCommit);
            }
        }
        else {
            this.acks.add(record);
        }
    }
    else if (this.producer != null
            || ((!this.isAnyManualAck || this.commitRecovered) && !this.autoCommit)) {
        this.acks.add(record);
    }
    if (this.producer != null) {
        sendOffsetsToTransaction();
    }
}
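A hedged sketch of the AckMode change described above, assuming a ConcurrentKafkaListenerContainerFactory bean named `factory` (a hypothetical name for illustration) and the ContainerProperties.AckMode enum location used since spring-kafka 2.3:

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Commit the offset after each record (including records "handled" by the
// error handler), rather than at the end of the poll batch (BATCH default).
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
```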

@anschnapp

Thanks, @garyrussell, for the detailed explanation! Now it works!

The issue on my side was that I wanted to ack manually inside the listener, so I had set MANUAL_IMMEDIATE as the AckMode. But my own logic simply acks after each record is processed, so using RECORD works perfectly for me (it also reduced some complexity in my code), and it works with the error handler.
