Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/3.3' into 3.4
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Nov 23, 2017
2 parents fe50754 + f2f9a3c commit 5c6d92c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.neo4j.cypher.internal.runtime.interpreted.UpdateCountingQueryContext
import org.neo4j.cypher.internal.runtime.interpreted.commands.convert.{CommunityExpressionConverter, ExpressionConverters}
import org.neo4j.cypher.internal.runtime.interpreted.pipes.Pipe
import org.neo4j.cypher.internal.runtime.{ExecutionMode, InternalExecutionResult, ProfileMode, QueryContext}
import org.neo4j.cypher.internal.v3_4.logical.plans.IndexUsage
import org.neo4j.cypher.internal.v3_4.logical.plans.{IndexUsage, LogicalPlan}
import org.neo4j.values.virtual.MapValue

object BuildInterpretedExecutionPlan extends Phase[CommunityRuntimeContext, LogicalPlanState, CompilationState] {
Expand All @@ -56,24 +56,14 @@ object BuildInterpretedExecutionPlan extends Phase[CommunityRuntimeContext, Logi
val resultBuilderFactory = DefaultExecutionResultBuilderFactory(pipeInfo, columns, logicalPlan)
val func = getExecutionPlanFunction(periodicCommitInfo, from.queryText, updating, resultBuilderFactory,
context.notificationLogger, InterpretedRuntimeName)
val execPlan = new ExecutionPlan {
private val fingerprint = context.createFingerprintReference(fp)

override def run(queryContext: QueryContext, planType: ExecutionMode, params: MapValue): InternalExecutionResult =
func(queryContext, planType, params)

override def isPeriodicCommit: Boolean = periodicCommitInfo.isDefined

override def plannerUsed: PlannerName = planner

override def isStale(lastTxId: () => Long, statistics: GraphStatistics): Boolean = fingerprint.isStale(lastTxId, statistics)

override def runtimeUsed = InterpretedRuntimeName

override def notifications(planContext: PlanContext): Seq[InternalNotification] = checkForNotifications(pipe, planContext, context.config)

override def plannedIndexUsage: Seq[IndexUsage] = logicalPlan.indexUsage
}
val execPlan: ExecutionPlan = new InterpretedExecutionPlan(func,
logicalPlan,
pipe,
periodicCommitInfo.isDefined,
planner,
context.createFingerprintReference(fp),
context.config)

new CompilationState(from, Some(execPlan))
}
Expand Down Expand Up @@ -111,4 +101,28 @@ object BuildInterpretedExecutionPlan extends Phase[CommunityRuntimeContext, Logi

builder.build(queryId, planType, params, notificationLogger, runtimeName)
}

/**
* Executable plan for a single cypher query. Warning, this class will get cached! Do not leak transaction objects
* or other resources in here.
*/
class InterpretedExecutionPlan(val executionPlanFunc: (QueryContext, ExecutionMode, MapValue) => InternalExecutionResult,
val logicalPlan: LogicalPlan,
val pipe: Pipe,
override val isPeriodicCommit: Boolean,
override val plannerUsed: PlannerName,
val fingerprint: PlanFingerprintReference,
val config: CypherCompilerConfiguration) extends ExecutionPlan {

override def run(queryContext: QueryContext, planType: ExecutionMode, params: MapValue): InternalExecutionResult =
executionPlanFunc(queryContext, planType, params)

override def isStale(lastTxId: () => Long, statistics: GraphStatistics): Boolean = fingerprint.isStale(lastTxId, statistics)

override def runtimeUsed: RuntimeName = InterpretedRuntimeName

override def notifications(planContext: PlanContext): Seq[InternalNotification] = checkForNotifications(pipe, planContext, config)

override def plannedIndexUsage: Seq[IndexUsage] = logicalPlan.indexUsage
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.neo4j.cypher.internal.runtime.planDescription.InternalPlanDescription
import org.neo4j.cypher.internal.util.v3_4.TaskCloser
import org.neo4j.cypher.internal.v3_4.codegen.profiling.ProfilingTracer
import org.neo4j.cypher.internal.v3_4.logical.plans.IndexUsage
import org.neo4j.graphdb.Notification
import org.neo4j.values.virtual.MapValue

object BuildCompiledExecutionPlan extends Phase[EnterpriseRuntimeContext, LogicalPlanState, CompilationState] {
Expand All @@ -46,14 +47,17 @@ object BuildCompiledExecutionPlan extends Phase[EnterpriseRuntimeContext, Logica

override def description = "creates runnable byte code"

override def postConditions = Set.empty// Can't yet guarantee that we can build an execution plan
override def postConditions = Set.empty // Can't yet guarantee that we can build an execution plan

override def process(from: LogicalPlanState, context: EnterpriseRuntimeContext): CompilationState = {
val runtimeSuccessRateMonitor = context.monitors.newMonitor[NewRuntimeSuccessRateMonitor]()
try {
val codeGen = new CodeGenerator(context.codeStructure, context.clock, CodeGenConfiguration(context.debugOptions))
val compiled: CompiledPlan = codeGen.generate(from.logicalPlan, context.planContext, from.semanticTable(), from.plannerName)
val executionPlan: ExecutionPlan = createExecutionPlan(context, compiled)
val executionPlan: ExecutionPlan =
new CompiledExecutionPlan(compiled,
context.createFingerprintReference(compiled.fingerprint),
notifications(context))
runtimeSuccessRateMonitor.newPlanSeen(from.logicalPlan)
new CompilationState(from, Some(executionPlan))
} catch {
Expand All @@ -63,10 +67,37 @@ object BuildCompiledExecutionPlan extends Phase[EnterpriseRuntimeContext, Logica
}
}

private def createExecutionPlan(context: EnterpriseRuntimeContext, compiled: CompiledPlan) = new ExecutionPlan {
private val fingerprint = context.createFingerprintReference(compiled.fingerprint)
private def notifications(context: EnterpriseRuntimeContext): Set[Notification] = {
val mapper = asKernelNotification(context.notificationLogger.offset) _
context.notificationLogger.notifications.map(mapper)
}
private def createTracer(mode: ExecutionMode, queryContext: QueryContext): DescriptionProvider = mode match {
case ProfileMode =>
val tracer = new ProfilingTracer(queryContext.transactionalContext.kernelStatisticProvider)
(description: Provider[InternalPlanDescription]) =>
(new Provider[InternalPlanDescription] {

override def isStale(lastTxId: () => Long, statistics: GraphStatistics): Boolean = fingerprint.isStale(lastTxId, statistics)
override def get(): InternalPlanDescription = description.get().map {
plan: InternalPlanDescription =>
val data = tracer.get(plan.id)
plan.
addArgument(Arguments.DbHits(data.dbHits())).
addArgument(Arguments.PageCacheHits(data.pageCacheHits())).
addArgument(Arguments.PageCacheMisses(data.pageCacheMisses())).
addArgument(Arguments.PageCacheHitRatio(data.pageCacheHitRatio())).
addArgument(Arguments.Rows(data.rows())).
addArgument(Arguments.Time(data.time()))
}
}, Some(tracer))
case _ => (description: Provider[InternalPlanDescription]) => (description, None)
}

/**
* Execution plan for compiled runtime. Beware: will be cached.
*/
class CompiledExecutionPlan(val compiled: CompiledPlan,
val fingerprint: PlanFingerprintReference,
val notifications: Set[Notification]) extends ExecutionPlan {

override def run(queryContext: QueryContext,
executionMode: ExecutionMode, params: MapValue): InternalExecutionResult = {
Expand All @@ -76,48 +107,26 @@ object BuildCompiledExecutionPlan extends Phase[EnterpriseRuntimeContext, Logica
if (executionMode == ExplainMode) {
//close all statements
taskCloser.close(success = true)
val logger = context.notificationLogger
ExplainExecutionResult(compiled.columns.toArray,
compiled.planDescription.get(), READ_ONLY, logger.notifications.map(asKernelNotification(logger.offset)))
ExplainExecutionResult(compiled.columns.toArray, compiled.planDescription.get(), READ_ONLY, notifications)
} else
compiled.executionResultBuilder(queryContext, executionMode, createTracer(executionMode, queryContext),
params, taskCloser)
compiled.executionResultBuilder(queryContext, executionMode, createTracer(executionMode, queryContext), params, taskCloser)
} catch {
case (t: Throwable) =>
taskCloser.close(success = false)
throw t
}
}

override def plannerUsed: PlannerName = compiled.plannerUsed

override def isPeriodicCommit: Boolean = compiled.periodicCommit.isDefined
override def isStale(lastTxId: () => Long, statistics: GraphStatistics): Boolean = fingerprint.isStale(lastTxId, statistics)

override def runtimeUsed = CompiledRuntimeName
override def runtimeUsed: RuntimeName = CompiledRuntimeName

override def notifications(planContext: PlanContext): Seq[InternalNotification] = Seq.empty

override def plannedIndexUsage: Seq[IndexUsage] = compiled.plannedIndexUsage
}

private def createTracer(mode: ExecutionMode, queryContext: QueryContext): DescriptionProvider = mode match {
case ProfileMode =>
val tracer = new ProfilingTracer(queryContext.transactionalContext.kernelStatisticProvider)
(description: Provider[InternalPlanDescription]) =>
(new Provider[InternalPlanDescription] {
override def isPeriodicCommit: Boolean = compiled.periodicCommit.isDefined

override def get(): InternalPlanDescription = description.get().map {
plan: InternalPlanDescription =>
val data = tracer.get(plan.id)
plan.
addArgument(Arguments.DbHits(data.dbHits())).
addArgument(Arguments.PageCacheHits(data.pageCacheHits())).
addArgument(Arguments.PageCacheMisses(data.pageCacheMisses())).
addArgument(Arguments.PageCacheHitRatio(data.pageCacheHitRatio())).
addArgument(Arguments.Rows(data.rows())).
addArgument(Arguments.Time(data.time()))
}
}, Some(tracer))
case _ => (description: Provider[InternalPlanDescription]) => (description, None)
override def plannerUsed: PlannerName = compiled.plannerUsed
}
}

0 comments on commit 5c6d92c

Please sign in to comment.