Skip to content

Commit

Permalink
Improve CommitAsync Retries Test
Browse files Browse the repository at this point in the history
Assert that retries are exhausted and commit callback is called.
  • Loading branch information
garyrussell committed Jan 20, 2022
1 parent 51c4850 commit a63ad4c
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3409,6 +3409,10 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
return first.getAndSet(false) ? consumerRecords : emptyRecords;
});
CountDownLatch latch = new CountDownLatch(4);
CountDownLatch retriesExhausted = new CountDownLatch(1);
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
new TopicPartitionOffset("foo", 0) };
ContainerProperties containerProps = new ContainerProperties(topicPartition);
if (sync) {
willAnswer(i -> {
latch.countDown();
Expand All @@ -3422,10 +3426,10 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
latch.countDown();
return null;
}).given(consumer).commitAsync(anyMap(), any());
containerProps.setCommitCallback((offsets, exception) -> {
retriesExhausted.countDown();
});
}
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
new TopicPartitionOffset("foo", 0) };
ContainerProperties containerProps = new ContainerProperties(topicPartition);
containerProps.setSyncCommits(sync);
containerProps.setGroupId("grp");
containerProps.setClientId("clientId");
Expand All @@ -3443,6 +3447,7 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
}
else {
verify(consumer, times(4)).commitAsync(any(), any());
assertThat(retriesExhausted.await(10, TimeUnit.SECONDS)).isTrue();
}
}

Expand Down

0 comments on commit a63ad4c

Please sign in to comment.