Retry offset commit (#3105)
Before this patch, we cached the new offset before committing it to Kafka,
so a failed commit was never retried and the offset was not stored until
a new record was successfully processed.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
knative-prow-robot and pierDipi committed May 26, 2023
1 parent 44c60b1 commit e3e58ea
Showing 2 changed files with 72 additions and 3 deletions.
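
In essence, the patch moves the tracker update (tracker.setCommitted(newOffset)) from before the asynchronous Kafka commit into its success handler. A simplified sketch of the fixed flow, condensed from the first file's diff below (logging omitted; the no-op path when nothing new is committable is assumed for illustration):

// Commit first; cache the offset only once Kafka acknowledges it.
// If the commit fails, tracker.getCommitted() stays unchanged, so the
// next periodic commit attempt retries the same offset.
private synchronized Future<Void> commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
    long newOffset = tracker.offsetToCommit();
    if (newOffset > tracker.getCommitted()) {
        return consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
            .onSuccess(ignored -> tracker.setCommitted(newOffset)); // cache only on success
    }
    return Future.succeededFuture(); // nothing new to commit (assumed no-op path)
}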
@@ -155,8 +155,6 @@ private void commit(final KafkaConsumerRecord<?, ?> record) {
private synchronized Future<Void> commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
long newOffset = tracker.offsetToCommit();
if (newOffset > tracker.getCommitted()) {
-// Reset the state
-tracker.setCommitted(newOffset);

logger.debug("Committing offset for {} offset {}",
keyValue("topicPartition", topicPartition),
@@ -165,6 +163,9 @@ private synchronized Future<Void> commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
// Execute the actual commit
return consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
.onSuccess(ignored -> {
+// Reset the state
+tracker.setCommitted(newOffset);
+
if (onCommit != null) {
onCommit.accept((int) newOffset);
}
@@ -219,7 +220,7 @@ public Future<Void> close() {
* This offset tracker keeps track of the committed records for a
* single partition.
*/
-private static class OffsetTracker {
+static class OffsetTracker {

/*
* We use a BitSet as a replacement for an array of booleans.
@@ -314,4 +315,9 @@ private static void logPartitions(final String context,
final Set<TopicPartition> tps) {
logger.info("Partitions " + context + " {}", keyValue("partitions", tps));
}

+/* Visible for testing */
+Map<TopicPartition, OffsetTracker> getOffsetTrackers() {
+return offsetTrackers;
+}
}
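
For context on the tracker itself: as its comment notes, OffsetTracker uses a BitSet in place of an array of booleans to record which records of the partition have been processed. A minimal, hypothetical sketch of how such a tracker can derive the committable offset (illustrative only, not the Knative implementation):

import java.util.BitSet;

// Hypothetical sketch: processed offsets are flagged in a BitSet; the
// first clear bit is the first unprocessed record, so every offset
// below it is safe to commit.
static class OffsetTrackerSketch {
    private final BitSet processed = new BitSet();
    private long committed = 0;

    void recordProcessed(long offset) {
        processed.set((int) offset);
    }

    long offsetToCommit() {
        return processed.nextClearBit(0);
    }

    long getCommitted() {
        return committed;
    }

    void setCommitted(long newOffset) {
        committed = newOffset;
    }
}

The second changed file adds tests that exercise the retry behavior through the new getOffsetTrackers() accessor.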
@@ -18,21 +18,30 @@
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
import io.cloudevents.CloudEvent;
import io.micrometer.core.instrument.Counter;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.kafka.client.consumer.KafkaConsumer;
+import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

+import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;

@Execution(value = ExecutionMode.CONCURRENT)
@ExtendWith(VertxExtension.class)
@@ -315,4 +324,58 @@ public void failedToSendToDeadLetterSink(final Vertx vertx) {
shouldNeverPause(consumer);
verify(eventsSentCounter, never()).increment();
}

+@Test
+@SuppressWarnings("unchecked")
+public void testFailedCommitRetry(final Vertx vertx) {
+final var counter = new AtomicInteger(0);
+final Counter eventsSentCounter = mock(Counter.class);
+
+final KafkaConsumer<String, CloudEvent> consumer = mock(KafkaConsumer.class);
+when(consumer.commit((Map<io.vertx.kafka.client.common.TopicPartition, OffsetAndMetadata>) any())).then(invocationOnMock -> {
+if (counter.incrementAndGet() == 1) {
+return Future.failedFuture(new RuntimeException());
+}
+return Future.succeededFuture();
+});
+
+final var r = record("aaa", 0, 0);
+
+OffsetManager strategy = new OffsetManager(vertx, consumer, eventsSentCounter::increment, 100L);
+strategy.recordReceived(r);
+strategy.successfullySentToSubscriber(r);
+
+final var offset = strategy.getOffsetTrackers()
+.get(new io.vertx.kafka.client.common.TopicPartition(r.topic(), r.partition()));
+
+await()
+.timeout(Duration.ofSeconds(1))
+.untilAsserted(() -> assertThat(offset.getCommitted()).isEqualTo(1));
+}
+
+@Test
+@SuppressWarnings("unchecked")
+public void testAlwaysFailedCommit(final Vertx vertx) {
+final var counter = new AtomicInteger(0);
+final Counter eventsSentCounter = mock(Counter.class);
+
+final KafkaConsumer<String, CloudEvent> consumer = mock(KafkaConsumer.class);
+when(consumer.commit((Map<io.vertx.kafka.client.common.TopicPartition, OffsetAndMetadata>) any()))
+.then(invocationOnMock -> Future.failedFuture(new RuntimeException()));
+
+final var r = record("aaa", 0, 0);
+
+OffsetManager strategy = new OffsetManager(vertx, consumer, eventsSentCounter::increment, 100L);
+strategy.recordReceived(r);
+strategy.successfullySentToSubscriber(r);
+
+final var offset = strategy.getOffsetTrackers()
+.get(new io.vertx.kafka.client.common.TopicPartition(r.topic(), r.partition()));
+
+await()
+.timeout(Duration.ofSeconds(1))
+.untilAsserted(() -> assertThat(offset.getCommitted()).isEqualTo(0));
+}
}
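
Together the two tests pin down both sides of the new behavior: testFailedCommitRetry verifies that a commit that fails once is retried on the next commit interval, so the tracked offset eventually reaches 1, while testAlwaysFailedCommit verifies that the cached offset never advances while commits keep failing and stays at 0, which is exactly the invariant the pre-patch code broke by caching before committing.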
