Skip to content

Commit

Permalink
GH-3186: Filter out successful commits from retry
Browse files Browse the repository at this point in the history
Fixes: #3186

* Filter out successful commits from the retry
* Don't retry commits that failed due to a rebalance if subsequent successful commits supersede them
* Modify rebalance test cases to capture the scenario that triggers the bug

 **Auto-cherry-pick to `3.1.x` & `3.0.x`**
  • Loading branch information
mikael-carlstedt authored and sobychacko committed Apr 11, 2024
1 parent 54190df commit 6a0bab6
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@
* @author Daniel Gentes
* @author Soby Chacko
* @author Raphael Rösch
* @author Christian Mergenthaler
* @author Mikael Carlstedt
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -3318,6 +3320,10 @@ private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int re
if (this.fixTxOffsets) {
this.lastCommits.putAll(commits);
}
if (!this.commitsDuringRebalance.isEmpty()) {
// Remove failed commits during last rebalance that are superseded by these commits
this.commitsDuringRebalance.keySet().removeAll(commits.keySet());
}
}
catch (RetriableCommitFailedException e) {
if (retries >= this.containerProperties.getCommitRetries()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@
* @author Lukasz Kaminski
* @author Ray Chuan Tay
* @author Daniel Gentes
* @author Soby Chacko
* @author Wang Zhiyang
* @author Mikael Carlstedt
*/
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
Expand Down Expand Up @@ -3526,25 +3529,33 @@ public void testCooperativeRebalance() throws Exception {

@Test
void testCommitRebalanceInProgressBatch() throws Exception {
testCommitRebalanceInProgressGuts(AckMode.BATCH, 2, commits -> {
assertThat(commits).hasSize(3);
testCommitRebalanceInProgressGuts(AckMode.BATCH, 3, commits -> {
assertThat(commits).hasSize(5);
assertThat(commits.get(0)).hasSize(2); // assignment
assertThat(commits.get(1)).hasSize(2); // batch commit
assertThat(commits.get(2)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
assertThat(commits.get(1)).hasSize(2); // batch commit which should fail due to rebalance in progress
assertThat(commits.get(2)).hasSize(2); // commit retry which should fail due to rebalance in progress
assertThat(commits.get(3)).hasSize(1); // GH-3186: additional batch commit with only one partition which should be successful
assertThat(commits.get(4)).hasSize(1); // GH-2489: offset for the one still-uncommitted partition should be re-committed before partition 0 is revoked
assertThat(commits.get(4).get(new TopicPartition("foo", 0)))
.isNotNull()
.extracting(OffsetAndMetadata::offset)
.isEqualTo(2L);
});
}

@Test
void testCommitRebalanceInProgressRecord() throws Exception {
testCommitRebalanceInProgressGuts(AckMode.RECORD, 5, commits -> {
assertThat(commits).hasSize(6);
testCommitRebalanceInProgressGuts(AckMode.RECORD, 6, commits -> {
assertThat(commits).hasSize(8);
assertThat(commits.get(0)).hasSize(2); // assignment
assertThat(commits.get(1)).hasSize(1); // 4 individual commits
assertThat(commits.get(1)).hasSize(1); // 4 individual commits which should fail due to rebalance in progress
assertThat(commits.get(2)).hasSize(1);
assertThat(commits.get(3)).hasSize(1);
assertThat(commits.get(4)).hasSize(1);
assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
assertThat(commits.get(5)).hasSize(2); // commit retry which should fail due to rebalance in progress
assertThat(commits.get(6)).hasSize(1); // GH-3186: additional commit which should be successful
assertThat(commits.get(7)).hasSize(1); // GH-2489: offset for the one still-uncommitted partition should be re-committed before partition 0 is revoked
assertThat(commits.get(7).get(new TopicPartition("foo", 0)))
.isNotNull()
.extracting(om -> om.offset())
.isEqualTo(2L);
Expand All @@ -3568,25 +3579,37 @@ private void testCommitRebalanceInProgressGuts(AckMode ackMode, int exceptions,
records.put(new TopicPartition("foo", 1), Arrays.asList(
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> additionalRecords = Collections.singletonMap(
new TopicPartition("foo", 1),
Collections.singletonList(new ConsumerRecord<>("foo", 1, 2L, 1, "foo")));
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
ConsumerRecords<Integer, String> additionalConsumerRecords = new ConsumerRecords<>(additionalRecords);
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
AtomicBoolean first = new AtomicBoolean(true);
AtomicInteger rebalance = new AtomicInteger();
AtomicInteger pollIteration = new AtomicInteger();
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(2);
CountDownLatch latch = new CountDownLatch(3);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
int call = rebalance.getAndIncrement();
int call = pollIteration.getAndIncrement();
final ConsumerRecords<Integer, String> result;
if (call == 0) {
rebal.get().onPartitionsRevoked(Collections.emptyList());
rebal.get().onPartitionsAssigned(records.keySet());
result = consumerRecords;
}
else if (call == 1) {
result = additionalConsumerRecords;
}
else if (call == 2) {
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
rebal.get().onPartitionsAssigned(Collections.emptyList());
result = emptyRecords;
}
else {
result = emptyRecords;
}
latch.countDown();
return first.getAndSet(false) ? consumerRecords : emptyRecords;
return result;
});
willAnswer(invoc -> {
rebal.set(invoc.getArgument(1));
Expand Down

0 comments on commit 6a0bab6

Please sign in to comment.