Skip to content

Commit

Permalink
Replace Guava cache with Caffeine cache for vertex cache
Browse files Browse the repository at this point in the history
Fix the issue JanusGraph#3185
  • Loading branch information
clovertrail committed Aug 28, 2022
1 parent fdf0bee commit 35982ff
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 3 deletions.
6 changes: 5 additions & 1 deletion janusgraph-benchmark/pom.xml
Expand Up @@ -76,7 +76,11 @@
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>

<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>${easymock.version}</version>
</dependency>
</dependencies>

<profiles>
Expand Down
@@ -0,0 +1,171 @@
// Copyright 2022 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph;

import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.RelationType;
import org.janusgraph.core.schema.DefaultSchemaMaker;
import org.janusgraph.core.schema.PropertyKeyMaker;
import org.janusgraph.diskstorage.util.time.TimestampProvider;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.database.EdgeSerializer;
import org.janusgraph.graphdb.database.IndexSerializer;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.database.serialize.Serializer;
import org.janusgraph.graphdb.idmanagement.IDManager;
import org.janusgraph.graphdb.internal.ElementLifeCycle;
import org.janusgraph.graphdb.internal.InternalVertex;
import org.janusgraph.graphdb.query.index.IndexSelectionStrategy;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.transaction.TransactionConfiguration;
import org.janusgraph.graphdb.transaction.vertexcache.CaffeineVertexCache;
import org.janusgraph.graphdb.transaction.vertexcache.GuavaVertexCache;
import org.janusgraph.graphdb.transaction.vertexcache.VertexCache;
import org.janusgraph.graphdb.types.vertices.EdgeLabelVertex;
import org.janusgraph.util.datastructures.Retriever;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.AverageTime)
@Fork(1)
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class VertexCacheBenchmark extends EasyMockSupport {
protected StandardJanusGraphTx standardJanusGraphTx; // mocked tx

protected VertexConstructor vertexConstructor = new VertexConstructor();

public StandardJanusGraphTx createTxWithMockedInternals() {
StandardJanusGraph mockGraph = createMock(StandardJanusGraph.class);
TransactionConfiguration txConfig = createMock(TransactionConfiguration.class);
GraphDatabaseConfiguration gdbConfig = createMock(GraphDatabaseConfiguration.class);
TimestampProvider tsProvider = createMock(TimestampProvider.class);
Serializer mockSerializer = createMock(Serializer.class);
EdgeSerializer mockEdgeSerializer = createMock(EdgeSerializer.class);
IndexSerializer mockIndexSerializer = createMock(IndexSerializer.class);
RelationType relationType = createMock(RelationType.class);
IDManager idManager = createMock(IDManager.class);
PropertyKey propertyKey = createMock(PropertyKey.class);
DefaultSchemaMaker defaultSchemaMaker = createMock(DefaultSchemaMaker.class);
IndexSelectionStrategy indexSelectionStrategy = createMock(IndexSelectionStrategy.class);

EasyMock.expect(mockGraph.getConfiguration()).andReturn(gdbConfig);
EasyMock.expect(mockGraph.isOpen()).andReturn(true).anyTimes();
EasyMock.expect(mockGraph.getDataSerializer()).andReturn(mockSerializer);
EasyMock.expect(mockGraph.getEdgeSerializer()).andReturn(mockEdgeSerializer);
EasyMock.expect(mockGraph.getIndexSerializer()).andReturn(mockIndexSerializer);
EasyMock.expect(mockGraph.getIDManager()).andReturn(idManager);
EasyMock.expect(mockGraph.getIndexSelector()).andReturn(indexSelectionStrategy);

EasyMock.expect(gdbConfig.getTimestampProvider()).andReturn(tsProvider);

EasyMock.expect(txConfig.isSingleThreaded()).andReturn(true);
EasyMock.expect(txConfig.hasPreloadedData()).andReturn(false);
EasyMock.expect(txConfig.hasVerifyExternalVertexExistence()).andReturn(false);
EasyMock.expect(txConfig.hasVerifyInternalVertexExistence()).andReturn(false);
EasyMock.expect(txConfig.getVertexCacheSize()).andReturn(6);
EasyMock.expect(txConfig.isReadOnly()).andReturn(true);
EasyMock.expect(txConfig.getDirtyVertexSize()).andReturn(2);
EasyMock.expect(txConfig.getIndexCacheWeight()).andReturn(2L);
EasyMock.expect(txConfig.getGroupName()).andReturn(null);
EasyMock.expect(txConfig.getAutoSchemaMaker()).andReturn(defaultSchemaMaker);

EasyMock.expect(defaultSchemaMaker.makePropertyKey(EasyMock.isA(PropertyKeyMaker.class), EasyMock.notNull())).andReturn(propertyKey);

EasyMock.expect(relationType.isPropertyKey()).andReturn(false);

EasyMock.expect(propertyKey.isPropertyKey()).andReturn(true);

EasyMock.expect(txConfig.getDirtyVertexSize()).andReturn(1);
EasyMock.expect(txConfig.getIndexCacheWeight()).andReturn(1L);
EasyMock.expect(txConfig.getGroupName()).andReturn("test");
replayAll();

StandardJanusGraphTx partialMock = createMockBuilder(StandardJanusGraphTx.class)
.withConstructor(mockGraph, txConfig)
.addMockedMethod("getRelationType")
.createMock();

EasyMock.expect(partialMock.getRelationType("Foo")).andReturn(null);
EasyMock.expect(partialMock.getRelationType("Qux")).andReturn(propertyKey);
EasyMock.expect(partialMock.getRelationType("Baz")).andReturn(relationType);

EasyMock.replay(partialMock);
return partialMock;
}

private static final int SIZE = (1 << 10);

static final int MASK = SIZE - 1;

@Param({"guava", "caffeine"})
private String cacheType;

@State(Scope.Thread)
public static class ThreadState {
static final Random random = new Random();
int index = random.nextInt() + 1; // skip zero
}

private VertexCache cache;

@Setup
public void prepare() {
standardJanusGraphTx = createTxWithMockedInternals();
if (cacheType.equals("caffeine")) {
cache = new CaffeineVertexCache(SIZE, 32);
} else {
cache = new GuavaVertexCache(SIZE, 1, 32);
}
for (int i = 0; i < SIZE; i++) {
cache.add(new EdgeLabelVertex(standardJanusGraphTx, i+1, ElementLifeCycle.Loaded), i+1);
}
}

@TearDown
public void tearDown() {
cache.close();
}

@Benchmark
@Threads(8)
public Boolean run(ThreadState threadState) {
int index = threadState.index++ & MASK;
cache.get(index, vertexConstructor);
return true;
}

class VertexConstructor implements Retriever<Long, InternalVertex> {

@Override
public InternalVertex get(Long input) {
return new EdgeLabelVertex(standardJanusGraphTx, input, ElementLifeCycle.Loaded);
}
}
}
Expand Up @@ -111,8 +111,8 @@
import org.janusgraph.graphdb.transaction.subquerycache.EmptySubqueryCache;
import org.janusgraph.graphdb.transaction.subquerycache.GuavaSubqueryCache;
import org.janusgraph.graphdb.transaction.subquerycache.SubqueryCache;
import org.janusgraph.graphdb.transaction.vertexcache.CaffeineVertexCache;
import org.janusgraph.graphdb.transaction.vertexcache.EmptyVertexCache;
import org.janusgraph.graphdb.transaction.vertexcache.GuavaVertexCache;
import org.janusgraph.graphdb.transaction.vertexcache.VertexCache;
import org.janusgraph.graphdb.types.CompositeIndexType;
import org.janusgraph.graphdb.types.IndexType;
Expand Down Expand Up @@ -319,7 +319,7 @@ public void close() {
config.getVertexCacheSize(), effectiveVertexCacheSize, MIN_VERTEX_CACHE_SIZE);
}

vertexCache = new GuavaVertexCache(effectiveVertexCacheSize,concurrencyLevel,config.getDirtyVertexSize());
vertexCache = new CaffeineVertexCache(effectiveVertexCacheSize,config.getDirtyVertexSize());

indexCache = new GuavaSubqueryCache(concurrencyLevel, config.getIndexCacheWeight());

Expand Down
@@ -0,0 +1,133 @@
// Copyright 2022 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.graphdb.transaction.vertexcache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.base.Preconditions;
import org.jctools.maps.NonBlockingHashMapLong;
import org.janusgraph.graphdb.internal.InternalVertex;
import org.janusgraph.graphdb.vertices.AbstractVertex;
import org.janusgraph.util.datastructures.Retriever;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

public class CaffeineVertexCache implements VertexCache {
private static final Logger log =
LoggerFactory.getLogger(CaffeineVertexCache.class);

private final ConcurrentMap<Long, InternalVertex> volatileVertices;
private final Cache<Long, InternalVertex> cache;

private long createdTime; // the timestamp when the cache was created.

class CaffeineRemovalListener implements RemovalListener<Long, InternalVertex> {

@Override
public void onRemoval(@Nullable Long key,
@Nullable InternalVertex internalVertex,
@Nonnull RemovalCause removalCause) {
if (removalCause == RemovalCause.EXPLICIT) {
assert volatileVertices.isEmpty();
return;
}
assert (removalCause == RemovalCause.SIZE || removalCause == RemovalCause.REPLACED) : "Cause: " + removalCause;
InternalVertex v = internalVertex;
if (((AbstractVertex) v).isTxOpen() && (v.isModified() || v.isRemoved())) {
volatileVertices.putIfAbsent(key, v);
}
}
}

public CaffeineVertexCache(final long maxCacheSize, final int initialDirtySize) {
volatileVertices = new NonBlockingHashMapLong<>(initialDirtySize);
log.debug("Created dirty vertex map with initial size {}", initialDirtySize);

cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
.removalListener(new CaffeineRemovalListener())
.recordStats()
.executor(Runnable::run) // according to the https://github.com/ben-manes/caffeine/discussions/757
.build();
log.debug("Created vertex cache with max size {}", maxCacheSize);
createdTime = System.currentTimeMillis();
}

@Override
public boolean contains(long id) {
Long vertexId = id;
return cache.getIfPresent(vertexId) != null || volatileVertices.containsKey(vertexId);
}

@Override
public InternalVertex get(long id, Retriever<Long, InternalVertex> retriever) {
final Long vertexId = id;

InternalVertex vertex = cache.getIfPresent(vertexId);

if (vertex == null) {
InternalVertex newVertex = volatileVertices.get(vertexId);

if (newVertex == null) {
newVertex = retriever.get(vertexId);
}
assert newVertex != null;
final InternalVertex v = newVertex;
try {
vertex = cache.get(vertexId, (k) -> v);
} catch (Exception e) { throw new AssertionError("Should not happen: "+e.getMessage()); }
assert vertex!=null;
}

return vertex;
}

@Override
public void add(InternalVertex vertex, long id) {
Preconditions.checkNotNull(vertex);
Preconditions.checkArgument(id != 0);
Long vertexId = id;

cache.put(vertexId, vertex);
if (vertex.isNew() || vertex.hasAddedRelations())
volatileVertices.put(vertexId, vertex);
}

@Override
public List<InternalVertex> getAllNew() {
final List<InternalVertex> vertices = new ArrayList<>(10);
for (InternalVertex v : volatileVertices.values()) {
if (v.isNew()) vertices.add(v);
}
return vertices;
}

@Override
public synchronized void close() {
long end = System.currentTimeMillis();
CacheStats stats = cache.stats();
log.debug("Caffeine cache (lifespan: {}ms) stats: {}", end - createdTime, stats.toString());
volatileVertices.clear();
cache.invalidateAll();
cache.cleanUp();
}
}

0 comments on commit 35982ff

Please sign in to comment.