Skip to content

Commit

Permalink
Code cleanups and ignore unused parameter types
Browse files Browse the repository at this point in the history
  • Loading branch information
sherfert authored and SaschaPeukert committed Jul 25, 2018
1 parent bffca84 commit b9f4781
Show file tree
Hide file tree
Showing 19 changed files with 181 additions and 208 deletions.
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.cypher.internal.ParameterTypeMap;
import org.neo4j.cypher.internal.compatibility.CypherCacheHitMonitor;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Result;
Expand Down Expand Up @@ -188,27 +187,27 @@ private static int randomInt( int max )
return ThreadLocalRandom.current().nextInt( max );
}

private static class TestMonitor implements CypherCacheHitMonitor<Pair<String,ParameterTypeMap>>
private static class TestMonitor implements CypherCacheHitMonitor<Pair<String,scala.collection.immutable.Map<String, Class<?>>>>
{
private final AtomicInteger hits = new AtomicInteger();
private final AtomicInteger misses = new AtomicInteger();
private final AtomicInteger discards = new AtomicInteger();
private final AtomicLong waitTime = new AtomicLong();

@Override
public void cacheHit( Pair<String,ParameterTypeMap> key )
public void cacheHit( Pair<String,scala.collection.immutable.Map<String, Class<?>>> key )
{
hits.incrementAndGet();
}

@Override
public void cacheMiss( Pair<String,ParameterTypeMap> key )
public void cacheMiss( Pair<String,scala.collection.immutable.Map<String, Class<?>>> key )
{
misses.incrementAndGet();
}

@Override
public void cacheDiscard( Pair<String,ParameterTypeMap> key, String ignored, int secondsSinceReplan )
public void cacheDiscard( Pair<String,scala.collection.immutable.Map<String, Class<?>>> key, String ignored, int secondsSinceReplan )
{
discards.incrementAndGet();
waitTime.addAndGet( secondsSinceReplan );
Expand Down
Expand Up @@ -19,7 +19,8 @@
*/
package org.neo4j.cypher

import org.neo4j.cypher.internal.{ParameterTypeMap, StringCacheMonitor}
import org.neo4j.cypher.internal.QueryCache.ParameterTypeMap
import org.neo4j.cypher.internal.StringCacheMonitor
import org.neo4j.graphdb.Label
import org.neo4j.helpers.collection.Pair
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
Expand Down Expand Up @@ -145,7 +146,6 @@ class QueryCachingTest extends CypherFunSuite with GraphDatabaseTestSupport with
s"cacheHit: (CYPHER 3.5 $query, Map(n -> class org.neo4j.values.storable.StringWrappingStringValue))")

actual should equal(expected)

}

private class LoggingStringCacheListener extends StringCacheMonitor {
Expand Down
Expand Up @@ -23,13 +23,16 @@ import java.time.{Clock, Instant, ZoneOffset}

import org.neo4j.cypher
import org.neo4j.cypher._
import org.neo4j.cypher.internal.compatibility.{CommunityRuntimeContextCreator, CypherCurrentCompiler, RuntimeContext}
import org.neo4j.cypher.internal.QueryCache.ParameterTypeMap
import org.neo4j.cypher.internal.compatibility.v3_4.Cypher34Planner
import org.neo4j.cypher.internal.compatibility.{CommunityRuntimeContextCreator, CypherCurrentCompiler, CypherPlanner, RuntimeContext}
import org.neo4j.cypher.internal.compiler.v3_5.{CypherPlannerConfiguration, StatsDivergenceCalculator}
import org.neo4j.cypher.internal.runtime.interpreted.CSVResources
import org.neo4j.cypher.internal.{CacheTracer, CommunityRuntimeFactory, ParameterTypeMap, PreParsedQuery}
import org.neo4j.cypher.internal.{CacheTracer, CommunityRuntimeFactory, PreParsedQuery}
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.helpers.collection.Pair
import org.neo4j.kernel.impl.util.ValueUtils
import org.neo4j.logging.AssertableLogProvider.inLog
import org.neo4j.logging.{AssertableLogProvider, Log, NullLog}
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer
Expand All @@ -43,7 +46,6 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData
def createCompiler(queryCacheSize: Int = 128, statsDivergenceThreshold: Double = 0.5, queryPlanTTL: Long = 1000,
clock: Clock = Clock.systemUTC(), log: Log = NullLog.getInstance):
CypherCurrentCompiler[RuntimeContext] = {

val config = CypherPlannerConfiguration(
queryCacheSize,
StatsDivergenceCalculator.divergenceNoDecayCalculator(statsDivergenceThreshold, queryPlanTTL),
Expand All @@ -57,14 +59,20 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData
nonIndexedLabelWarningThreshold = 10000L,
planWithMinimumCardinalityEstimates = true
)
val planner = Cypher35Planner(config,
clock,
kernelMonitors,
log,
cypher.CypherPlannerOption.default,
CypherUpdateStrategy.default,
() => 1)
createCompiler(planner, config)
}

def createCompiler(planner: CypherPlanner, config: CypherPlannerConfiguration):
CypherCurrentCompiler[RuntimeContext] = {
CypherCurrentCompiler(
Cypher35Planner(config,
clock,
kernelMonitors,
log,
cypher.CypherPlannerOption.default,
CypherUpdateStrategy.default,
() => 1),
planner,
CommunityRuntimeFactory.getRuntime(CypherRuntimeOption.default, disallowFallback = true),
CommunityRuntimeContextCreator,
kernelMonitors)
Expand All @@ -75,20 +83,20 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData
override def toString = s"hits = $hits, misses = $misses, flushes = $flushes, evicted = $evicted"
}

class CacheCounter(var counts: CacheCounts = CacheCounts()) extends CacheTracer[Pair[AnyRef,ParameterTypeMap]] {
override def queryCacheHit(key: Pair[AnyRef,ParameterTypeMap], metaData: String) {
class CacheCounter(var counts: CacheCounts = CacheCounts()) extends CacheTracer[Pair[AnyRef, ParameterTypeMap]] {
override def queryCacheHit(key: Pair[AnyRef, ParameterTypeMap], metaData: String) {
counts = counts.copy(hits = counts.hits + 1)
}

override def queryCacheMiss(key: Pair[AnyRef,ParameterTypeMap], metaData: String) {
override def queryCacheMiss(key: Pair[AnyRef, ParameterTypeMap], metaData: String) {
counts = counts.copy(misses = counts.misses + 1)
}

override def queryCacheFlush(sizeBeforeFlush: Long) {
counts = counts.copy(flushes = counts.flushes + 1)
}

override def queryCacheStale(key: Pair[AnyRef,ParameterTypeMap], secondsSincePlan: Int, metaData: String): Unit = {
override def queryCacheStale(key: Pair[AnyRef, ParameterTypeMap], secondsSincePlan: Int, metaData: String): Unit = {
counts = counts.copy(evicted = counts.evicted + 1)
}
}
Expand All @@ -97,27 +105,55 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData

var counter: CacheCounter = _
var compiler: CypherCurrentCompiler[RuntimeContext] = _
var compiler3_4: CypherCurrentCompiler[RuntimeContext] = _

override protected def beforeEach(): Unit = {
super.beforeEach()
counter = new CacheCounter()
compiler = createCompiler()
compiler.kernelMonitors.addMonitorListener(counter)

val config3_4 = CypherPlannerConfiguration(
128,
StatsDivergenceCalculator.divergenceNoDecayCalculator(0.5, 1000),
useErrorsOverWarnings = false,
idpMaxTableSize = 128,
idpIterationDuration = 1000,
errorIfShortestPathFallbackUsedAtRuntime = false,
errorIfShortestPathHasCommonNodesAtRuntime = true,
legacyCsvQuoteEscaping = false,
csvBufferSize = CSVResources.DEFAULT_BUFFER_SIZE,
nonIndexedLabelWarningThreshold = 10000L,
planWithMinimumCardinalityEstimates = true
)
val planner3_4 = Cypher34Planner(config3_4,
Clock.systemUTC(),
kernelMonitors,
NullLog.getInstance,
cypher.CypherPlannerOption.default,
CypherUpdateStrategy.default,
() => 1)

compiler3_4 = createCompiler(planner3_4, config3_4)

kernelMonitors.addMonitorListener(counter)

}

private def runQuery(query: String, debugOptions: Set[String] = Set.empty, params: scala.Predef.Map[String, Any] = Map.empty): Unit = {
private def runQuery(query: String, debugOptions: Set[String] = Set.empty, params: scala.Predef.Map[String, AnyRef] = Map.empty, compiler: CypherCurrentCompiler[RuntimeContext] = compiler): Unit = {
import collection.JavaConverters._

graph.withTx { tx =>
val noTracing = CompilationPhaseTracer.NO_TRACING
val context = graph.transactionalContext(query = query -> params)
compiler.compile(PreParsedQuery(query, DummyPosition(0), query,
isPeriodicCommit = false,
CypherVersion.default,
CypherExecutionMode.default,
CypherPlannerOption.default,
CypherRuntimeOption.default,
CypherUpdateStrategy.default,
debugOptions),
noTracing, Set.empty, context)
isPeriodicCommit = false,
CypherVersion.default,
CypherExecutionMode.default,
CypherPlannerOption.default,
CypherRuntimeOption.default,
CypherUpdateStrategy.default,
debugOptions),
noTracing, Set.empty, context, ValueUtils.asParameterMapValue(params.asJava))
context.close(true)
}
}
Expand Down Expand Up @@ -250,7 +286,7 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData

// then
logProvider.assertExactly(
inLog(logName).info( s"Discarded stale query from the query cache after 0 seconds: $query" )
inLog(logName).info(s"Discarded stale query from the query cache after 0 seconds: $query")
)
}

Expand All @@ -262,11 +298,38 @@ class CypherCompilerAstCacheAcceptanceTest extends CypherFunSuite with GraphData
}

test("should not find query in cache with different parameter types") {
val map1: scala.Predef.Map[String, Any] = scala.Predef.Map("number" -> 42)
val map2: scala.Predef.Map[String, Any] = scala.Predef.Map("number" -> "nope")
val map1: scala.Predef.Map[String, AnyRef] = scala.Predef.Map("number" -> new Integer(42))
val map2: scala.Predef.Map[String, AnyRef] = scala.Predef.Map("number" -> "nope")
runQuery("return $number", params = map1)
runQuery("return $number", params = map2)

counter.counts should equal(CacheCounts(hits = 0, misses = 2, flushes = 1))
}

test("should find query in cache with different parameter types in 3.4") {
val map1: scala.Predef.Map[String, AnyRef] = scala.Predef.Map("number" -> new Integer(42))
val map2: scala.Predef.Map[String, AnyRef] = scala.Predef.Map("number" -> "nope")
runQuery("return $number", params = map1, compiler = compiler3_4)
runQuery("return $number", params = map2, compiler = compiler3_4)

counter.counts should equal(CacheCounts(hits = 1, misses = 1, flushes = 1))
}

test("should find query in cache with same parameter types") {
val map1: scala.Predef.Map[String, AnyRef] = scala.Predef.Map("number" -> new Integer(42))
val map2: scala.Predef.Map[String, AnyRef] = scala.Predef.Map("number" -> new Integer(43))
runQuery("return $number", params = map1)
runQuery("return $number", params = map2)

counter.counts should equal(CacheCounts(hits = 1, misses = 1, flushes = 1))
}

test("should find query in cache with same parameter types, ignoring unused parameters") {
val map1: scala.Predef.Map[String, AnyRef] = scala.Predef.Map("number" -> new Integer(42), "foo" -> "bar")
val map2: scala.Predef.Map[String, AnyRef] = scala.Predef.Map("number" -> new Integer(43), "bar" -> new Integer(10))
runQuery("return $number", params = map1)
runQuery("return $number", params = map2)

counter.counts should equal(CacheCounts(hits = 1, misses = 1, flushes = 1))
}
}
Expand Up @@ -20,14 +20,13 @@
package org.neo4j.cypher.internal.javacompat;

import org.neo4j.cypher.internal.CacheTracer;
import org.neo4j.cypher.internal.ParameterTypeMap;
import org.neo4j.cypher.internal.StringCacheMonitor;
import org.neo4j.helpers.collection.Pair;

/**
* Adapter for passing CacheTraces into the Monitoring infrastructure.
*/
public class MonitoringCacheTracer implements CacheTracer<Pair<String,ParameterTypeMap>>
public class MonitoringCacheTracer implements CacheTracer<Pair<String,scala.collection.immutable.Map<String, Class<?>>>>
{
private final StringCacheMonitor monitor;

Expand All @@ -37,19 +36,19 @@ public MonitoringCacheTracer( StringCacheMonitor monitor )
}

@Override
public void queryCacheHit( Pair<String,ParameterTypeMap> queryKey, String metaData )
public void queryCacheHit( Pair<String,scala.collection.immutable.Map<String, Class<?>>> queryKey, String metaData )
{
monitor.cacheHit( queryKey );
}

@Override
public void queryCacheMiss( Pair<String,ParameterTypeMap> queryKey, String metaData )
public void queryCacheMiss( Pair<String,scala.collection.immutable.Map<String, Class<?>>> queryKey, String metaData )
{
monitor.cacheMiss( queryKey );
}

@Override
public void queryCacheStale( Pair<String,ParameterTypeMap> queryKey, int secondsSincePlan, String metaData )
public void queryCacheStale( Pair<String,scala.collection.immutable.Map<String, Class<?>>> queryKey, int secondsSincePlan, String metaData )
{
monitor.cacheDiscard( queryKey, metaData, secondsSincePlan );
}
Expand Down
Expand Up @@ -21,7 +21,8 @@ package org.neo4j.cypher

import java.util.concurrent.atomic.AtomicLong

import org.neo4j.cypher.internal.{ParameterTypeMap, StringCacheMonitor}
import org.neo4j.cypher.internal.QueryCache.ParameterTypeMap
import org.neo4j.cypher.internal.StringCacheMonitor
import org.neo4j.helpers.collection.Pair

class PlanCacheMetricsMonitor extends StringCacheMonitor {
Expand Down
Expand Up @@ -21,6 +21,7 @@ package org.neo4j.cypher.internal

import org.neo4j.cypher.CypherException
import org.neo4j.kernel.impl.query.TransactionalContext
import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer

/**
Expand All @@ -42,6 +43,7 @@ trait Compiler {
def compile(preParsedQuery: PreParsedQuery,
tracer: CompilationPhaseTracer,
preParsingNotifications: Set[org.neo4j.graphdb.Notification],
transactionalContext: TransactionalContext
transactionalContext: TransactionalContext,
params: MapValue
): ExecutableQuery
}
Expand Up @@ -21,6 +21,7 @@ package org.neo4j.cypher.internal

import java.time.Clock

import org.neo4j.cypher.internal.QueryCache.ParameterTypeMap
import org.neo4j.cypher.internal.compatibility.CypherCacheMonitor
import org.neo4j.cypher.internal.runtime.interpreted.LastCommittedTxIdProvider
import org.neo4j.cypher.internal.tracing.CompilationTracer
Expand Down Expand Up @@ -73,8 +74,8 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
config.statsDivergenceCalculator,
lastCommittedTxIdProvider,
planReusabilitiy)
private val queryCache: QueryCache[String,Pair[String, MapValue], ExecutableQuery] =
new QueryCache[String, Pair[String, MapValue], ExecutableQuery](config.queryCacheSize, planStalenessCaller, cacheTracer)
private val queryCache: QueryCache[String,Pair[String, ParameterTypeMap], ExecutableQuery] =
new QueryCache[String, Pair[String, ParameterTypeMap], ExecutableQuery](config.queryCacheSize, planStalenessCaller, cacheTracer)

private val masterCompiler: MasterCompiler =
new MasterCompiler(queryService, kernelMonitors, config, logProvider, new CompilerLibrary(compatibilityFactory))
Expand Down Expand Up @@ -119,7 +120,7 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
tracer: QueryCompilationEvent,
params: MapValue
): ExecutableQuery = {
val cacheKey = Pair.of(preParsedQuery.statementWithVersionAndPlanner, params)
val cacheKey = Pair.of(preParsedQuery.statementWithVersionAndPlanner, QueryCache.extractParameterTypeMap(params))

// create transaction and query context
val tc = context.getOrBeginNewIfClosed()
Expand All @@ -132,7 +133,7 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
val schemaToken = schemaHelper.readSchemaToken(tc)
val cacheLookup = queryCache.computeIfAbsentOrStale(cacheKey,
tc,
() => masterCompiler.compile(preParsedQuery, tracer, tc),
() => masterCompiler.compile(preParsedQuery, tracer, tc, params),
preParsedQuery.rawStatement)
cacheLookup match {
case _: CacheHit[_] |
Expand Down

0 comments on commit b9f4781

Please sign in to comment.