Exception in recoverer with DefaultAfterRollbackProcessor still commit the offset #1259

Closed
Rajh opened this issue Oct 3, 2019 · 0 comments · Fixed by #1261 or #1262

Rajh commented Oct 3, 2019

Hi,
I'm using version 2.2.9.RELEASE.

Issue:
When an exception occurs in a custom recoverer for DefaultAfterRollbackProcessor, the transaction does not appear to be rolled back, and so the offset is committed.

Reproduction:
Use a transactional Kafka listener that throws an exception.
Use DefaultAfterRollbackProcessor with a custom recoverer as follows:
((consumerRecord, e) -> { throw new RuntimeException("Error in recover"); });

AfterRollbackProcessor:

@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(KafkaTemplate<Object, Object> template, BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    // Send to DLQ on multiple errors
    DefaultAfterRollbackProcessor<Object, Object> defaultRollbackProcessor = new DefaultAfterRollbackProcessor<>(recoverer, 1);
    // Commit the offset once the error has been rolled back and the record recovered to a DLQ
    defaultRollbackProcessor.setProcessInTransaction(true);
    defaultRollbackProcessor.setKafkaTemplate(template);
    return defaultRollbackProcessor;
}

The record is not reprocessed, even after a restart (with no new messages arriving after the error).

Logs:
2019-10-03 16:39:42.683 ERROR 20993 --- [ntainer#3-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
2019-10-03 16:39:42.686 ERROR 20993 --- [ntainer#3-0-C-1] o.s.k.l.DefaultAfterRollbackProcessor : Recoverer threw exception

java.lang.RuntimeException: Error in recover

@garyrussell garyrussell modified the milestones: 2.3.1, 2.2.10 Oct 3, 2019
@garyrussell garyrussell self-assigned this Oct 3, 2019
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Oct 3, 2019
Resolves spring-projects#1259

Previously if the recoverer in a `SeekToCurrentErrorHandler` or
`DefaultAfterRollbackProcessor` failed to recover a record, the
record could be lost; the `FailedRecordTracker` simply logged
the exception.

Change the `SeekUtils` to detect a failure in the recoverer (actually
any failure when determining if the failed record should be recovered)
and include the failed record in the seeks.

In this way the recovery will be attempted once more on each delivery
attempt.

**cherry-pick to 2.2.x**
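
The change described in this commit message can be pictured with a small plain-Java model. This is only a sketch under stated assumptions, not the actual `SeekUtils` or `FailedRecordTracker` code: the class `SeekSketch`, the method `offsetsToSeek`, and the use of bare `Long` offsets in place of consumer records are all hypothetical names chosen for illustration. It shows the idea of treating a failed recovery as "not skipped", so that the failed record stays in the list of offsets to seek back to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical, simplified model of the seek decision; not spring-kafka code. */
final class SeekSketch {

    private SeekSketch() {
    }

    /**
     * Returns the offsets to seek back to so they are redelivered on the next poll.
     *
     * @param remaining offsets of the failed record (first) and the unprocessed ones
     * @param retriesExhausted whether the failed record has used up its delivery attempts
     * @param recoverer stand-in for the user's recoverer (for example, a DLQ publisher)
     * @param cause the exception thrown by the listener
     */
    static List<Long> offsetsToSeek(List<Long> remaining, boolean retriesExhausted,
            BiConsumer<Long, Exception> recoverer, Exception cause) {

        List<Long> seeks = new ArrayList<>();
        boolean first = true;
        for (Long offset : remaining) {
            boolean skipped = false;
            if (first && retriesExhausted) {
                try {
                    recoverer.accept(offset, cause);
                    skipped = true; // recovered, so it is safe to move past this record
                }
                catch (RuntimeException ex) {
                    // Recovery failed: keep the record in the seeks so that
                    // recovery is attempted again on the next delivery.
                    skipped = false;
                }
            }
            if (!skipped) {
                seeks.add(offset);
            }
            first = false;
        }
        return seeks;
    }
}
```

In this model, calling `offsetsToSeek` for offsets 5, 6, 7 with a recoverer that succeeds yields seeks for 6 and 7 only, while a recoverer that throws (as in the reproduction above) yields seeks for 5, 6, and 7, so the failed record is not lost.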
artembilan pushed a commit that referenced this issue Oct 3, 2019
Resolves #1259

artembilan pushed a commit that referenced this issue Oct 3, 2019
Resolves #1259

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
#	spring-kafka/src/main/java/org/springframework/kafka/support/SeekUtils.java
#	src/reference/asciidoc/kafka.adoc
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Oct 3, 2019
Resolves spring-projects#1259

When a record fails with an exception that is not classified for retry,
handle a failed recovery and re-seek the failed record.

`FailedRecordTracker.skip()` is not used for unclassified exceptions;
its recoverer is called directly.
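
As a rough illustration of the path this commit message describes, here is a plain-Java sketch. It is a simplified model under assumptions, not the library's implementation: `ClassifiedRecoverySketch`, `shouldSkip`, the `notRetryable` set, and the attempt counting are hypothetical stand-ins. It shows the recoverer being called directly for an exception that is not classified for retry, with a failed recovery reported as "do not skip" so that the caller re-seeks the record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

/** Hypothetical model of recovery for non-retryable exceptions; not spring-kafka code. */
final class ClassifiedRecoverySketch {

    private final Set<Class<? extends Exception>> notRetryable;
    private final BiConsumer<Long, Exception> recoverer;
    private final int maxFailures;
    private final Map<Long, Integer> attempts = new HashMap<>();

    ClassifiedRecoverySketch(Set<Class<? extends Exception>> notRetryable,
            BiConsumer<Long, Exception> recoverer, int maxFailures) {
        this.notRetryable = notRetryable;
        this.recoverer = recoverer;
        this.maxFailures = maxFailures;
    }

    /** Returns true if the record was recovered and can be skipped; false means re-seek it. */
    boolean shouldSkip(long offset, Exception cause) {
        if (this.notRetryable.contains(cause.getClass())) {
            // Not classified for retry: call the recoverer directly, without attempt counting.
            return tryRecover(offset, cause);
        }
        int count = this.attempts.merge(offset, 1, Integer::sum);
        if (count < this.maxFailures) {
            return false; // still retrying
        }
        boolean recovered = tryRecover(offset, cause);
        if (recovered) {
            this.attempts.remove(offset);
        }
        return recovered;
    }

    private boolean tryRecover(long offset, Exception cause) {
        try {
            this.recoverer.accept(offset, cause);
            return true;
        }
        catch (RuntimeException ex) {
            // A failed recovery must not cause the record to be skipped.
            return false;
        }
    }
}
```

With a recoverer that always throws, `shouldSkip` returns `false` for a non-retryable exception, which in this model is the signal to seek back to the failed record instead of moving past it.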
artembilan pushed a commit that referenced this issue Oct 3, 2019
Resolves #1259
