Skip to content

Commit

Permalink
Add possibility to manual db-cache invalidation
Browse files Browse the repository at this point in the history
Related to JanusGraph#3155

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
  • Loading branch information
porunov committed Aug 26, 2022
1 parent 0306efb commit d688d82
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 17 deletions.
12 changes: 12 additions & 0 deletions docs/operations/cache.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,18 @@ entry retrieved from the storage backend.
If JanusGraph runs locally or against a storage backend that guarantees
immediate visibility of modifications, this value can be set to 0.

### Manual cache invalidation

It is possible to invalidate db-cache entries for necessary vertices manually.
JanusGraph provides `org.janusgraph.graphdb.database.cache.CacheInvalidationService`
which contains different methods for manual invalidation.
To force invalidate a set of vertices `forceInvalidateVertices` method can be used:
```groovy
graph.getDBCacheInvalidationService().forceInvalidateVertices(Arrays.asList(vertexId1, vertexId2));
```

For additional invalidation methods see JavaDoc of `CacheInvalidationService`.

## Storage Backend Caching

Each storage backend maintains its own data caching layer. These caches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BackendTransaction;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.StandardStoreManager;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.configuration.ConfigElement;
import org.janusgraph.diskstorage.configuration.ConfigOption;
Expand Down Expand Up @@ -196,6 +197,7 @@
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CUSTOM_ATTRIBUTE_CLASS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CUSTOM_SERIALIZER_CLASS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DB_CACHE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DB_CACHE_TIME;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.FORCE_INDEX_USAGE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.HARD_MAX_LIMIT;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.IDS_STORE_NAME;
Expand All @@ -206,6 +208,7 @@
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.LOG_SEND_DELAY;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.MANAGEMENT_LOG;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.MAX_COMMIT_TIME;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REPLACE_INSTANCE_IF_EXISTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SCHEMA_CONSTRAINTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SCRIPT_EVAL_ENABLED;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SCRIPT_EVAL_ENGINE;
Expand All @@ -215,6 +218,7 @@
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SYSTEM_LOG_TRANSACTIONS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.TRANSACTION_LOG;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.TX_CACHE_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.USER_LOG;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.USE_MULTIQUERY;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.VERBOSE_TX_RECOVERY;
Expand Down Expand Up @@ -8006,4 +8010,86 @@ public void testGremlinScriptEvaluationWithGremlinGroovyScriptEngine() {
assertFalse(graph.traversal().V().hasNext());
}
}

@Test
public void testVertexPropertiesForceInvalidationFromDBCache() {
if (StandardStoreManager.IN_MEMORY.getShorthands().contains(config.get(ConfigElement.getPath(STORAGE_BACKEND), String.class))) return;

WriteConfiguration config1 = config.copy();
config1.set(ConfigElement.getPath(UNIQUE_INSTANCE_ID), "testRelationsDBCacheForceRefresh1");
config1.set(ConfigElement.getPath(REPLACE_INSTANCE_IF_EXISTS), true);
config1.set(ConfigElement.getPath(DB_CACHE), true);
config1.set(ConfigElement.getPath(DB_CACHE_TIME), 0);

WriteConfiguration config2 = config.copy();
config2.set(ConfigElement.getPath(UNIQUE_INSTANCE_ID), "testRelationsDBCacheForceRefresh2");
config2.set(ConfigElement.getPath(REPLACE_INSTANCE_IF_EXISTS), true);
config2.set(ConfigElement.getPath(DB_CACHE), true);
config2.set(ConfigElement.getPath(DB_CACHE_TIME), 0);

JanusGraph graph1 = JanusGraphFactory.open(config1);
JanusGraph graph2 = JanusGraphFactory.open(config2);

JanusGraphVertex v1 = graph1.addVertex();
v1.property("name", "vertex1");
JanusGraphVertex v2 = graph1.addVertex();
v2.property("name", "vertex2");
JanusGraphVertex v3 = graph1.addVertex();
v3.property("name", "vertex3");

graph1.tx().commit();

// Cache vertices for graph1
JanusGraphTransaction tx1 = graph1.newTransaction();
tx1.traversal().V().valueMap().toList();
tx1.commit();

// Cache vertices for graph2
JanusGraphTransaction tx2 = graph2.newTransaction();
tx2.traversal().V().valueMap().toList();
tx2.commit();

tx1 = graph1.newTransaction();

// Update properties using graph1
tx1.traversal().V(v1.id()).property("name", "vertex1_updated").iterate();
tx1.traversal().V(v2.id()).property("name", "vertex2_updated").iterate();
tx1.traversal().V(v3.id()).property("name", "vertex3_updated").iterate();

tx1.commit();

tx2 = graph2.newTransaction();

// Check that cached properties in graph2 were not refreshed
assertEquals(Arrays.asList("vertex1"), tx2.traversal().V(v1.id()).valueMap().next().get("name"));
assertEquals(Arrays.asList("vertex2"), tx2.traversal().V(v2.id()).valueMap().next().get("name"));
assertEquals(Arrays.asList("vertex3"), tx2.traversal().V(v3.id()).valueMap().next().get("name"));

tx2.rollback();

// Invalidate cache for v1 only
graph2.getDBCacheInvalidationService().forceInvalidateVertex(v1.longId());

tx2 = graph2.newTransaction();

// Check that v1 cache was refreshed but cache for v2 and v3 was not refreshed
assertEquals(Arrays.asList("vertex1_updated"), tx2.traversal().V(v1.id()).valueMap().next().get("name"));
assertEquals(Arrays.asList("vertex2"), tx2.traversal().V(v2.id()).valueMap().next().get("name"));
assertEquals(Arrays.asList("vertex3"), tx2.traversal().V(v3.id()).valueMap().next().get("name"));

tx2.rollback();

// Invalidate cache for v2 and v3
graph2.getDBCacheInvalidationService().forceInvalidateVertices(Arrays.asList(v2.longId(), v3.longId()));

tx2 = graph2.newTransaction();

// Check that cache was refreshed for all three vertices
assertEquals(Arrays.asList("vertex1_updated"), tx2.traversal().V(v1.id()).valueMap().next().get("name"));
assertEquals(Arrays.asList("vertex2_updated"), tx2.traversal().V(v2.id()).valueMap().next().get("name"));
assertEquals(Arrays.asList("vertex3_updated"), tx2.traversal().V(v3.id()).valueMap().next().get("name"));

tx2.rollback();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.tinkerpop.gremlin.util.Gremlin;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.graphdb.configuration.JanusGraphConstants;
import org.janusgraph.graphdb.database.cache.CacheInvalidationService;

/**
* JanusGraph graph database implementation of the Blueprint's interface.
Expand Down Expand Up @@ -157,6 +158,8 @@ public interface JanusGraph extends Transaction {
@Override
void close() throws JanusGraphException;

CacheInvalidationService getDBCacheInvalidationService();

/**
* The version of this JanusGraph graph database
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,4 +708,12 @@ static ExecutorService buildExecutorService(Configuration configuration){
}
return executorService;
}

public KCVSCache getEdgeStoreCache(){
return edgeStore;
}

public KCVSCache getIndexStoreCache(){
return indexStore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,31 @@ public void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries) {
if (Math.random()<1.0/INVALIDATE_KEY_FRACTION_PENALTY) penaltyCountdown.countDown();
}

@Override
public void forceClearExpiredCache() {
clearExpiredCache(false);
}

private synchronized void clearExpiredCache(boolean withNewPenaltyCountdown) {
//Do clean up work by invalidating all entries for expired keys
final Map<StaticBuffer,Long> expiredKeysCopy = new HashMap<>(expiredKeys.size());
for (Map.Entry<StaticBuffer,Long> expKey : expiredKeys.entrySet()) {
if (isBeyondExpirationTime(expKey.getValue()))
expiredKeys.remove(expKey.getKey(), expKey.getValue());
else if (getAge(expKey.getValue())>= invalidationGracePeriodMS)
expiredKeysCopy.put(expKey.getKey(),expKey.getValue());
}
for (KeySliceQuery ksq : cache.asMap().keySet()) {
if (expiredKeysCopy.containsKey(ksq.getKey())) cache.invalidate(ksq);
}
if(withNewPenaltyCountdown){
penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD);
}
for (Map.Entry<StaticBuffer,Long> expKey : expiredKeysCopy.entrySet()) {
expiredKeys.remove(expKey.getKey(),expKey.getValue());
}
}

@Override
public void close() throws BackendException {
cleanupThread.stopThread();
Expand Down Expand Up @@ -203,21 +228,7 @@ public void run() {
if (stop) return;
else throw new RuntimeException("Cleanup thread got interrupted",e);
}
//Do clean up work by invalidating all entries for expired keys
final Map<StaticBuffer,Long> expiredKeysCopy = new HashMap<>(expiredKeys.size());
for (Map.Entry<StaticBuffer,Long> expKey : expiredKeys.entrySet()) {
if (isBeyondExpirationTime(expKey.getValue()))
expiredKeys.remove(expKey.getKey(), expKey.getValue());
else if (getAge(expKey.getValue())>= invalidationGracePeriodMS)
expiredKeysCopy.put(expKey.getKey(),expKey.getValue());
}
for (KeySliceQuery ksq : cache.asMap().keySet()) {
if (expiredKeysCopy.containsKey(ksq.getKey())) cache.invalidate(ksq);
}
penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD);
for (Map.Entry<StaticBuffer,Long> expKey : expiredKeysCopy.entrySet()) {
expiredKeys.remove(expKey.getKey(),expKey.getValue());
}
clearExpiredCache(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ protected void incActionBy(int by, CacheMetricsAction action, StoreTransaction t

public abstract void clearCache();

protected abstract void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries);
public abstract void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries);

public abstract void forceClearExpiredCache();

@Override
public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ public void clearCache() {
}

@Override
protected void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries) {
public void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries) {
}

@Override
public void forceClearExpiredCache() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.janusgraph.diskstorage.util.StaticArrayEntry;
import org.janusgraph.diskstorage.util.time.TimestampProvider;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.database.cache.CacheInvalidationService;
import org.janusgraph.graphdb.database.cache.KCVSCacheInvalidationService;
import org.janusgraph.graphdb.database.cache.SchemaCache;
import org.janusgraph.graphdb.database.idassigner.VertexIDAssigner;
import org.janusgraph.graphdb.database.idhandling.IDHandler;
Expand Down Expand Up @@ -112,6 +114,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -160,6 +163,8 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph {
private final IDManager idManager;
private final VertexIDAssigner idAssigner;
private final TimestampProvider times;
private final CacheInvalidationService cacheInvalidationService;


//Serializers
protected final IndexSerializer indexSerializer;
Expand Down Expand Up @@ -200,6 +205,10 @@ public StandardJanusGraph(GraphDatabaseConfiguration configuration) {
this.idAssigner = config.getIDAssigner(backend);
this.idManager = idAssigner.getIDManager();

this.cacheInvalidationService = new KCVSCacheInvalidationService(
Arrays.asList(backend.getEdgeStoreCache(), backend.getIndexStoreCache()),
idManager);

this.serializer = config.getSerializer();
StoreFeatures storeFeatures = backend.getStoreFeatures();
this.indexSerializer = new IndexSerializer(configuration.getConfiguration(), this.serializer,
Expand Down Expand Up @@ -298,6 +307,11 @@ public synchronized void close() throws JanusGraphException {
}
}

@Override
public CacheInvalidationService getDBCacheInvalidationService() {
return cacheInvalidationService;
}

private synchronized void closeInternal() {

if (!isOpen) return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2022 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.graphdb.database.cache;

import org.janusgraph.diskstorage.StaticBuffer;

public interface CacheInvalidationService {

/**
* Marks specific vertex as expired.
* It will make sure that any executed query associated with this vertex will be invalidated.
* @param vertexId vertex id to expire
*/
void markVertexAsExpired(Long vertexId);

/**
* Marks specific key as expired.
* It will make sure that any executed query associated with this key will be invalidated.
* @param key key to expire
*/
void markKeyAsExpired(StaticBuffer key);

/**
* Instead of waiting for a probabilistic invalidation it triggers all cached queries scan and invalidation.
* This will remove any cached expired data.
*/
void forceClearExpiredCache();

/**
* Marks a vertex as expired and triggers force clear of expired cache (i.e. {@link #forceClearExpiredCache()})
* @param vertexId vertex id to invalidate
*/
void forceInvalidateVertex(Long vertexId);

/**
* Marks vertices as expired and triggers force clear of expired cache (i.e. {@link #forceClearExpiredCache()})
* @param vertexIds vertex ids to invalidate
*/
void forceInvalidateVertices(Iterable<Long> vertexIds);

}
Loading

0 comments on commit d688d82

Please sign in to comment.