
(refact) separate progress from abstract commit policy
ylgrgyq committed Mar 9, 2020
1 parent 9541107 commit 53d06ee
Showing 25 changed files with 628 additions and 609 deletions.
146 changes: 10 additions & 136 deletions src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java
@@ -1,17 +1,16 @@
package cn.leancloud.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

import java.time.Duration;
import java.util.*;

import static java.util.stream.Collectors.toSet;
import static java.util.Collections.emptySet;

abstract class AbstractCommitPolicy<K, V> implements CommitPolicy<K, V> {
abstract class AbstractCommitPolicy<K, V> implements CommitPolicy {
static SleepFunction sleepFunction = Thread::sleep;

interface SleepFunction {
@@ -45,116 +44,35 @@ void onError(RetriableException e) {
}

protected final Consumer<K, V> consumer;
private final Map<TopicPartition, Long> topicOffsetHighWaterMark;
private final Map<TopicPartition, CompletedOffsets> completedOffsets;
private final long syncCommitRetryIntervalMs;
private final int maxAttemptsForEachSyncCommit;

AbstractCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit) {
this.consumer = consumer;
this.topicOffsetHighWaterMark = new HashMap<>();
this.completedOffsets = new HashMap<>();
this.syncCommitRetryIntervalMs = syncCommitRetryInterval.toMillis();
this.maxAttemptsForEachSyncCommit = maxAttemptsForEachSyncCommit;
}

@Override
public void markPendingRecord(ConsumerRecord<K, V> record) {
final TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
topicOffsetHighWaterMark.merge(
topicPartition,
record.offset() + 1,
Math::max);

final CompletedOffsets offset = completedOffsets.get(topicPartition);
// note that if the offset already exists, record.offset() >= offset.nextOffsetToCommit() can happen
// when duplicate records with offsets lower than our next offset to commit are consumed from the broker
if (offset == null) {
completedOffsets.put(topicPartition, new CompletedOffsets(record.offset() - 1L));
}
}

@Override
public void markCompletedRecord(ConsumerRecord<K, V> record) {
final CompletedOffsets offset = completedOffsets.get(new TopicPartition(record.topic(), record.partition()));
// offset could be null when the record's partition was revoked before its processing was done
if (offset != null) {
offset.addCompleteOffset(record.offset());
}
}

@Override
public void revokePartitions(Collection<TopicPartition> partitions) {
clearProcessingRecordStatesFor(partitions);
}

@Override
public Set<TopicPartition> partialCommitSync() {
final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = completedTopicOffsetsToCommit();
public Set<TopicPartition> partialCommitSync(ProcessRecordsProgress progress) {
final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = progress.completedOffsetsToCommit();
if (offsetsToCommit.isEmpty()) {
return Collections.emptySet();
return emptySet();
}
commitSyncWithRetry(offsetsToCommit);
updatePartialCommittedOffsets(offsetsToCommit);
progress.updateCommittedOffsets(offsetsToCommit);

return clearProcessingRecordStatesForCompletedPartitions(offsetsToCommit);
return progress.clearCompletedPartitions(offsetsToCommit);
}

Set<TopicPartition> fullCommitSync() {
Set<TopicPartition> fullCommitSync(ProcessRecordsProgress progress) {
commitSyncWithRetry();

final Set<TopicPartition> completePartitions = partitionsForAllRecordsStates();
clearAllProcessingRecordStates();
final Set<TopicPartition> completePartitions = progress.allPartitions();
progress.clearAll();
return completePartitions;
}

@VisibleForTesting
Map<TopicPartition, Long> topicOffsetHighWaterMark() {
return topicOffsetHighWaterMark;
}

Map<TopicPartition, OffsetAndMetadata> completedTopicOffsetsToCommit() {
if (noCompletedOffsets()) {
return Collections.emptyMap();
}

final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (Map.Entry<TopicPartition, CompletedOffsets> entry : completedOffsets.entrySet()) {
final CompletedOffsets offset = entry.getValue();
if (offset.hasOffsetToCommit()) {
offsets.put(entry.getKey(), offset.getOffsetToCommit());
}
}

return offsets;
}

boolean noTopicOffsetsToCommit() {
if (noCompletedOffsets()) {
return true;
}

for (Map.Entry<TopicPartition, CompletedOffsets> entry : completedOffsets.entrySet()) {
final CompletedOffsets offset = entry.getValue();
if (offset.hasOffsetToCommit()) {
return false;
}
}

return true;
}

void updatePartialCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
final CompletedOffsets offset = completedOffsets.get(entry.getKey());
offset.updateCommittedOffset(entry.getValue().offset());
}
}

boolean noCompletedOffsets() {
return completedOffsets.isEmpty();
}

void commitSyncWithRetry() {
final RetryContext context = context();
do {
@@ -179,50 +97,6 @@ void commitSyncWithRetry(Map<TopicPartition, OffsetAndMetadata> offsets) {
} while (true);
}

Set<TopicPartition> partitionsForAllRecordsStates() {
return new HashSet<>(topicOffsetHighWaterMark.keySet());
}

void clearAllProcessingRecordStates() {
topicOffsetHighWaterMark.clear();
completedOffsets.clear();
}

Set<TopicPartition> clearProcessingRecordStatesForCompletedPartitions(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
final Set<TopicPartition> partitions = partitionsToSafeResume(committedOffsets);
clearProcessingRecordStatesFor(partitions);
return partitions;
}

void clearProcessingRecordStatesFor(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
topicOffsetHighWaterMark.remove(p);
completedOffsets.remove(p);
}
}

Set<TopicPartition> partitionsToSafeResume() {
return partitionsToSafeResume(completedTopicOffsetsToCommit());
}

Set<TopicPartition> partitionsToSafeResume(Map<TopicPartition, OffsetAndMetadata> completedOffsets) {
return completedOffsets
.entrySet()
.stream()
.filter(entry -> topicOffsetMeetHighWaterMark(entry.getKey(), entry.getValue()))
.map(Map.Entry::getKey)
.collect(toSet());
}

private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, OffsetAndMetadata offset) {
final Long offsetHighWaterMark = topicOffsetHighWaterMark.get(topicPartition);
if (offsetHighWaterMark != null) {
return offset.offset() >= offsetHighWaterMark;
}
// this partition may have been revoked before any of its records were processed
return true;
}

private RetryContext context() {
return new RetryContext(syncCommitRetryIntervalMs, maxAttemptsForEachSyncCommit);
}
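The new ProcessRecordsProgress class is among the 25 changed files but is not expanded in this view. Below is a minimal sketch of what it plausibly looks like, reconstructed from the state and helpers deleted from AbstractCommitPolicy above and from the calls the policies now make; the field names, and the assumption that the mark* methods moved here, are inferences rather than the commit's verbatim code. CompletedOffsets is the repository's existing helper for tracking contiguous completed offsets.

package cn.leancloud.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

// Hypothetical reconstruction of the extracted progress holder: it simply owns
// the two maps that AbstractCommitPolicy used to carry.
class ProcessRecordsProgress {
    // highest fetched offset + 1 for every partition with in-flight records
    private final Map<TopicPartition, Long> topicOffsetHighWaterMark = new HashMap<>();
    // completed-but-not-yet-committed offsets per partition
    private final Map<TopicPartition, CompletedOffsets> completedOffsets = new HashMap<>();

    void markPendingRecord(ConsumerRecord<?, ?> record) {
        final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        topicOffsetHighWaterMark.merge(tp, record.offset() + 1, Math::max);
        completedOffsets.putIfAbsent(tp, new CompletedOffsets(record.offset() - 1L));
    }

    void markCompletedRecord(ConsumerRecord<?, ?> record) {
        final CompletedOffsets offset = completedOffsets.get(new TopicPartition(record.topic(), record.partition()));
        if (offset != null) { // partition may have been revoked before processing finished
            offset.addCompleteOffset(record.offset());
        }
    }

    Map<TopicPartition, OffsetAndMetadata> completedOffsetsToCommit() {
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (Map.Entry<TopicPartition, CompletedOffsets> entry : completedOffsets.entrySet()) {
            if (entry.getValue().hasOffsetToCommit()) {
                offsets.put(entry.getKey(), entry.getValue().getOffsetToCommit());
            }
        }
        return offsets;
    }

    boolean noOffsetsToCommit() {
        return completedOffsetsToCommit().isEmpty();
    }

    void updateCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            completedOffsets.get(entry.getKey()).updateCommittedOffset(entry.getValue().offset());
        }
    }

    Set<TopicPartition> allPartitions() {
        return new HashSet<>(topicOffsetHighWaterMark.keySet());
    }

    void clearAll() {
        topicOffsetHighWaterMark.clear();
        completedOffsets.clear();
    }

    void clearFor(Collection<TopicPartition> partitions) {
        for (TopicPartition p : partitions) {
            topicOffsetHighWaterMark.remove(p);
            completedOffsets.remove(p);
        }
    }

    // partitions whose committed offset reached the fetch high-water mark
    Set<TopicPartition> completedPartitions() {
        return partitionsAtHighWaterMark(completedOffsetsToCommit());
    }

    Set<TopicPartition> clearCompletedPartitions(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        final Set<TopicPartition> done = partitionsAtHighWaterMark(committedOffsets);
        clearFor(done);
        return done;
    }

    private Set<TopicPartition> partitionsAtHighWaterMark(Map<TopicPartition, OffsetAndMetadata> offsets) {
        final Set<TopicPartition> partitions = new HashSet<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            final Long highWaterMark = topicOffsetHighWaterMark.get(entry.getKey());
            // a null high-water mark means the partition was revoked before any record was processed
            if (highWaterMark == null || entry.getValue().offset() >= highWaterMark) {
                partitions.add(entry.getKey());
            }
        }
        return partitions;
    }
}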
src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java
@@ -23,15 +23,15 @@ abstract class AbstractRecommitAwareCommitPolicy<K, V> extends AbstractCommitPol
}

@Override
public final Set<TopicPartition> tryCommit(boolean noPendingRecords) {
public final Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsProgress progress) {
if (needRecommit()) {
commitSyncWithRetry(offsetsForRecommit());
updateNextRecommitTime();
}
return tryCommit0(noPendingRecords);
return tryCommit0(noPendingRecords, progress);
}

abstract Set<TopicPartition> tryCommit0(boolean noPendingRecords);
abstract Set<TopicPartition> tryCommit0(boolean noPendingRecords, ProcessRecordsProgress progress);

void updateNextRecommitTime() {
updateNextRecommitTime(System.nanoTime());
42 changes: 16 additions & 26 deletions src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java
@@ -1,14 +1,11 @@
package cn.leancloud.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptySet;
@@ -17,7 +14,6 @@ final class AsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K,
private static final Logger logger = LoggerFactory.getLogger(AsyncCommitPolicy.class);

private final int maxPendingAsyncCommits;
private final OffsetCommitCallback callback;
private int pendingAsyncCommitCounter;
private boolean forceSync;

@@ -28,19 +24,18 @@ final class AsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K,
int maxPendingAsyncCommits) {
super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit, recommitInterval);
this.maxPendingAsyncCommits = maxPendingAsyncCommits;
this.callback = new AsyncCommitCallback();
}

@Override
Set<TopicPartition> tryCommit0(boolean noPendingRecords) {
Set<TopicPartition> tryCommit0(boolean noPendingRecords, ProcessRecordsProgress progress) {
// the forceSync flag means a previous async commit failed, so
// we do a sync commit no matter whether there are any pending records or completed offsets
if (!forceSync && (!noPendingRecords || noTopicOffsetsToCommit())) {
if (!forceSync && (!noPendingRecords || progress.noOffsetsToCommit())) {
return emptySet();
}

final Set<TopicPartition> partitions = partitionsForAllRecordsStates();
commit();
final Set<TopicPartition> partitions = progress.allPartitions();
commit(progress);

// no matter whether this policy uses syncCommit or asyncCommit, it always
// commits all assigned offsets, so we can safely update the recommit time here. And
@@ -60,29 +55,24 @@ boolean forceSync() {
return forceSync;
}

private void commit() {
private void commit(ProcessRecordsProgress progress) {
if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
commitSyncWithRetry();
pendingAsyncCommitCounter = 0;
forceSync = false;
clearAllProcessingRecordStates();
progress.clearAll();
} else {
++pendingAsyncCommitCounter;
consumer.commitAsync(callback);
}
}

private class AsyncCommitCallback implements OffsetCommitCallback {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
--pendingAsyncCommitCounter;
assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
if (exception != null) {
logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
forceSync = true;
} else {
clearProcessingRecordStatesForCompletedPartitions(offsets);
}
consumer.commitAsync(((offsets, exception) -> {
--pendingAsyncCommitCounter;
assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
if (exception != null) {
logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
forceSync = true;
} else {
progress.clearCompletedPartitions(offsets);
}
}));
}
}
}
12 changes: 6 additions & 6 deletions src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java
@@ -15,18 +15,18 @@ final class AutoCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
}

@Override
public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
if (noTopicOffsetsToCommit()) {
public Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsProgress progress) {
if (progress.noOffsetsToCommit()) {
return emptySet();
}

final Set<TopicPartition> partitions;
if (noPendingRecords) {
partitions = partitionsForAllRecordsStates();
clearAllProcessingRecordStates();
partitions = progress.allPartitions();
progress.clearAll();
} else {
partitions = partitionsToSafeResume();
clearProcessingRecordStatesFor(partitions);
partitions = progress.completedPartitions();
progress.clearFor(partitions);
}

return partitions;
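AutoCommitPolicy never commits in tryCommit because, as its name suggests, it presumably relies on the Kafka consumer's built-in auto-commit; the policy only works out which partitions are safe to resume and prunes the progress state accordingly. A consumer configured for that mode might look like the sketch below; this is an illustrative assumption, since the library's actual builder wiring is not part of this diff.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

final class AutoCommitConsumerExample {
    // A consumer under which an auto-commit style policy would apply:
    // the broker client commits in the background on a fixed interval,
    // so the policy itself never needs to call commitSync/commitAsync.
    static Consumer<String, String> newAutoCommitConsumer() {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(configs);
    }
}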
31 changes: 3 additions & 28 deletions src/main/java/cn/leancloud/kafka/consumer/CommitPolicy.java
@@ -6,25 +6,7 @@
import java.util.Collection;
import java.util.Set;

interface CommitPolicy<K, V> {
/**
* Mark a {@link ConsumerRecord} as pending before processing it, so {@link CommitPolicy} knows which and
* how many records we need to process. It is called by {@link Fetcher} whenever {@code Fetcher} fetches any
* {@link ConsumerRecord}s from the broker.
*
* @param record the {@link ConsumerRecord} to process
*/
void markPendingRecord(ConsumerRecord<K, V> record);

/**
* Mark a {@link ConsumerRecord} as completed after processing it, so {@link CommitPolicy} knows which and
* how many records we have processed. It is called by {@link Fetcher} once {@code Fetcher} has made sure that
* a {@code ConsumerRecord} was processed successfully.
*
* @param record the processed {@link ConsumerRecord}
*/
void markCompletedRecord(ConsumerRecord<K, V> record);

interface CommitPolicy {
/**
* Try to commit offsets for any {@link TopicPartition}s which have processed {@link ConsumerRecord}s, based on the
* intrinsic policy of this {@link CommitPolicy}. This method is called whenever there are any
@@ -35,7 +17,7 @@ interface CommitPolicy<K, V> {
* calculate this value much quicker
* @return those {@link TopicPartition}s which have no pending {@code ConsumerRecord}s
*/
Set<TopicPartition> tryCommit(boolean noPendingRecords);
Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsProgress progress);

/**
* Do a dedicated partial commit synchronously which only commits those {@link ConsumerRecord}s that have
@@ -44,12 +26,5 @@ interface CommitPolicy<K, V> {
*
* @return those {@link TopicPartition}s which have no pending {@code ConsumerRecord}s
*/
Set<TopicPartition> partialCommitSync();

/**
* Revoke internal states for some partitions.
*
* @param partitions the partitions which were revoked from the consumer
*/
void revokePartitions(Collection<TopicPartition> partitions);
Set<TopicPartition> partialCommitSync(ProcessRecordsProgress progress);
}
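With markPendingRecord, markCompletedRecord, and revokePartitions gone from the interface, the caller now owns the progress object and threads it through every policy call. The sketch below is a hypothetical driver loop built on the ProcessRecordsProgress sketch above; the real driver is Fetcher, which this view does not expand, and the real record handling is asynchronous rather than inline.

package cn.leancloud.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Set;

final class FetchLoopSketch {
    static Set<TopicPartition> pollOnce(Consumer<String, String> consumer,
                                        CommitPolicy policy,
                                        ProcessRecordsProgress progress) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            progress.markPendingRecord(record);   // progress, not the policy, tracks records now
            handle(record);                       // inline stand-in for the real async handling
            progress.markCompletedRecord(record);
        }
        // the policy only decides when and how to commit; all record state lives in progress.
        // noPendingRecords is true here because handling was inline and fully completed.
        return policy.tryCommit(true, progress);
    }

    private static void handle(ConsumerRecord<String, String> record) {
        // application-specific processing
    }
}

On partition revocation, the caller would presumably call partialCommitSync(progress) and then progress.clearFor(revokedPartitions) itself, which is why revokePartitions could be dropped from the interface.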
