Skip to content

Commit

Permalink
Simplify CaffeineSharedCache
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriel Roldan <gabriel.roldan@gmail.com>
  • Loading branch information
groldan committed Jun 20, 2019
1 parent 9886e01 commit 9bbd34a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

public class CaffeineCacheBuilder implements SharedCacheBuilder {

private static final int DEFAULT_L1_CAPACITY = 10_000;

private long maxSizeBytes;

public @Override int getPriority() {
Expand All @@ -27,7 +25,7 @@ public class CaffeineCacheBuilder implements SharedCacheBuilder {
}

public @Override SharedCache build() {
return new CaffeineSharedCache(DEFAULT_L1_CAPACITY, maxSizeBytes);
return new CaffeineSharedCache(maxSizeBytes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,14 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jdt.annotation.Nullable;
import org.locationtech.geogig.flatbuffers.FlatBuffersRevObjectSerializer;
import org.locationtech.geogig.model.RevObject;
import org.locationtech.geogig.model.RevObject.TYPE;
import org.locationtech.geogig.model.RevTree;
import org.locationtech.geogig.storage.RevObjectSerializer;
import org.locationtech.geogig.storage.cache.CacheIdentifier;
Expand All @@ -32,47 +25,16 @@
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Weigher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Futures;

/**
*
* @since 1.4
*/
public class CaffeineSharedCache implements SharedCache {

/**
* Executor service used to encode a {@link RevObject} to a {@code byte[]} and add it to the
* L2cache.
* <p>
* The executor alleviates the overhead of adding an object to the cache, as it needs to be
* serialized, but uses a bounded queue of up to # of cores, and a {@link CallerRunsPolicy} as
* rejected execution handler, so that under load, the calling thread will pay the price of
* encoding and caching instead of running an unbounded number of threads to store the objects
* in the cache.
*
* @see #insert(CacheKey, RevObject)
*/
static final ExecutorService WRITE_BACK_EXECUTOR;
static {
ThreadFactory tf = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GeoGig shared cache %d").build();

final int nThreads = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);

RejectedExecutionHandler sameThreadHandler = new ThreadPoolExecutor.CallerRunsPolicy();

int corePoolSize = 1;
WRITE_BACK_EXECUTOR = new ThreadPoolExecutor(corePoolSize, nThreads, 30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(nThreads), tf, sameThreadHandler);
}

private static final RevObjectSerializer ENCODER = new FlatBuffersRevObjectSerializer();

/**
* Size of the L1 cache {@link CacheKey} -> {@link RevTree}
*/
private static final int L1_CACHE_SIZE = 10_000;

private RevObjectSerializer encoder = ENCODER;

/**
Expand Down Expand Up @@ -110,108 +72,53 @@ public void setEncoder(RevObjectSerializer encoder) {
this.encoder = encoder;
}

/**
* The Level1 cache contains most recently used, already parsed, instances of {@link RevTree}
* objects as they tend to be slow to parse and queried very often.
* <p>
* When trees are evicted from the L1Cache due to size constraints, their serialized version
* will be added to the L2Cache if it's not already present.
*
* @see #put(CacheKey, RevObject)
* @see #getIfPresent(CacheKey)
*/
final Cache<CacheKey, RevTree> L1Cache;

/**
* The Level2 cache contains serialized versions of RevObjects, as they take less memory than
* Java objects and their size can be more or less accurately tracked.
*/
final Cache<CacheKey, byte[]> L2Cache;
final Cache<CacheKey, byte[]> byteCache;

private final SizeTracker sizeTracker;

private long maxCacheSizeBytes;

private int l1Capacity;

private long maxCacheSizeBytes2;

CaffeineSharedCache() {
this.L1Cache = Caffeine.newBuilder().maximumSize(0).build();
this.L2Cache = Caffeine.newBuilder().maximumSize(0).build();
this.byteCache = Caffeine.newBuilder().maximumSize(0).build();
this.sizeTracker = new SizeTracker();
}

public CaffeineSharedCache(final int L1Capacity, long maxCacheSizeBytes) {
l1Capacity = L1Capacity;
maxCacheSizeBytes2 = maxCacheSizeBytes;
public CaffeineSharedCache(final long maxCacheSizeBytes) {
this.maxCacheSizeBytes = maxCacheSizeBytes;
checkArgument(L1Capacity >= 0);
checkArgument(maxCacheSizeBytes >= 0, "Cache size can't be < 0, 0 meaning no cache at all");

int initialCapacityCount = 1_000_000;
int concurrencyLevel = 16;

Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
cacheBuilder = cacheBuilder.maximumWeight(maxCacheSizeBytes);
cacheBuilder.weigher(SizeTracker.WEIGHER);

cacheBuilder.initialCapacity(initialCapacityCount);
// cacheBuilder.concurrencyLevel(concurrencyLevel);
cacheBuilder.recordStats();
sizeTracker = new SizeTracker();
cacheBuilder.removalListener(sizeTracker);
Cache<CacheKey, byte[]> byteCache = cacheBuilder.build();

this.L2Cache = byteCache;

RemovalListener<CacheKey, RevObject> L1WriteBack = (key, value, cause) -> {
if (RemovalCause.SIZE == cause && value != null) {
putInternal(key, value);
}
};

this.L1Cache = Caffeine.newBuilder()//
.maximumSize(L1Capacity)//
.softValues()//
.removalListener(L1WriteBack)//
.build();

this.byteCache = cacheBuilder.build();
}

CaffeineSharedCache(final int L1Capacity, Cache<CacheKey, byte[]> byteCache,
SizeTracker sizeTracker) {
this.L2Cache = byteCache;
CaffeineSharedCache(Cache<CacheKey, byte[]> byteCache, SizeTracker sizeTracker) {
this.byteCache = byteCache;
this.sizeTracker = sizeTracker;

RemovalListener<CacheKey, RevObject> L1WriteBack = (key, value, cause) -> {
if (RemovalCause.SIZE == cause && value != null) {
putInternal(key, value);
}
};

this.L1Cache = Caffeine.newBuilder()//
.maximumSize(L1Capacity)//
.softValues()//
.removalListener(L1WriteBack)//
.build();
}

public @Override boolean contains(CacheKey id) {
return L1Cache.asMap().containsKey(id) || L2Cache.asMap().containsKey(id);
return byteCache.asMap().containsKey(id);
}

public @Override void invalidateAll() {
L1Cache.invalidateAll();
L2Cache.invalidateAll();

L1Cache.cleanUp();
L2Cache.cleanUp();
byteCache.invalidateAll();
byteCache.cleanUp();
}

public @Override void invalidateAll(CacheIdentifier prefix) {
invalidateAll(prefix, L1Cache.asMap());
invalidateAll(prefix, L2Cache.asMap());
invalidateAll(prefix, byteCache.asMap());
}

private void invalidateAll(CacheIdentifier prefix, ConcurrentMap<CacheKey, ?> map) {
Expand All @@ -227,7 +134,7 @@ private void invalidateAll(CacheIdentifier prefix, ConcurrentMap<CacheKey, ?> ma
}

public @Override void invalidate(CacheKey id) {
L2Cache.invalidate(id);
byteCache.invalidate(id);
}

/**
Expand All @@ -239,33 +146,20 @@ private void invalidateAll(CacheIdentifier prefix, ConcurrentMap<CacheKey, ?> ma
* a {@code RevTree}, will result in the tree being added back to the L1 cache.
*/
public @Override @Nullable RevObject getIfPresent(CacheKey key) {
RevObject obj = L1Cache.getIfPresent(key);
if (obj == null) {
// call cache.getIfPresent instead of map.get() or the cache stats don't record the
// hits/misses
byte[] val = L2Cache.getIfPresent(key);
if (val != null) {
obj = decode(key, val);
if (TYPE.TREE == obj.getType()) {// keep L1 hot on tree objects
L1Cache.asMap().putIfAbsent(key, (RevTree) obj);
}
}
// call cache.getIfPresent instead of map.get() or the cache stats don't record the
// hits/misses
byte[] val = byteCache.getIfPresent(key);
if (val != null) {
return decode(key, val);
}
return obj;
return null;
}

/**
* Adds the given object to the cache under the given key, if not already present.
* <p>
* If the object happens to be a {@link RevTree}, it will first be added to the
* {@link #L1Cache}. In either case, its serialized version will be, possibly asynchronously,
* added to the {@link #L2Cache}.
*/
public @Override @Nullable Future<?> put(CacheKey key, RevObject obj) {
RevObject l1val = (TYPE.TREE == obj.getType() && l1Capacity > 0)
? L1Cache.asMap().putIfAbsent(key, (RevTree) obj)
: null;
if (maxCacheSizeBytes > 0L && l1val == null) {
if (maxCacheSizeBytes > 0L) {
// add it to L2 if not already present, even if it's a RevTree and has been added to
// the L1 cache, since removal notifications happen after the fact
return putInternal(key, obj);
Expand All @@ -275,21 +169,20 @@ private void invalidateAll(CacheIdentifier prefix, ConcurrentMap<CacheKey, ?> ma

@Nullable
Future<?> putInternal(CacheKey key, RevObject obj) {
if (L2Cache.asMap().containsKey(key)) {
return null;
}
return WRITE_BACK_EXECUTOR.submit(() -> insert(key, obj));
}

void insert(CacheKey key, RevObject obj) {
byte[] value = encode(obj);
if (null == L2Cache.asMap().putIfAbsent(key, value)) {
if (null == byteCache.asMap().putIfAbsent(key, value)) {
sizeTracker.inserted(key, value);
return CompletableFuture.completedFuture(null);
}
return null;
}

private byte[] encode(RevObject obj) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
if (encoder instanceof FlatBuffersRevObjectSerializer) {
return ((FlatBuffersRevObjectSerializer) encoder).encode(obj);
}
ByteArrayOutputStream out = new ByteArrayOutputStream(32 * 1024);
out.reset();
try {
encoder.write(obj, out);
} catch (IOException e) {
Expand All @@ -307,23 +200,23 @@ private RevObject decode(CacheKey key, byte[] val) {
}

public @Override String toString() {
long size = L2Cache.estimatedSize();
long size = byteCache.estimatedSize();
long bytes = sizeTracker.size.get();
long avg = size == 0 ? 0 : bytes / size;
return String.format("Size: %,d, bytes: %,d, avg: %,d bytes/entry, %s", size, bytes, avg,
L2Cache.stats());
byteCache.stats());
}

public @Override long sizeBytes() {
// Reports the cache's current payload size in bytes. The AtomicLong inside
// sizeTracker is kept up to date by SizeTracker, which is registered both as the
// cache's removal listener and notified on insert (see insert(CacheKey, RevObject)).
return sizeTracker.size.get();
}

public @Override long objectCount() {
return L2Cache.estimatedSize();
return byteCache.estimatedSize();
}

public @Override CacheStats getStats() {
final com.github.benmanes.caffeine.cache.stats.CacheStats stats = L2Cache.stats();
final com.github.benmanes.caffeine.cache.stats.CacheStats stats = byteCache.stats();
return new CacheStats() {
public @Override long hitCount() {
return stats.hitCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
*/
package org.locationtech.geogig.flatbuffers;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

import org.eclipse.jdt.annotation.Nullable;
import org.locationtech.geogig.model.ObjectId;
Expand All @@ -33,7 +35,7 @@
public class FlatBuffersRevObjectSerializer implements RevObjectSerializer {

private static final ByteBufferFactory BYTE_BUFFER_FACTORY = capacity -> ByteBuffer
.allocateDirect(capacity).order(ByteOrder.LITTLE_ENDIAN);
.allocate(capacity).order(ByteOrder.LITTLE_ENDIAN);

static final ThreadLocal<FlatBufferBuilder> WRITE_BUFFERS = ThreadLocal
.withInitial(() -> new FlatBufferBuilder(32 * 1024, BYTE_BUFFER_FACTORY));
Expand Down Expand Up @@ -120,4 +122,29 @@ public FlatBuffersRevObjectSerializer(boolean lengthPrefixed) {
}
return flatBuffers.decode(id, data, offset + padding, length - padding);
}

/**
 * Encodes {@code obj} to its serialized byte representation.
 * <p>
 * Fast path: when {@code obj} is a flatbuffers-backed {@link FBRevObject}, the bytes of its
 * backing {@link ByteBuffer} are reused or copied directly instead of re-serializing the
 * object. Otherwise (or when the heap buffer's window can't be safely mapped), the object is
 * serialized through the regular {@code write} path into an in-memory stream.
 *
 * @param obj the object to encode, never {@code null}
 * @return the serialized form of {@code obj}
 * @throws RuntimeException wrapping any {@link IOException} thrown by the fallback write path
 */
public byte[] encode(@NonNull RevObject obj) {
    if (obj instanceof FBRevObject) {
        ByteBuffer dataBuffer = ((FBRevObject<?>) obj).getTable().getByteBuffer();
        if (dataBuffer.hasArray()) {
            byte[] array = dataBuffer.array();
            // NOTE(review): the original ignored arrayOffset(); for a sliced/offset heap
            // buffer that returned bytes outside the buffer's window. Map the window
            // [offset, offset + remaining) onto the backing array explicitly.
            int offset = dataBuffer.arrayOffset() + dataBuffer.position();
            int length = dataBuffer.remaining();
            if (offset == 0 && length == array.length) {
                return array; // buffer spans the whole backing array, no copy needed
            } else if (dataBuffer.position() == 0) {
                return Arrays.copyOfRange(array, offset, offset + length);
            }
            // position > 0 on a heap buffer: fall through to the generic write path below,
            // preserving the method's original behavior for this case
        } else {
            // direct (or read-only) buffer: bulk-copy through a duplicate so the source
            // buffer's position is left untouched
            byte[] array = new byte[dataBuffer.remaining()];
            dataBuffer.duplicate().get(array);
            return array;
        }
    }
    // not flatbuffers-backed: serialize through the regular write path
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
        write(obj, out);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    return out.toByteArray();
}
}

0 comments on commit 9bbd34a

Please sign in to comment.