Skip to content

Commit

Permalink
[GOBBLIN-873] Add offset look-back option in Kafka consumer
Browse files Browse the repository at this point in the history
Closes apache#2721 from MeghaUpadhyay/master
  • Loading branch information
meupadhyay authored and jhsenjaliya committed Mar 24, 2020
1 parent 69920b9 commit d847d73
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
Expand Up @@ -90,7 +90,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public static final String LATEST_OFFSET = "latest";
public static final String EARLIEST_OFFSET = "earliest";
public static final String NEAREST_OFFSET = "nearest";
public static final String OFFSET_LOOKBACK = "offset_lookback";
public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
public static final String KAFKA_OFFSET_LOOKBACK = "kafka.offset.lookback";
public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = LATEST_OFFSET;
public static final String TOPICS_MOVE_TO_LATEST_OFFSET = "topics.move.to.latest.offset";
public static final String RESET_ON_OFFSET_OUT_OF_RANGE = "reset.on.offset.out.of.range";
Expand Down Expand Up @@ -440,8 +442,14 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt
offsets.startAtLatestOffset();
} else if (previousOffsetNotFound) {

// When previous offset cannot be found, either start at earliest offset or latest offset, or skip the partition
// (no need to create an empty workunit in this case since there's no offset to persist).
/**
* When previous offset cannot be found, either start at earliest offset, latest offset, go back with (latest - lookback)
* (long value to be deducted from latest offset in order to avoid data loss) or skip the partition
* (no need to create an empty workunit in this case since there's no offset to persist).
* In case of no previous state OFFSET_LOOKBACK will make sure to avoid consuming huge amount of data (earlist) and data loss (latest offset)
* lookback can be set to any long value where (latest-lookback) is nearest offset for each partition. If computed offset is out of range then
* partition will be consumed from latest offset
**/
String offsetNotFoundMsg = String.format("Previous offset for partition %s does not exist. ", partition);
String offsetOption = state.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET)) {
Expand All @@ -451,7 +459,44 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt
LOG.warn(
offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
} else if (offsetOption.equals(OFFSET_LOOKBACK)) {
long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK , 0L);
long latestOffset = offsets.getLatestOffset();
long offset = latestOffset - lookbackOffsetRange;
LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + latestOffset + " - " + lookbackOffsetRange + " ] start offset: " + offset);
try {
offsets.startAt(offset);
} catch (StartOffsetOutOfRangeException e) {
// Increment counts, which will be reported as job metrics
if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
this.offsetTooEarlyCount.incrementAndGet();
} else {
this.offsetTooLateCount.incrementAndGet();
}

// When above computed offset (latest-lookback) is out of range, either start at earliest, latest or nearest offset, or skip the
// partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
String offsetOutOfRangeMsg = String.format(
"Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
offsetOption =
state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
&& offsets.getStartOffset() >= offsets.getLatestOffset())) {
LOG.warn(
offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
offsets.startAtLatestOffset();
} else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
}
}
}
else {
LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
return null;
}
Expand Down
Expand Up @@ -43,6 +43,7 @@ source.kafka.json.schema=${kafka.schema.json}

topic.whitelist=${kafka.topics}
org.apache.gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory"
# Accepted values - latest, earliest and with offset lookback (latest offset - lookback)
bootstrap.with.offset=earliest
kafka.workunit.packer.type=SINGLE_LEVEL
mr.job.max.mappers=10
Expand Down

0 comments on commit d847d73

Please sign in to comment.