Skip to content

Commit

Permalink
GH-3151: Include error handler into a listener observation
Browse files Browse the repository at this point in the history
Fixes: #3151

After fixing #3049 we are missing an `ErrorHandler` part within an observation.
This even cause a retryable topic logic ot be out of an observation scope.

* Restore the previous behavior and add `observation.error(e)` when it is not re-thrown in case of `this.commonErrorHandler` presence

(cherry picked from commit c24575c)
  • Loading branch information
artembilan authored and spring-builds committed Mar 22, 2024
1 parent fffd5ef commit bc27ffa
Showing 1 changed file with 27 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2772,37 +2772,38 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
DefaultKafkaListenerObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
this.observationRegistry);
try {
observation.observe(() -> {
return observation.observe(() -> {
try {
invokeOnMessage(cRecord);
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
});
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
}
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
}
observation.error(e);
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
}
}
return null;
return null;
});
}

private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
Expand Down

0 comments on commit bc27ffa

Please sign in to comment.