Skip to content

Commit

Permalink
Restore STCEH Behavior After Previous Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jun 23, 2021
1 parent bb3d3df commit 924b8a8
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 6 deletions.
Expand Up @@ -109,10 +109,8 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {

if (records != null) {
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
}
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
}

}
Expand Up @@ -185,7 +185,7 @@ public static void seekOrRecover(Exception thrownException, List<ConsumerRecord<
* @param level the log level for the thrown exception after handling.
* @since 2.7
*/
public static void seekOrRecover(Exception thrownException, List<ConsumerRecord<?, ?>> records,
public static void seekOrRecover(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container, boolean commitRecovered,
RecoveryStrategy recovery, LogAccessor logger, Level level) {

Expand All @@ -202,7 +202,7 @@ public static void seekOrRecover(Exception thrownException, List<ConsumerRecord<
}
}

if (!doSeeks(records, consumer, thrownException, true, recovery, container, logger)) {
if (records == null || !doSeeks(records, consumer, thrownException, true, recovery, container, logger)) {
throw new KafkaException("Seek to current after exception", level, thrownException);
}
if (commitRecovered) {
Expand Down

0 comments on commit 924b8a8

Please sign in to comment.