This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Separate cache for shard iterators to invalidate expired
busjaeger committed Oct 18, 2018
1 parent 11549eb commit f75f755
Showing 2 changed files with 132 additions and 55 deletions.
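For context before the diff: the change replaces the lazily loaded, mutable dynamoDbIterator field inside CachingShardIterator with a separate, size-bounded Guava LoadingCache that maps a logical iterator position to the physical DynamoDB shard iterator, so expired entries can be invalidated and reloaded rather than failing the caller. The following is a minimal, hypothetical sketch of that pattern, not the repository's actual class: the name ShardIteratorCacheSketch, the resolve/invalidate methods, and the choice to key the cache by GetShardIteratorRequest are illustrative (the commit keys its cache by CachingShardIterator), while the Guava calls mirror the ones added below.

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

final class ShardIteratorCacheSketch {

    private final AmazonDynamoDBStreams streams;

    // Bounded mapping from a logical iterator position (stream, shard, sequence number)
    // to the physical shard iterator string returned by DynamoDB Streams.
    private final LoadingCache<GetShardIteratorRequest, String> iteratorCache;

    ShardIteratorCacheSketch(AmazonDynamoDBStreams streams, int maxIteratorCacheSize) {
        this.streams = streams;
        this.iteratorCache = CacheBuilder.newBuilder()
                .maximumSize(maxIteratorCacheSize)
                .build(CacheLoader.from(this::loadShardIterator));
    }

    // Invoked only on a cache miss: fetch a fresh shard iterator from the service.
    private String loadShardIterator(GetShardIteratorRequest request) {
        return streams.getShardIterator(request).getShardIterator();
    }

    // Returns the cached iterator if present, loading and caching one otherwise.
    String resolve(GetShardIteratorRequest request) {
        return iteratorCache.getUnchecked(request);
    }

    // Drops a stale entry, e.g. after DynamoDB reports it as expired.
    void invalidate(GetShardIteratorRequest request) {
        iteratorCache.invalidate(request);
    }
}

In the actual class the bound comes from the new builder option, for example new CachingAmazonDynamoDbStreams.Builder(streams).withMaxIteratorCacheSize(100).build(); the default is 100. A bounded cache also needs value-based keys, which is why the diff adds equals and hashCode to CachingShardIterator.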
@@ -12,6 +12,7 @@
import static java.util.stream.Collectors.toList;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.model.ExpiredIteratorException;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
@@ -20,6 +21,9 @@
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamRecord;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.salesforce.dynamodbv2.mt.mappers.DelegatingAmazonDynamoDbStreams;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -80,10 +84,12 @@ public static class Builder {
private static final int DEFAULT_MAX_RECORD_BYTES_CACHED = 100 * 1024 * 1024;
private static final int DEFAULT_MAX_GET_RECORDS_RETRIES = 10;
private static final long DEFAULT_GET_RECORDS_LIMIT_EXCEEDED_BACKOFF_IN_MILLIS = 1000L;
private static final int DEFAULT_MAX_ITERATOR_CACHE_SIZE = 100;

private final AmazonDynamoDBStreams amazonDynamoDbStreams;
private Sleeper sleeper;
private long maxRecordsByteSize = DEFAULT_MAX_RECORD_BYTES_CACHED;
private int maxIteratorCacheSize = DEFAULT_MAX_ITERATOR_CACHE_SIZE;
private int maxGetRecordsRetries = DEFAULT_MAX_GET_RECORDS_RETRIES;
private long getRecordsLimitExceededBackoffInMillis =
DEFAULT_GET_RECORDS_LIMIT_EXCEEDED_BACKOFF_IN_MILLIS;
@@ -143,6 +149,17 @@ public Builder withGetRecordsLimitExceededBackoffInMillis(long getRecordsLimitEx
return this;
}

/**
* Maximum number of shard iterators to cache.
*
* @param maxIteratorCacheSize Maximum number of iterators to cache.
* @return this builder.
*/
public Builder withMaxIteratorCacheSize(int maxIteratorCacheSize) {
this.maxIteratorCacheSize = maxIteratorCacheSize;
return this;
}

/**
* Build instance using the configured properties.
*
@@ -164,7 +181,8 @@ public CachingAmazonDynamoDbStreams build() {
sleeper,
maxRecordsByteSize,
maxGetRecordsRetries,
getRecordsLimitExceededBackoffInMillis);
getRecordsLimitExceededBackoffInMillis,
maxIteratorCacheSize);
}
}

@@ -243,13 +261,12 @@ IteratorPosition positionAfterResult(GetRecordsResult result) {
* Returns a shard iterator that starts after the last record in the given result.
*
* @param result Result to position the iterator after. Record list must not be empty.
* @param dynamoDbIterator Optional DynamoDB-level iterator after the last record.
* @return CachingShardIterator that is positioned after the given result.
*/
CachingShardIterator iteratorAfterResult(GetRecordsResult result, @Nullable String dynamoDbIterator) {
CachingShardIterator iteratorAfterResult(GetRecordsResult result) {
assert !result.getRecords().isEmpty();
return new CachingShardIterator(streamArn, shardId, AFTER_SEQUENCE_NUMBER,
getLast(result.getRecords()).getDynamodb().getSequenceNumber(), dynamoDbIterator);
getLast(result.getRecords()).getDynamodb().getSequenceNumber(), null);
}

/**
@@ -400,7 +417,7 @@ private static String getSequenceNumber(String pointer, ShardIteratorType type)
@Nullable
private final String sequenceNumber;
@Nullable
private String dynamoDbIterator;
private final String dynamoDbIterator;

// derived cached state
private final BigInteger parsedSequenceNumber;
@@ -414,51 +431,46 @@ private CachingShardIterator(
this.streamArn = checkNotNull(streamArn);
this.shardId = checkNotNull(shardId);
this.type = type;
this.dynamoDbIterator = dynamoDbIterator;

switch (type) {
case TRIM_HORIZON:
case LATEST:
checkArgument(sequenceNumber == null);
checkArgument(dynamoDbIterator != null);
this.sequenceNumber = null;
this.parsedSequenceNumber = null;
this.dynamoDbIterator = dynamoDbIterator;
break;
case AT_SEQUENCE_NUMBER:
case AFTER_SEQUENCE_NUMBER:
checkArgument(sequenceNumber != null);
checkArgument(dynamoDbIterator == null);
this.sequenceNumber = sequenceNumber;
this.parsedSequenceNumber = parseSequenceNumber(sequenceNumber);
this.dynamoDbIterator = null;
break;
default:
throw new RuntimeException("Missing case statement for ShardIteratorType");
}
}

/**
* Returns a new iterator with the given dynamoDbIterator.
* Returns the DynamoDB shard iterator, may be empty.
*
* @param dynamoDbIterator DynamoDb iterator.
* @return New iterator.
* @return DynamoDB shard iterator
*/
CachingShardIterator withDynamoDbIterator(String dynamoDbIterator) {
this.dynamoDbIterator = dynamoDbIterator;
return this;
Optional<String> getDynamoDbIterator() {
return Optional.ofNullable(dynamoDbIterator);
}

/**
* Returns underlying streams iterator, loading it if present.
* Returns a new iterator with the given dynamoDbIterator.
*
* @param streams Streams instance to use for loading iterator if needed.
* @return DynamoDB iterator.
* @param dynamoDbIterator DynamoDb iterator.
* @return A new iterator.
*/
String getDynamoDbIterator(AmazonDynamoDBStreams streams) {
if (dynamoDbIterator == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading DynamoDB iterator: iterator={}", this);
}
dynamoDbIterator = streams.getShardIterator(toRequest()).getShardIterator();
}
return dynamoDbIterator;
CachingShardIterator withDynamoDbIterator(String dynamoDbIterator) {
return new CachingShardIterator(streamArn, shardId, type, sequenceNumber, dynamoDbIterator);
}

/**
@@ -558,6 +570,28 @@ String toExternalString() {
public String toString() {
return toExternalString();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CachingShardIterator that = (CachingShardIterator) o;
return Objects.equals(streamArn, that.streamArn)
&& Objects.equals(shardId, that.shardId)
&& type == that.type
&& Objects.equals(sequenceNumber, that.sequenceNumber)
&& Objects.equals(dynamoDbIterator, that.dynamoDbIterator)
&& Objects.equals(parsedSequenceNumber, that.parsedSequenceNumber);
}

@Override
public int hashCode() {
return Objects.hash(streamArn, shardId, type, sequenceNumber, dynamoDbIterator, parsedSequenceNumber);
}
}

// logger instance
@@ -624,11 +658,15 @@ private static String toShortString(GetRecordsResult result) {
// size of cache >= 0
private long recordsCacheByteSize;

// iterator cache
private final LoadingCache<CachingShardIterator, String> iteratorCache;

private CachingAmazonDynamoDbStreams(AmazonDynamoDBStreams amazonDynamoDbStreams,
Sleeper sleeper,
long maxRecordsByteSize,
int maxGetRecordsRetries,
long getRecordsLimitExceededBackoffInMillis) {
long getRecordsLimitExceededBackoffInMillis,
int maxIteratorCacheSize) {
super(amazonDynamoDbStreams);
this.sleeper = sleeper;
this.maxRecordsByteSize = maxRecordsByteSize;
@@ -639,6 +677,18 @@ private CachingAmazonDynamoDbStreams(AmazonDynamoDBStreams amazonDynamoDbStreams
this.recordsCacheIndex = new TreeMap<>();
this.recordsCacheByteSize = 0L;
this.recordsCacheLock = new ReentrantReadWriteLock();

this.iteratorCache = CacheBuilder
.newBuilder()
.maximumSize(maxIteratorCacheSize)
.build(CacheLoader.from(this::loadShardIterator));
}

private String loadShardIterator(CachingShardIterator iterator) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cache miss for iterator {}", iterator);
}
return super.getShardIterator(iterator.toRequest()).getShardIterator();
}

@Override
@@ -660,7 +710,6 @@ public GetShardIteratorResult getShardIterator(GetShardIteratorRequest request)
break;
case AT_SEQUENCE_NUMBER:
case AFTER_SEQUENCE_NUMBER:
// TODO add 15 min timeout to lazy iterators?
dynamoDbIterator = null;
break;
default:
@@ -716,13 +765,14 @@ private GetRecordsResult getRecords(CachingShardIterator iterator) {
}

// If we have a cache miss, get DynamoDB iterator (load if needed)
final String shardIterator = iterator.getDynamoDbIterator(dynamoDbStreams);
final String dynamoDbIterator = iterator.getDynamoDbIterator()
.orElseGet(() -> iteratorCache.getUnchecked(iterator));

// next load records from stream
final GetRecordsResult loadedRecordsResult;
try {
loadedRecordsResult = dynamoDbStreams.getRecords(
new GetRecordsRequest().withShardIterator(shardIterator));
new GetRecordsRequest().withShardIterator(dynamoDbIterator));
} catch (LimitExceededException e) {
long backoff = (getRecordsRetries + 1) * getRecordsLimitExceededBackoffInMillis;
if (LOG.isWarnEnabled()) {
@@ -732,36 +782,60 @@ private GetRecordsResult getRecords(CachingShardIterator iterator) {
sleeper.sleep(backoff);
getRecordsRetries++;
continue;
} // TODO could catch ExpiredIteratorException and automatically renew shard iterators in cached results
} catch (ExpiredIteratorException e) {
// if we loaded the iterator from our cache, reload it
if (!iterator.getDynamoDbIterator().isPresent()) {
if (LOG.isInfoEnabled()) {
LOG.info("Cached iterator expired: iterator={}, expired={}.", iterator, dynamoDbIterator);
}
iteratorCache.invalidate(iterator);
continue;
}
// otherwise, make the client obtain a new iterator as usual
throw e;
}

if (LOG.isDebugEnabled()) {
LOG.debug("getRecords loaded records: iterator={}, result={}", iterator,
toShortString(loadedRecordsResult));
LOG.debug("getRecords loaded records: result={}, iterator={}", toShortString(loadedRecordsResult),
iterator);
}

List<Record> loadedRecords = loadedRecordsResult.getRecords();
String loadedNextIterator = loadedRecordsResult.getNextShardIterator();

// if we didn't load anything, return without adding cache segment (preserves non-empty range invariant)
// TODO could cache empty results for (short) time period to avoid having every client hit the stream.
if (loadedRecordsResult.getRecords().isEmpty()) {
if (loadedRecordsResult.getNextShardIterator() == null) {
if (loadedRecords.isEmpty()) {
// end of shard, just return
if (loadedNextIterator == null) {
return loadedRecordsResult;
}
// replace with loaded iterator, so it is used to proceed through stream on next call
// otherwise compute next iterator (update cache for lazy iterators)
CachingShardIterator nextIterator = iterator.getDynamoDbIterator()
.map(iterator::withDynamoDbIterator)
.orElseGet(() -> {
iteratorCache.put(iterator, loadedNextIterator);
return iterator;
});
return new GetRecordsResult()
.withRecords(loadedRecordsResult.getRecords())
.withNextShardIterator(
iterator.withDynamoDbIterator(loadedRecordsResult.getNextShardIterator()).toExternalString());
.withRecords(loadedRecords)
.withNextShardIterator(nextIterator.toExternalString());
}

// otherwise (if we found records), try to update the cache
// update iterator cache
if (loadedNextIterator != null) {
iteratorCache.put(iterator.nextShardIterator(loadedRecords), loadedNextIterator);
}

// update record cache
// First resolve iterator position (either to sequence number it specifies or first record sequence number)
IteratorPosition loadedPosition = iterator.resolvePosition(loadedRecords);
Optional<GetRecordsResult> cachedResult;
GetRecordsResult result;
final Lock writeLock = recordsCacheLock.writeLock();
writeLock.lock();
try {
// Resolve iterator position (either to sequence number it specifies or first record sequence number)
IteratorPosition loadedPosition = iterator.resolvePosition(loadedRecordsResult.getRecords());

// Add retrieved records to cache under that position
// Add retrieved records to cache under resolved position
cachedResult = addToCache(loadedPosition, loadedRecordsResult);

// now lookup result: may not be exactly what we loaded if we merged result with other segments.
@@ -867,10 +941,10 @@ private Optional<GetRecordsResult> addToCache(IteratorPosition loadedPosition, G
writeLock.lock();
try {
IteratorPosition cachePosition = loadedPosition;
GetRecordsResult cacheResult = new GetRecordsResult().withRecords(loadedResult.getRecords());
Optional.ofNullable(loadedResult.getNextShardIterator())
.map(nextIterator -> loadedPosition.iteratorAfterResult(loadedResult, nextIterator).toExternalString())
.ifPresent(cacheResult::setNextShardIterator);
GetRecordsResult cacheResult = new GetRecordsResult()
.withRecords(loadedResult.getRecords())
.withNextShardIterator(loadedResult.getNextShardIterator() == null ? null
: loadedPosition.iteratorAfterResult(loadedResult).toExternalString());

boolean predecessorAdjacent = false;
final Entry<IteratorPosition, GetRecordsResult> predecessor = getFloorCacheEntry(loadedPosition);
@@ -913,7 +987,7 @@ private Optional<GetRecordsResult> addToCache(IteratorPosition loadedPosition, G
} else {
// if some of the retrieved records are not contained in the next segment,
cacheResult.setNextShardIterator(
cachePosition.iteratorAfterResult(cacheResult, null).toExternalString());
cachePosition.iteratorAfterResult(cacheResult).toExternalString());
successorAdjacent = true;
}
} else {
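The getRecords changes above pair the iterator cache with an invalidate-and-retry step: when DynamoDB rejects a cached iterator with ExpiredIteratorException (shard iterators expire after about 15 minutes), the stale cache entry is invalidated and the read is retried with a freshly loaded iterator, while an iterator supplied directly by the client still surfaces the exception. Below is a standalone sketch of that control flow under the same assumptions as the hypothetical ShardIteratorCacheSketch earlier; the single-retry guard is an assumption of this sketch, not the commit's exact logic, which retries only for cache-loaded iterators and otherwise rethrows.

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.model.ExpiredIteratorException;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;

final class ExpiredIteratorRetrySketch {

    private final AmazonDynamoDBStreams streams;
    private final ShardIteratorCacheSketch iteratorCache; // hypothetical helper from the sketch above

    ExpiredIteratorRetrySketch(AmazonDynamoDBStreams streams, ShardIteratorCacheSketch iteratorCache) {
        this.streams = streams;
        this.iteratorCache = iteratorCache;
    }

    GetRecordsResult getRecords(GetShardIteratorRequest position) {
        for (int attempt = 0; ; attempt++) {
            // Resolve the physical iterator through the cache (loaded on a miss).
            String shardIterator = iteratorCache.resolve(position);
            try {
                return streams.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));
            } catch (ExpiredIteratorException e) {
                if (attempt > 0) {
                    // A freshly loaded iterator should not be expired; give up and surface the error.
                    throw e;
                }
                // The cached iterator aged out: drop it and retry with a newly loaded one.
                iteratorCache.invalidate(position);
            }
        }
    }
}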
@@ -614,16 +614,15 @@ void testCompleteOverlap() {
void testCloseGap() {
AmazonDynamoDBStreams streams = mock(AmazonDynamoDBStreams.class);

GetShardIteratorRequest thRequest = newTrimHorizonRequest();
String thIterator = mockGetShardIterator(streams, thRequest);
mockGetRecords(streams, thIterator, 0, 5);
final GetShardIteratorRequest thRequest = newTrimHorizonRequest();
final String thIterator = mockGetShardIterator(streams, thRequest);
final String afterIterator = mockGetRecords(streams, thIterator, 0, 5);

GetShardIteratorRequest atRequest = newAtSequenceNumberRequest(5);
String atIterator = mockGetShardIterator(streams, atRequest);
final GetShardIteratorRequest atRequest = newAtSequenceNumberRequest(5);
final String atIterator = mockGetShardIterator(streams, atRequest);
mockGetRecords(streams, atIterator, 5, 10);

GetShardIteratorRequest afterRequest = newAfterSequenceNumberRequest(4);
String afterIterator = mockGetShardIterator(streams, afterRequest);
final GetShardIteratorRequest afterRequest = newAfterSequenceNumberRequest(4);
mockGetRecords(streams, afterIterator, 5, 10);

CachingAmazonDynamoDbStreams cachingStreams = new CachingAmazonDynamoDbStreams.Builder(streams).build();
Expand All @@ -633,7 +632,7 @@ void testCloseGap() {
assertGetRecords(cachingStreams, afterRequest, null, 5, 10);
assertGetRecords(cachingStreams, newAtSequenceNumberRequest(0), null, 0, 10);

assertCacheMisses(streams, 3, 3);
assertCacheMisses(streams, 2, 3);
}

/**
@@ -774,6 +773,7 @@ void testCacheEviction() {

CachingAmazonDynamoDbStreams cachingStreams = new CachingAmazonDynamoDbStreams.Builder(streams)
.withMaxRecordsByteSize(4L)
.withMaxIteratorCacheSize(1)
.build();

assertGetRecords(cachingStreams, firstRequest, null, 0, 4);
@@ -810,6 +810,7 @@ void testDisableCache() {

CachingAmazonDynamoDbStreams cachingStreams = new CachingAmazonDynamoDbStreams.Builder(streams)
.withMaxRecordsByteSize(0L)
.withMaxIteratorCacheSize(0)
.build();

assertGetRecords(cachingStreams, request, null, 0, 5);
@@ -955,7 +956,9 @@ void testConcurrentRetrieves() {
// pretend more records have come along by the time the second iterator is used to get records
mockGetRecords(streams, iterator2, 0, 5);

final CachingAmazonDynamoDbStreams cachingStreams = new CachingAmazonDynamoDbStreams.Builder(streams).build();
final CachingAmazonDynamoDbStreams cachingStreams = new CachingAmazonDynamoDbStreams.Builder(streams)
.withMaxIteratorCacheSize(0)
.build();

doAnswer(invocation -> {
assertGetRecords(cachingStreams, request, null, 0, 5);
