Skip to content

Commit

Permalink
GH-2076: Fix Async Commit Retries
Browse files Browse the repository at this point in the history
#2076

Do not attempt to retry asynchronous commits.
- a later commit for the same topic/partition may have already succeeded
- many consecutive retryable exceptions can cause a stack overflow, because each retry was issued recursively from within the commit callback

**cherry-pick to 2.8.x, 2.7.x**

* Remove unused parameter; polish javadocs.
  • Loading branch information
garyrussell committed Jan 27, 2022
1 parent fe0f58e commit 8169f4d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -293,6 +293,7 @@ public OffsetCommitCallback getCommitCallback() {
* @see #setSyncCommitTimeout(Duration)
* @see #setCommitCallback(OffsetCommitCallback)
* @see #setCommitLogLevel(org.springframework.kafka.support.LogIfLevelEnabled.Level)
* @see #setCommitRetries(int)
*/
public void setSyncCommits(boolean syncCommits) {
this.syncCommits = syncCommits;
Expand Down Expand Up @@ -408,9 +409,10 @@ public void setAuthExceptionRetryInterval(Duration authExceptionRetryInterval) {
/**
* The number of retries allowed when a
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
* by the consumer.
* by the consumer when {@link #setSyncCommits(boolean)} is set to true.
* @return the number of retries.
* @since 2.3.9
* @see #setSyncCommits(boolean)
*/
public int getCommitRetries() {
return this.commitRetries;
Expand All @@ -419,9 +421,11 @@ public int getCommitRetries() {
/**
* Set number of retries allowed when a
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
* by the consumer. Default 3 (4 attempts total).
* by the consumer when {@link #setSyncCommits(boolean)} is set to true. Default 3
* (4 attempts total).
* @param commitRetries the commitRetries.
* @since 2.3.9
* @see #setSyncCommits(boolean)
*/
public void setCommitRetries(int commitRetries) {
this.commitRetries = commitRetries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,7 @@ private void fixTxOffsetsIfNeeded() {
commitSync(toFix);
}
else {
commitAsync(toFix, 0);
commitAsync(toFix);
}
}
else {
Expand Down Expand Up @@ -1912,7 +1912,7 @@ else if (this.syncCommits) {
commitSync(commits);
}
else {
commitAsync(commits, 0);
commitAsync(commits);
}
}

Expand All @@ -1931,18 +1931,14 @@ else if (this.syncCommits) {
commitSync(commits);
}
else {
commitAsync(commits, 0);
commitAsync(commits);
}
}

private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits) {
this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> {
if (exception instanceof RetriableCommitFailedException
&& retries < this.containerProperties.getCommitRetries()) {
commitAsync(commits, retries + 1);
}
else {
this.commitCallback.onComplete(offsetsAttempted, exception);
this.commitCallback.onComplete(offsetsAttempted, exception);
if (exception == null) {
if (this.fixTxOffsets) {
this.lastCommits.putAll(commits);
}
Expand Down Expand Up @@ -2701,7 +2697,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
commitSync(offsetsToCommit);
}
else {
commitAsync(offsetsToCommit, 0);
commitAsync(offsetsToCommit);
}
}
else {
Expand Down Expand Up @@ -2963,7 +2959,7 @@ private void commitIfNecessary() {
commitSync(commits);
}
else {
commitAsync(commits, 0);
commitAsync(commits);
}
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
Expand Down Expand Up @@ -3360,7 +3356,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
}
}
else {
commitAsync(offsetsToCommit, 0);
commitAsync(offsetsToCommit);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -3380,17 +3379,8 @@ public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
}

@Test
void testCommitSyncRetries() throws Exception {
testCommitRetriesGuts(true);
}

@Test
void testCommitAsyncRetries() throws Exception {
testCommitRetriesGuts(false);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private void testCommitRetriesGuts(boolean sync) throws Exception {
void testCommitSyncRetries() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
Expand All @@ -3409,28 +3399,14 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
return first.getAndSet(false) ? consumerRecords : emptyRecords;
});
CountDownLatch latch = new CountDownLatch(4);
CountDownLatch retriesExhausted = new CountDownLatch(1);
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
new TopicPartitionOffset("foo", 0) };
ContainerProperties containerProps = new ContainerProperties(topicPartition);
if (sync) {
willAnswer(i -> {
latch.countDown();
throw new RetriableCommitFailedException("");
}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
}
else {
willAnswer(i -> {
OffsetCommitCallback callback = i.getArgument(1);
callback.onComplete(i.getArgument(0), new RetriableCommitFailedException(""));
latch.countDown();
return null;
}).given(consumer).commitAsync(anyMap(), any());
containerProps.setCommitCallback((offsets, exception) -> {
retriesExhausted.countDown();
});
}
containerProps.setSyncCommits(sync);
willAnswer(i -> {
latch.countDown();
throw new RetriableCommitFailedException("");
}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
containerProps.setSyncCommits(true);
containerProps.setGroupId("grp");
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
Expand All @@ -3442,13 +3418,7 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
if (sync) {
verify(consumer, times(4)).commitSync(any(), any());
}
else {
verify(consumer, times(4)).commitAsync(any(), any());
assertThat(retriesExhausted.await(10, TimeUnit.SECONDS)).isTrue();
}
verify(consumer, times(4)).commitSync(any(), any());
}

@Test
Expand Down

0 comments on commit 8169f4d

Please sign in to comment.