Skip to content

Commit

Permalink
Fix fatal early return from onPartitionsAssigned
Browse files Browse the repository at this point in the history
Exit from the method when we encounter a fatal error.
(Early exit was lost during refactoring).
  • Loading branch information
garyrussell authored and artembilan committed May 5, 2020
1 parent 1810409 commit b51a778
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2354,7 +2354,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
ListenerConsumer.this.assignedPartitions.addAll(partitions);
if (ListenerConsumer.this.commitCurrentOnAssignment) {
collectAndCommitIfNecessary(partitions);
if (!collectAndCommitIfNecessary(partitions)) {
return;
}
}
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
seekPartitions(partitions, false);
Expand All @@ -2367,7 +2369,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
}

private void collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
// Commit initial positions - this is generally redundant but
// it protects us from the case when another consumer starts
// and rebalance would cause it to reset at the end
Expand All @@ -2385,12 +2387,13 @@ private void collectAndCommitIfNecessary(Collection<TopicPartition> partitions)
catch (NoOffsetForPartitionException e) {
ListenerConsumer.this.fatalError = true;
ListenerConsumer.this.logger.error(e, "No offset and no reset policy");
return;
return false;
}
}
if (offsetsToCommit.size() > 0) {
commitCurrentOffsets(offsetsToCommit);
}
return true;
}

private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
Expand Down

0 comments on commit b51a778

Please sign in to comment.