Skip to content

Commit

Permalink
Merge pull request awslabs#67 from ychunxue/ltr1x_latest
Browse files Browse the repository at this point in the history
KCL 1.x ShardEnd Shard Sync and Lease table ChildShard persistence
  • Loading branch information
ychunxue committed Jul 7, 2020
2 parents 0760688 + 0a5724c commit 53cc7fc
Show file tree
Hide file tree
Showing 17 changed files with 349 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ public ITask createTask(ShardConsumer consumer) {
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseCoordinator(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy());
consumer.getGetRecordsCache(), consumer.getShardSyncer(),
consumer.getShardSyncStrategy(), consumer.getChildShards());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.model.ChildShard;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -47,6 +52,7 @@ class KinesisDataFetcher {
private boolean isInitialized;
private String lastKnownSequenceNumber;
private InitialPositionInStreamExtended initialPositionInStream;
private List<ChildShard> childShards = Collections.emptyList();

/**
*
Expand Down Expand Up @@ -85,8 +91,11 @@ public DataFetcherResult getRecords(int maxRecords) {
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
@Override
public GetRecordsResult getResult() {
return new GetRecordsResult().withMillisBehindLatest(null).withRecords(Collections.emptyList())
.withNextShardIterator(null);
return new GetRecordsResult()
.withMillisBehindLatest(null)
.withRecords(Collections.emptyList())
.withNextShardIterator(null)
.withChildShards(Collections.emptyList());
}

@Override
Expand All @@ -113,12 +122,20 @@ public GetRecordsResult getResult() {

@Override
public GetRecordsResult accept() {
if (!isValidResult(result)) {
// Throwing SDK exception when the GetRecords result is not valid. This will allow PrefetchGetRecordsCache to retry the GetRecords call.
throw new SdkClientException("Shard " + shardId +": GetRecordsResult is not valid. NextShardIterator: " + result.getNextShardIterator()
+ ". ChildShards: " + result.getChildShards());
}
nextIterator = result.getNextShardIterator();
if (!CollectionUtils.isNullOrEmpty(result.getRecords())) {
lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber();
}
if (nextIterator == null) {
LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId);
LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId + ". childShards: " + result.getChildShards());
if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) {
childShards = result.getChildShards();
}
isShardEndReached = true;
}
return getResult();
Expand All @@ -130,6 +147,23 @@ public boolean isShardEnd() {
}
}

private boolean isValidResult(GetRecordsResult getRecordsResult) {
// GetRecords result should contain childShard information. There are two valid combination for the nextShardIterator and childShards
// If the GetRecords call does not reach the shard end, getRecords result should contain a non-null nextShardIterator and an empty list of childShards.
// If the GetRecords call reaches the shard end, getRecords result should contain a null nextShardIterator and a non-empty list of childShards.
// All other combinations are invalid and indicating an issue with GetRecords result from Kinesis service.
if (getRecordsResult.getNextShardIterator() == null && CollectionUtils.isNullOrEmpty(getRecordsResult.getChildShards()) ||
getRecordsResult.getNextShardIterator() != null && !CollectionUtils.isNullOrEmpty(getRecordsResult.getChildShards())) {
return false;
}
for (ChildShard childShard : getRecordsResult.getChildShards()) {
if (CollectionUtils.isNullOrEmpty(childShard.getParentShards())) {
return false;
}
}
return true;
}

/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
* @param initialCheckpoint Current checkpoint sequence number for this shard.
Expand All @@ -141,8 +175,7 @@ public void initialize(String initialCheckpoint, InitialPositionInStreamExtended
isInitialized = true;
}

public void initialize(ExtendedSequenceNumber initialCheckpoint,
InitialPositionInStreamExtended initialPositionInStream) {
public void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream) {
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber());
advanceIteratorTo(initialCheckpoint.getSequenceNumber(), initialPositionInStream);
isInitialized = true;
Expand Down Expand Up @@ -171,6 +204,7 @@ void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended in
if (nextIterator == null) {
LOG.info("Reached shard end: cannot advance iterator for shard " + shardId);
isShardEndReached = true;
// TODO: transition to ShuttingDown state on shardend instead to shutdown state for enqueueing this for cleanup
}
this.lastKnownSequenceNumber = sequenceNumber;
this.initialPositionInStream = initialPositionInStream;
Expand Down Expand Up @@ -248,6 +282,10 @@ protected boolean isShardEndReached() {
return isShardEndReached;
}

protected List<ChildShard> getChildShards() {
return childShards;
}

/** Note: This method has package level access for testing purposes.
* @return nextIterator
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType;
import com.amazonaws.util.CollectionUtils;
Expand Down Expand Up @@ -779,6 +780,29 @@ static KinesisClientLease newKCLLease(Shard shard) {
return newLease;
}

/**
* Helper method to create a new KinesisClientLease POJO for a ChildShard.
* Note: Package level access only for testing purposes
*
* @param childShard
* @return
*/
static KinesisClientLease newKCLLeaseForChildShard(ChildShard childShard) throws InvalidStateException {
final KinesisClientLease newLease = new KinesisClientLease();
newLease.setLeaseKey(childShard.getShardId());
final List<String> parentShardIds = new ArrayList<>();
if (!CollectionUtils.isNullOrEmpty(childShard.getParentShards())) {
parentShardIds.addAll(childShard.getParentShards());
} else {
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.getShardId()
+ " because parent shards cannot be found.");
}
newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L);
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return newLease;
}

/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public ProcessRecordsInput getNextResult() {
try {
result = getRecordsResultQueue.take().withCacheExitTime(Instant.now());
prefetchCounters.removed(result);
log.info("Shard " + shardId + ": Number of records remaining in queue is " + getRecordsResultQueue.size());
} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
}
Expand Down Expand Up @@ -177,7 +178,6 @@ public void run() {

MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count,
MetricsLevel.SUMMARY);

dataFetcher.restartIterator();
} catch (SdkClientException e) {
log.error("Exception thrown while fetching records from Kinesis", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public TaskResult call() {

try {
if (dataFetcher.isShardEndReached()) {
LOG.info("Reached end of shard " + shardInfo.getShardId());
return new TaskResult(null, true);
LOG.info("Reached end of shard " + shardInfo.getShardId() + ". Found childShards: " + dataFetcher.getChildShards());
return new TaskResult(null, true, dataFetcher.getChildShards());
}

final ProcessRecordsInput processRecordsInput = getRecordsResult();
Expand Down Expand Up @@ -353,7 +353,7 @@ private ProcessRecordsInput getRecordsResult() {
* recordProcessorCheckpointer).
*/
dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue()
.getSequenceNumber(), streamConfig.getInitialPositionInStream());
.getSequenceNumber(), streamConfig.getInitialPositionInStream());

// Try a second time - if we fail this time, expose the failure.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;


import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -66,6 +69,9 @@ class ShardConsumer {
private Future<TaskResult> future;
private ShardSyncStrategy shardSyncStrategy;

@Getter
private List<ChildShard> childShards;

@Getter
private final GetRecordsCache getRecordsCache;

Expand Down Expand Up @@ -321,6 +327,10 @@ private TaskOutcome determineTaskOutcome() {
TaskResult result = future.get();
if (result.getException() == null) {
if (result.isShardEndReached()) {
if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) {
childShards = result.getChildShards();
LOG.info("Shard " + shardInfo.getShardId() + ": Setting childShards in ShardConsumer: " + childShards);
}
return TaskOutcome.END_OF_SHARD;
}
return TaskOutcome.SUCCESSFUL;
Expand Down Expand Up @@ -420,6 +430,7 @@ private ITask getNextTask() {
void updateState(TaskOutcome taskOutcome) {
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
markForShutdown(ShutdownReason.TERMINATE);
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE");
}
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason);
Expand Down
Loading

0 comments on commit 53cc7fc

Please sign in to comment.