Skip to content

Commit

Permalink
Use ParameterTypeMap as second part of cache key and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SaschaPeukert committed Jul 25, 2018
1 parent 9ab0a5d commit bffca84
Show file tree
Hide file tree
Showing 14 changed files with 259 additions and 178 deletions.
Expand Up @@ -28,11 +28,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.cypher.internal.ParameterTypeMap;
import org.neo4j.cypher.internal.compatibility.CypherCacheHitMonitor;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.rule.DatabaseRule;
Expand Down Expand Up @@ -186,27 +188,27 @@ private static int randomInt( int max )
return ThreadLocalRandom.current().nextInt( max );
}

private static class TestMonitor implements CypherCacheHitMonitor<String>
private static class TestMonitor implements CypherCacheHitMonitor<Pair<String,ParameterTypeMap>>
{
private final AtomicInteger hits = new AtomicInteger();
private final AtomicInteger misses = new AtomicInteger();
private final AtomicInteger discards = new AtomicInteger();
private final AtomicLong waitTime = new AtomicLong();

@Override
public void cacheHit( String key )
public void cacheHit( Pair<String,ParameterTypeMap> key )
{
hits.incrementAndGet();
}

@Override
public void cacheMiss( String key )
public void cacheMiss( Pair<String,ParameterTypeMap> key )
{
misses.incrementAndGet();
}

@Override
public void cacheDiscard( String key, String ignored, int secondsSinceReplan )
public void cacheDiscard( Pair<String,ParameterTypeMap> key, String ignored, int secondsSinceReplan )
{
discards.incrementAndGet();
waitTime.addAndGet( secondsSinceReplan );
Expand Down
Expand Up @@ -19,14 +19,14 @@
*/
package org.neo4j.cypher

import org.neo4j.cypher.internal.StringCacheMonitor
import org.neo4j.cypher.internal.{ParameterTypeMap, StringCacheMonitor}
import org.neo4j.graphdb.Label
import org.neo4j.helpers.collection.Pair
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.util.test_helpers.CypherFunSuite
import org.scalatest.prop.TableDrivenPropertyChecks

import scala.collection.JavaConversions._
import scala.collection.mutable

class QueryCachingTest extends CypherFunSuite with GraphDatabaseTestSupport with TableDrivenPropertyChecks {
Expand All @@ -41,6 +41,7 @@ class QueryCachingTest extends CypherFunSuite with GraphDatabaseTestSupport with
val query = "MATCH (n:Person) RETURN n"
val profileQuery = s"PROFILE $query"
val explainQuery = s"EXPLAIN $query"
val empty_parameters = "Map()"

val modeCombinations = Table(
("firstQuery", "secondQuery"),
Expand Down Expand Up @@ -73,14 +74,80 @@ class QueryCachingTest extends CypherFunSuite with GraphDatabaseTestSupport with
val actual = cacheListener.trace.map(str => str.replaceAll("\\s+", " "))
val expected = List(
s"cacheFlushDetected",
s"cacheMiss: CYPHER 3.5 $query",
s"cacheHit: CYPHER 3.5 $query",
s"cacheHit: CYPHER 3.5 $query")
s"cacheMiss: (CYPHER 3.5 $query, $empty_parameters)",
s"cacheHit: (CYPHER 3.5 $query, $empty_parameters)",
s"cacheHit: (CYPHER 3.5 $query, $empty_parameters)")

actual should equal(expected)
}
}

test("repeating query with same parameters should hit the cache") {

val cacheListener = new LoggingStringCacheListener
kernelMonitors.addMonitorListener(cacheListener)

val query = "RETURN $n"
val params1: Map[String, AnyRef] = Map("n" -> Long.box(42))

graph.execute(query, params1).resultAsString()
graph.execute(query, params1).resultAsString()

val actual = cacheListener.trace.map(str => str.replaceAll("\\s+", " "))
val expected = List(
s"cacheFlushDetected",
s"cacheMiss: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.LongValue))",
s"cacheHit: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.LongValue))",
s"cacheHit: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.LongValue))")

actual should equal(expected)
}

test("repeating query with same parameter types but different values should hit the cache") {

val cacheListener = new LoggingStringCacheListener
kernelMonitors.addMonitorListener(cacheListener)

val query = "RETURN $n"
val params1: Map[String, AnyRef] = Map("n" -> Long.box(12))
val params2: Map[String, AnyRef] = Map("n" -> Long.box(42))
graph.execute(query, params1).resultAsString()
graph.execute(query, params2).resultAsString()

val actual = cacheListener.trace.map(str => str.replaceAll("\\s+", " "))
val expected = List(
s"cacheFlushDetected",
s"cacheMiss: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.LongValue))",
s"cacheHit: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.LongValue))",
s"cacheHit: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.LongValue))")

actual should equal(expected)
}

test("repeating query with different parameters types should not hit the cache") {

val cacheListener = new LoggingStringCacheListener
kernelMonitors.addMonitorListener(cacheListener)

val query = "RETURN $n"
val params1: Map[String, AnyRef] = Map("n" -> Long.box(42))
val params2: Map[String, AnyRef] = Map("n" -> "nope")

graph.execute(query, params1).resultAsString()
graph.execute(query, params2).resultAsString()

val actual = cacheListener.trace.map(str => str.replaceAll("\\s+", " "))
val expected = List(
s"cacheFlushDetected",
s"cacheMiss: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.LongValue))",
s"cacheHit: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.LongValue))",
s"cacheMiss: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.StringWrappingStringValue))",
s"cacheHit: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.StringWrappingStringValue))")

actual should equal(expected)

}

private class LoggingStringCacheListener extends StringCacheMonitor {
private var log: mutable.Builder[String, List[String]] = List.newBuilder

Expand All @@ -94,15 +161,15 @@ class QueryCachingTest extends CypherFunSuite with GraphDatabaseTestSupport with
log += s"cacheFlushDetected"
}

override def cacheHit(key: Pair[String, MapValue]): Unit = {
override def cacheHit(key: Pair[String, ParameterTypeMap]): Unit = {
log += s"cacheHit: $key"
}

override def cacheMiss(key: Pair[String, MapValue]): Unit = {
override def cacheMiss(key: Pair[String, ParameterTypeMap]): Unit = {
log += s"cacheMiss: $key"
}

override def cacheDiscard(key: Pair[String, MapValue], ignored: String, secondsSinceReplan: Int): Unit = {
override def cacheDiscard(key: Pair[String, ParameterTypeMap], ignored: String, secondsSinceReplan: Int): Unit = {
log += s"cacheDiscard: $key"
}
}
Expand Down
Expand Up @@ -23,15 +23,15 @@ import java.time.{Clock, Instant, ZoneOffset}

import org.neo4j.cypher
import org.neo4j.cypher._
import org.neo4j.cypher.internal.compiler.v3_5.{CypherPlannerConfiguration, StatsDivergenceCalculator}
import org.neo4j.cypher.internal.compatibility.{CommunityRuntimeContextCreator, CypherCurrentCompiler, RuntimeContext}
import org.neo4j.cypher.internal.compiler.v3_5.{CypherPlannerConfiguration, StatsDivergenceCalculator}
import org.neo4j.cypher.internal.runtime.interpreted.CSVResources
import org.neo4j.cypher.internal.{CacheTracer, CommunityRuntimeFactory, PreParsedQuery}
import org.neo4j.cypher.internal.{CacheTracer, CommunityRuntimeFactory, ParameterTypeMap, PreParsedQuery}
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.helpers.collection.Pair
import org.neo4j.logging.AssertableLogProvider.inLog
import org.neo4j.logging.{AssertableLogProvider, Log, NullLog}
import org.opencypher.v9_0.ast.Statement
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer
import org.opencypher.v9_0.util.DummyPosition
import org.opencypher.v9_0.util.test_helpers.CypherFunSuite
Expand Down Expand Up @@ -75,20 +75,20 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData
override def toString = s"hits = $hits, misses = $misses, flushes = $flushes, evicted = $evicted"
}

class CacheCounter(var counts: CacheCounts = CacheCounts()) extends CacheTracer[Statement] {
override def queryCacheHit(key: Statement, metaData: String) {
class CacheCounter(var counts: CacheCounts = CacheCounts()) extends CacheTracer[Pair[AnyRef,ParameterTypeMap]] {
override def queryCacheHit(key: Pair[AnyRef,ParameterTypeMap], metaData: String) {
counts = counts.copy(hits = counts.hits + 1)
}

override def queryCacheMiss(key: Statement, metaData: String) {
override def queryCacheMiss(key: Pair[AnyRef,ParameterTypeMap], metaData: String) {
counts = counts.copy(misses = counts.misses + 1)
}

override def queryCacheFlush(sizeBeforeFlush: Long) {
counts = counts.copy(flushes = counts.flushes + 1)
}

override def queryCacheStale(key: Statement, secondsSincePlan: Int, metaData: String): Unit = {
override def queryCacheStale(key: Pair[AnyRef,ParameterTypeMap], secondsSincePlan: Int, metaData: String): Unit = {
counts = counts.copy(evicted = counts.evicted + 1)
}
}
Expand All @@ -105,10 +105,10 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData
compiler.kernelMonitors.addMonitorListener(counter)
}

private def runQuery(query: String, debugOptions: Set[String] = Set.empty): Unit = {
private def runQuery(query: String, debugOptions: Set[String] = Set.empty, params: scala.Predef.Map[String, Any] = Map.empty): Unit = {
graph.withTx { tx =>
val noTracing = CompilationPhaseTracer.NO_TRACING
val context = graph.transactionalContext(query = query -> Map.empty)
val context = graph.transactionalContext(query = query -> params)
compiler.compile(PreParsedQuery(query, DummyPosition(0), query,
isPeriodicCommit = false,
CypherVersion.default,
Expand Down Expand Up @@ -260,4 +260,13 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData

counter.counts.hits should equal(0)
}

test("should not find query in cache with different parameter types") {
val map1: scala.Predef.Map[String, Any] = scala.Predef.Map("number" -> 42)
val map2: scala.Predef.Map[String, Any] = scala.Predef.Map("number" -> "nope")
runQuery("return $number", params = map1)
runQuery("return $number", params = map2)

counter.counts should equal(CacheCounts(hits = 0, misses = 2, flushes = 1))
}
}
Expand Up @@ -20,14 +20,14 @@
package org.neo4j.cypher.internal.javacompat;

import org.neo4j.cypher.internal.CacheTracer;
import org.neo4j.cypher.internal.ParameterTypeMap;
import org.neo4j.cypher.internal.StringCacheMonitor;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.values.virtual.MapValue;

/**
* Adapter for passing CacheTraces into the Monitoring infrastructure.
*/
public class MonitoringCacheTracer implements CacheTracer<Pair<String,MapValue>>
public class MonitoringCacheTracer implements CacheTracer<Pair<String,ParameterTypeMap>>
{
private final StringCacheMonitor monitor;

Expand All @@ -37,19 +37,19 @@ public MonitoringCacheTracer( StringCacheMonitor monitor )
}

@Override
public void queryCacheHit( Pair<String,MapValue> queryKey, String metaData )
public void queryCacheHit( Pair<String,ParameterTypeMap> queryKey, String metaData )
{
monitor.cacheHit( queryKey );
}

@Override
public void queryCacheMiss( Pair<String,MapValue> queryKey, String metaData )
public void queryCacheMiss( Pair<String,ParameterTypeMap> queryKey, String metaData )
{
monitor.cacheMiss( queryKey );
}

@Override
public void queryCacheStale( Pair<String,MapValue> queryKey, int secondsSincePlan, String metaData )
public void queryCacheStale( Pair<String,ParameterTypeMap> queryKey, int secondsSincePlan, String metaData )
{
monitor.cacheDiscard( queryKey, metaData, secondsSincePlan );
}
Expand Down
Expand Up @@ -21,15 +21,14 @@ package org.neo4j.cypher

import java.util.concurrent.atomic.AtomicLong

import org.neo4j.cypher.internal.StringCacheMonitor
import org.neo4j.cypher.internal.{ParameterTypeMap, StringCacheMonitor}
import org.neo4j.helpers.collection.Pair
import org.neo4j.values.virtual.MapValue

class PlanCacheMetricsMonitor extends StringCacheMonitor {
private val counter = new AtomicLong()
private val waitTime = new AtomicLong()

override def cacheDiscard(ignored1: Pair[String, MapValue], ignored2: String, secondsSinceReplan: Int): Unit = {
override def cacheDiscard(ignored1: Pair[String, ParameterTypeMap], ignored2: String, secondsSinceReplan: Int): Unit = {
counter.incrementAndGet()
waitTime.addAndGet(secondsSinceReplan)
}
Expand Down
Expand Up @@ -35,7 +35,7 @@ import org.neo4j.kernel.monitoring.Monitors
import org.neo4j.logging.LogProvider
import org.neo4j.values.virtual.MapValue

trait StringCacheMonitor extends CypherCacheMonitor[Pair[String, MapValue]]
trait StringCacheMonitor extends CypherCacheMonitor[Pair[String, ParameterTypeMap]]

/**
* This class constructs and initializes both the cypher compilers and runtimes, which are very expensive
Expand All @@ -44,7 +44,7 @@ trait StringCacheMonitor extends CypherCacheMonitor[Pair[String, MapValue]]
class ExecutionEngine(val queryService: GraphDatabaseQueryService,
val kernelMonitors: Monitors,
val tracer: CompilationTracer,
val cacheTracer: CacheTracer[Pair[String, MapValue]],
val cacheTracer: CacheTracer[Pair[String, ParameterTypeMap]],
val config: CypherConfiguration,
val compatibilityFactory: CompilerFactory,
val logProvider: LogProvider,
Expand All @@ -63,7 +63,7 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
// Log on stale query discard from query cache
private val log = logProvider.getLog( getClass )
kernelMonitors.addMonitorListener( new StringCacheMonitor {
override def cacheDiscard(ignored: Pair[String, MapValue], query: String, secondsSinceReplan: Int) {
override def cacheDiscard(ignored: Pair[String, ParameterTypeMap], query: String, secondsSinceReplan: Int) {
log.info(s"Discarded stale query from the query cache after ${secondsSinceReplan} seconds: $query")
}
})
Expand Down

0 comments on commit bffca84

Please sign in to comment.