Skip to content

Commit

Permalink
Started enhancing query caches with parameter information
Browse files Browse the repository at this point in the history
  • Loading branch information
SaschaPeukert committed Jul 25, 2018
1 parent 5868dec commit bcb2e42
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 38 deletions.
Expand Up @@ -21,7 +21,9 @@ package org.neo4j.cypher


import org.neo4j.cypher.internal.StringCacheMonitor import org.neo4j.cypher.internal.StringCacheMonitor
import org.neo4j.graphdb.Label import org.neo4j.graphdb.Label
import org.neo4j.helpers.collection.Pair
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.util.test_helpers.CypherFunSuite import org.opencypher.v9_0.util.test_helpers.CypherFunSuite
import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.prop.TableDrivenPropertyChecks


Expand Down Expand Up @@ -92,15 +94,15 @@ class QueryCachingTest extends CypherFunSuite with GraphDatabaseTestSupport with
log += s"cacheFlushDetected" log += s"cacheFlushDetected"
} }


override def cacheHit(key: String): Unit = { override def cacheHit(key: Pair[String, MapValue]): Unit = {
log += s"cacheHit: $key" log += s"cacheHit: $key"
} }


override def cacheMiss(key: String): Unit = { override def cacheMiss(key: Pair[String, MapValue]): Unit = {
log += s"cacheMiss: $key" log += s"cacheMiss: $key"
} }


override def cacheDiscard(key: String, ignored: String, secondsSinceReplan: Int): Unit = { override def cacheDiscard(key: Pair[String, MapValue], ignored: String, secondsSinceReplan: Int): Unit = {
log += s"cacheDiscard: $key" log += s"cacheDiscard: $key"
} }
} }
Expand Down
Expand Up @@ -21,11 +21,13 @@


import org.neo4j.cypher.internal.CacheTracer; import org.neo4j.cypher.internal.CacheTracer;
import org.neo4j.cypher.internal.StringCacheMonitor; import org.neo4j.cypher.internal.StringCacheMonitor;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.values.virtual.MapValue;


/** /**
* Adapter for passing CacheTraces into the Monitoring infrastructure. * Adapter for passing CacheTraces into the Monitoring infrastructure.
*/ */
public class MonitoringCacheTracer implements CacheTracer<String> public class MonitoringCacheTracer implements CacheTracer<Pair<String,MapValue>>
{ {
private final StringCacheMonitor monitor; private final StringCacheMonitor monitor;


Expand All @@ -35,19 +37,19 @@ public MonitoringCacheTracer( StringCacheMonitor monitor )
} }


@Override @Override
public void queryCacheHit( String queryKey, String metaData ) public void queryCacheHit( Pair<String,MapValue> queryKey, String metaData )
{ {
monitor.cacheHit( queryKey ); monitor.cacheHit( queryKey );
} }


@Override @Override
public void queryCacheMiss( String queryKey, String metaData ) public void queryCacheMiss( Pair<String,MapValue> queryKey, String metaData )
{ {
monitor.cacheMiss( queryKey ); monitor.cacheMiss( queryKey );
} }


@Override @Override
public void queryCacheStale( String queryKey, int secondsSincePlan, String metaData ) public void queryCacheStale( Pair<String,MapValue> queryKey, int secondsSincePlan, String metaData )
{ {
monitor.cacheDiscard( queryKey, metaData, secondsSincePlan ); monitor.cacheDiscard( queryKey, metaData, secondsSincePlan );
} }
Expand Down
Expand Up @@ -22,12 +22,14 @@ package org.neo4j.cypher
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong


import org.neo4j.cypher.internal.StringCacheMonitor import org.neo4j.cypher.internal.StringCacheMonitor
import org.neo4j.helpers.collection.Pair
import org.neo4j.values.virtual.MapValue


class PlanCacheMetricsMonitor extends StringCacheMonitor { class PlanCacheMetricsMonitor extends StringCacheMonitor {
private val counter = new AtomicLong() private val counter = new AtomicLong()
private val waitTime = new AtomicLong() private val waitTime = new AtomicLong()


override def cacheDiscard(ignored1: String, ignored2: String, secondsSinceReplan: Int): Unit = { override def cacheDiscard(ignored1: Pair[String, MapValue], ignored2: String, secondsSinceReplan: Int): Unit = {
counter.incrementAndGet() counter.incrementAndGet()
waitTime.addAndGet(secondsSinceReplan) waitTime.addAndGet(secondsSinceReplan)
} }
Expand Down
Expand Up @@ -21,21 +21,21 @@ package org.neo4j.cypher.internal


import java.time.Clock import java.time.Clock


import org.neo4j.cypher.internal.compatibility.{CypherCacheMonitor, LFUCache} import org.neo4j.cypher.internal.compatibility.CypherCacheMonitor
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer import org.neo4j.cypher.internal.runtime.interpreted.LastCommittedTxIdProvider
import org.neo4j.cypher.internal.runtime.interpreted.{LastCommittedTxIdProvider, ValueConversion}
import org.neo4j.cypher.internal.tracing.CompilationTracer import org.neo4j.cypher.internal.tracing.CompilationTracer
import org.neo4j.cypher.internal.tracing.CompilationTracer.QueryCompilationEvent import org.neo4j.cypher.internal.tracing.CompilationTracer.QueryCompilationEvent
import org.neo4j.cypher.{ParameterNotFoundException, SyntaxException, exceptionHandler} import org.neo4j.cypher.{ParameterNotFoundException, exceptionHandler}
import org.neo4j.graphdb.Result import org.neo4j.graphdb.Result
import org.neo4j.helpers.collection.Pair
import org.neo4j.internal.kernel.api.security.AccessMode import org.neo4j.internal.kernel.api.security.AccessMode
import org.neo4j.kernel.GraphDatabaseQueryService import org.neo4j.kernel.GraphDatabaseQueryService
import org.neo4j.kernel.impl.query.{QueryExecution, ResultBuffer, TransactionalContext} import org.neo4j.kernel.impl.query.{QueryExecution, ResultBuffer, TransactionalContext}
import org.neo4j.kernel.monitoring.Monitors import org.neo4j.kernel.monitoring.Monitors
import org.neo4j.logging.LogProvider import org.neo4j.logging.LogProvider
import org.neo4j.values.virtual.MapValue import org.neo4j.values.virtual.MapValue


trait StringCacheMonitor extends CypherCacheMonitor[String] trait StringCacheMonitor extends CypherCacheMonitor[Pair[String, MapValue]]


/** /**
* This class constructs and initializes both the cypher compilers and runtimes, which are very expensive * This class constructs and initializes both the cypher compilers and runtimes, which are very expensive
Expand All @@ -44,7 +44,7 @@ trait StringCacheMonitor extends CypherCacheMonitor[String]
class ExecutionEngine(val queryService: GraphDatabaseQueryService, class ExecutionEngine(val queryService: GraphDatabaseQueryService,
val kernelMonitors: Monitors, val kernelMonitors: Monitors,
val tracer: CompilationTracer, val tracer: CompilationTracer,
val cacheTracer: CacheTracer[String], val cacheTracer: CacheTracer[Pair[String, MapValue]],
val config: CypherConfiguration, val config: CypherConfiguration,
val compatibilityFactory: CompilerFactory, val compatibilityFactory: CompilerFactory,
val logProvider: LogProvider, val logProvider: LogProvider,
Expand All @@ -63,7 +63,7 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
// Log on stale query discard from query cache // Log on stale query discard from query cache
private val log = logProvider.getLog( getClass ) private val log = logProvider.getLog( getClass )
kernelMonitors.addMonitorListener( new StringCacheMonitor { kernelMonitors.addMonitorListener( new StringCacheMonitor {
override def cacheDiscard(ignored: String, query: String, secondsSinceReplan: Int) { override def cacheDiscard(ignored: Pair[String, MapValue], query: String, secondsSinceReplan: Int) {
log.info(s"Discarded stale query from the query cache after ${secondsSinceReplan} seconds: $query") log.info(s"Discarded stale query from the query cache after ${secondsSinceReplan} seconds: $query")
} }
}) })
Expand All @@ -73,8 +73,8 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
config.statsDivergenceCalculator, config.statsDivergenceCalculator,
lastCommittedTxIdProvider, lastCommittedTxIdProvider,
planReusabilitiy) planReusabilitiy)
private val queryCache: QueryCache[String, ExecutableQuery] = private val queryCache: QueryCache[String,Pair[String, MapValue], ExecutableQuery] =
new QueryCache(config.queryCacheSize, planStalenessCaller, cacheTracer) new QueryCache[String, Pair[String, MapValue], ExecutableQuery](config.queryCacheSize, planStalenessCaller, cacheTracer)


private val masterCompiler: MasterCompiler = private val masterCompiler: MasterCompiler =
new MasterCompiler(queryService, kernelMonitors, config, logProvider, new CompilerLibrary(compatibilityFactory)) new MasterCompiler(queryService, kernelMonitors, config, logProvider, new CompilerLibrary(compatibilityFactory))
Expand All @@ -91,7 +91,7 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,


try { try {
val preParsedQuery = preParser.preParseQuery(query, profile) val preParsedQuery = preParser.preParseQuery(query, profile)
val executableQuery = getOrCompile(context, preParsedQuery, queryTracer) val executableQuery = getOrCompile(context, preParsedQuery, queryTracer, params)
if (preParsedQuery.executionMode.name != "explain") { if (preParsedQuery.executionMode.name != "explain") {
checkParameters(executableQuery.paramNames, params, executableQuery.extractedParams) checkParameters(executableQuery.paramNames, params, executableQuery.extractedParams)
} }
Expand All @@ -116,9 +116,10 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,


private def getOrCompile(context: TransactionalContext, private def getOrCompile(context: TransactionalContext,
preParsedQuery: PreParsedQuery, preParsedQuery: PreParsedQuery,
tracer: QueryCompilationEvent tracer: QueryCompilationEvent,
params: MapValue
): ExecutableQuery = { ): ExecutableQuery = {
val cacheKey = preParsedQuery.statementWithVersionAndPlanner val cacheKey = Pair.of(preParsedQuery.statementWithVersionAndPlanner, params)


// create transaction and query context // create transaction and query context
val tc = context.getOrBeginNewIfClosed() val tc = context.getOrBeginNewIfClosed()
Expand Down
Expand Up @@ -20,7 +20,9 @@
package org.neo4j.cypher.internal package org.neo4j.cypher.internal


import com.github.benmanes.caffeine.cache.{Cache, Caffeine} import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import org.neo4j.helpers.collection.Pair
import org.neo4j.kernel.impl.query.TransactionalContext import org.neo4j.kernel.impl.query.TransactionalContext
import org.neo4j.values.virtual.MapValue


/** /**
* The result of one cache lookup. * The result of one cache lookup.
Expand Down Expand Up @@ -56,7 +58,7 @@ trait CacheTracer[QUERY_KEY] {
* @param stalenessCaller Decided whether CachedExecutionPlans are stale * @param stalenessCaller Decided whether CachedExecutionPlans are stale
* @param tracer Traces cache activity * @param tracer Traces cache activity
*/ */
class QueryCache[QUERY_KEY <: AnyRef, EXECUTABLE_QUERY <: AnyRef](val maximumSize: Int, class QueryCache[QUERY_REP <: AnyRef, QUERY_KEY <: Pair[QUERY_REP, MapValue], EXECUTABLE_QUERY <: AnyRef](val maximumSize: Int,
val stalenessCaller: PlanStalenessCaller[EXECUTABLE_QUERY], val stalenessCaller: PlanStalenessCaller[EXECUTABLE_QUERY],
val tracer: CacheTracer[QUERY_KEY]) { val tracer: CacheTracer[QUERY_KEY]) {


Expand Down
Expand Up @@ -27,7 +27,7 @@ import org.neo4j.kernel.impl.query.TransactionalContext


case class SchemaToken(x: Long) extends AnyVal case class SchemaToken(x: Long) extends AnyVal


class SchemaHelper(val queryCache: QueryCache[_,_]) { class SchemaHelper(val queryCache: QueryCache[_,_,_]) {


private val schemaToken = new AtomicLong() private val schemaToken = new AtomicLong()


Expand Down
Expand Up @@ -21,9 +21,11 @@ package org.neo4j.cypher.internal.compatibility


import java.time.Clock import java.time.Clock


import org.neo4j.cypher.internal._
import org.neo4j.cypher.internal.compiler.v3_5.StatsDivergenceCalculator import org.neo4j.cypher.internal.compiler.v3_5.StatsDivergenceCalculator
import org.neo4j.cypher.internal.compiler.v3_5.phases.LogicalPlanState import org.neo4j.cypher.internal.compiler.v3_5.phases.LogicalPlanState
import org.neo4j.cypher.internal._ import org.neo4j.helpers.collection.Pair
import org.neo4j.values.virtual.MapValue


/** /**
* Cache which stores logical plans indexed by an AST statement. * Cache which stores logical plans indexed by an AST statement.
Expand All @@ -36,14 +38,13 @@ import org.neo4j.cypher.internal._
* @tparam STATEMENT Type of AST statement used as key * @tparam STATEMENT Type of AST statement used as key
*/ */
class AstLogicalPlanCache[STATEMENT <: AnyRef](override val maximumSize: Int, class AstLogicalPlanCache[STATEMENT <: AnyRef](override val maximumSize: Int,
override val tracer: CacheTracer[STATEMENT], override val tracer: CacheTracer[Pair[STATEMENT, MapValue]],
clock: Clock, clock: Clock,
divergence: StatsDivergenceCalculator, divergence: StatsDivergenceCalculator,
lastCommittedTxIdProvider: () => Long lastCommittedTxIdProvider: () => Long
) extends QueryCache[STATEMENT, CacheableLogicalPlan](maximumSize, ) extends QueryCache[STATEMENT,Pair[STATEMENT,MapValue], CacheableLogicalPlan](maximumSize,
AstLogicalPlanCache.stalenessCaller(clock, divergence, lastCommittedTxIdProvider), AstLogicalPlanCache.stalenessCaller(clock, divergence, lastCommittedTxIdProvider),
tracer) tracer)

object AstLogicalPlanCache { object AstLogicalPlanCache {
def stalenessCaller(clock: Clock, def stalenessCaller(clock: Clock,
divergence: StatsDivergenceCalculator, divergence: StatsDivergenceCalculator,
Expand Down
Expand Up @@ -26,8 +26,10 @@ import org.neo4j.cypher.internal.compatibility.v3_5.{WrappedMonitors => WrappedM
import org.neo4j.cypher.internal.compiler.v3_5._ import org.neo4j.cypher.internal.compiler.v3_5._
import org.neo4j.cypher.internal.compiler.v3_5.phases.LogicalPlanState import org.neo4j.cypher.internal.compiler.v3_5.phases.LogicalPlanState
import org.neo4j.cypher.internal.planner.v3_5.spi.PlanContext import org.neo4j.cypher.internal.planner.v3_5.spi.PlanContext
import org.neo4j.helpers.collection.Pair
import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors} import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.logging.Log import org.neo4j.logging.Log
import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.frontend.phases._ import org.opencypher.v9_0.frontend.phases._


/** /**
Expand All @@ -47,7 +49,7 @@ abstract class BasePlanner[STATEMENT <: AnyRef, PARSED_STATE <: AnyRef](
protected val logger: InfoLogger = new StringInfoLogger(log) protected val logger: InfoLogger = new StringInfoLogger(log)
protected val monitors: Monitors = WrappedMonitorsv3_5(kernelMonitors) protected val monitors: Monitors = WrappedMonitorsv3_5(kernelMonitors)


protected val cacheTracer: CacheTracer[STATEMENT] = monitors.newMonitor[CacheTracer[STATEMENT]]("cypher3.5") protected val cacheTracer: CacheTracer[Pair[STATEMENT, MapValue]] = monitors.newMonitor[CacheTracer[Pair[STATEMENT, MapValue]]]("cypher3.5")


override def parserCacheSize: Int = config.queryCacheSize override def parserCacheSize: Int = config.queryCacheSize


Expand Down
Expand Up @@ -47,6 +47,7 @@ import org.neo4j.cypher.internal.util.{v3_4 => utilV3_4}
import org.neo4j.cypher.internal.v3_4.expressions.{Expression, Parameter} import org.neo4j.cypher.internal.v3_4.expressions.{Expression, Parameter}
import org.neo4j.cypher.{CypherPlannerOption, CypherUpdateStrategy, CypherVersion} import org.neo4j.cypher.{CypherPlannerOption, CypherUpdateStrategy, CypherVersion}
import org.neo4j.graphdb.Notification import org.neo4j.graphdb.Notification
import org.neo4j.helpers.collection.Pair
import org.neo4j.kernel.impl.query.TransactionalContext import org.neo4j.kernel.impl.query.TransactionalContext
import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors} import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.logging.Log import org.neo4j.logging.Log
Expand Down Expand Up @@ -205,9 +206,10 @@ case class Cypher34Planner(configv3_5: CypherPlannerConfiguration,
CacheableLogicalPlan(logicalPlanStatev3_5, reusabilityState) CacheableLogicalPlan(logicalPlanStatev3_5, reusabilityState)
} }


val params = ValueConversion.asValues(preparedQuery.extractedParams())
val cacheableLogicalPlan = val cacheableLogicalPlan =
if (preParsedQuery.debugOptions.isEmpty) if (preParsedQuery.debugOptions.isEmpty)
planCache.computeIfAbsentOrStale(syntacticQuery.statement(), planCache.computeIfAbsentOrStale(Pair.of(syntacticQuery.statement(), params),
transactionalContext, transactionalContext,
createPlan, createPlan,
syntacticQuery.queryText).executableQuery syntacticQuery.queryText).executableQuery
Expand All @@ -220,7 +222,7 @@ case class Cypher34Planner(configv3_5: CypherPlannerConfiguration,
LogicalPlanResult( LogicalPlanResult(
cacheableLogicalPlan.logicalPlanState, cacheableLogicalPlan.logicalPlanState,
queryParamNames, queryParamNames,
ValueConversion.asValues(preparedQuery.extractedParams()), params,
cacheableLogicalPlan.reusability, cacheableLogicalPlan.reusability,
contextv3_5) contextv3_5)
} }
Expand Down
Expand Up @@ -34,6 +34,7 @@ import org.neo4j.cypher.internal.compiler.v3_5.planner.logical.{CachedMetricsFac
import org.neo4j.cypher.internal.planner.v3_5.spi.{CostBasedPlannerName, DPPlannerName, IDPPlannerName, PlanContext} import org.neo4j.cypher.internal.planner.v3_5.spi.{CostBasedPlannerName, DPPlannerName, IDPPlannerName, PlanContext}
import org.neo4j.cypher.internal.runtime.interpreted._ import org.neo4j.cypher.internal.runtime.interpreted._
import org.neo4j.graphdb.Notification import org.neo4j.graphdb.Notification
import org.neo4j.helpers.collection.Pair
import org.neo4j.kernel.impl.query.TransactionalContext import org.neo4j.kernel.impl.query.TransactionalContext
import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors} import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.logging.Log import org.neo4j.logging.Log
Expand Down Expand Up @@ -152,9 +153,10 @@ case class Cypher35Planner(config: CypherPlannerConfiguration,
CacheableLogicalPlan(logicalPlanState, reusabilityState) CacheableLogicalPlan(logicalPlanState, reusabilityState)
} }


val params= ValueConversion.asValues(preparedQuery.extractedParams())
val cacheableLogicalPlan = val cacheableLogicalPlan =
if (preParsedQuery.debugOptions.isEmpty) if (preParsedQuery.debugOptions.isEmpty)
planCache.computeIfAbsentOrStale(syntacticQuery.statement(), planCache.computeIfAbsentOrStale(Pair.of(syntacticQuery.statement(), params),
transactionalContext, transactionalContext,
createPlan, createPlan,
syntacticQuery.queryText).executableQuery syntacticQuery.queryText).executableQuery
Expand All @@ -164,7 +166,7 @@ case class Cypher35Planner(config: CypherPlannerConfiguration,
LogicalPlanResult( LogicalPlanResult(
cacheableLogicalPlan.logicalPlanState, cacheableLogicalPlan.logicalPlanState,
queryParamNames, queryParamNames,
ValueConversion.asValues(preparedQuery.extractedParams()), params,
cacheableLogicalPlan.reusability, cacheableLogicalPlan.reusability,
context) context)
} }
Expand Down
Expand Up @@ -32,13 +32,13 @@
import java.time.temporal.TemporalAmount; import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit; import java.time.temporal.TemporalUnit;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;


import org.neo4j.graphdb.spatial.CRS; import org.neo4j.graphdb.spatial.CRS;
import org.neo4j.graphdb.spatial.Point; import org.neo4j.graphdb.spatial.Point;
import org.neo4j.values.TernaryComparator; import org.neo4j.values.AnyValue;
import org.neo4j.values.virtual.MapValue;


import static java.lang.String.format; import static java.lang.String.format;
import static org.neo4j.values.storable.DateTimeValue.datetime; import static org.neo4j.values.storable.DateTimeValue.datetime;
Expand Down Expand Up @@ -750,4 +750,23 @@ public static Value maxValue( ValueGroup valueGroup, Value value )
format( "The maxValue for valueGroup %s is not defined yet", valueGroup ) ); format( "The maxValue for valueGroup %s is not defined yet", valueGroup ) );
} }
} }

public static boolean mapsEqualsOnValueTypes( MapValue one, MapValue other )
{
if ( other.size() != one.size() )
{
return false;
}

for ( String key : one.keySet() )
{
AnyValue oneValue = one.get( key );
AnyValue otherValue = other.get( key );
if ( !(oneValue.getClass() == otherValue.getClass()) )
{
return false;
}
}
return true;
}
} }

0 comments on commit bcb2e42

Please sign in to comment.