Skip to content

Commit

Permalink
Large refactor of execution result
Browse files Browse the repository at this point in the history
PLAN DESCRIPTIONS:

Plan descriptions are now generated directly from the logical plan,
instead of being based on the physical operators. This makes the
plan descriptions for all runtimes be the same, appart from any added
arguments.

To enable profiling, runtimes now produce a `QueryProfile` if the plan
is executed with profiling turned on. This query profile is then handed
to the plan description builder to enrich the plan description.

INTERNAL EXECUTION RESULT:

The InternalExecutionResult has been cleaned of test-only code - that
functionality now lives in RewindableExecutionResult. Also, things have
changed so that StandardInternadExecutionResult now is it's on class
responsible for execution meta data and policies around materializing
results early.

ClosingExecutionResult is responsible for all closing of the execution
results due to end-of-rows or exceptions.

Runtimes implement a much slimmer RuntimeResult instead of
InternalExecutionResult. This only holds the primary functionality
related to getting results and profile information. Notably no runtime
is any longer doing any custom closing, results materialization, or
value conversion.
  • Loading branch information
fickludd committed Aug 21, 2018
1 parent 7607af3 commit db865a0
Show file tree
Hide file tree
Showing 123 changed files with 1,947 additions and 2,707 deletions.
Expand Up @@ -134,16 +134,16 @@ trait ExecutionEngineHelper {
case x => ValueUtils.of(x) case x => ValueUtils.of(x)
} }


def execute(q: String, params: (String, Any)*): InternalExecutionResult = def execute(q: String, params: (String, Any)*): RewindableExecutionResult =
RewindableExecutionResult(eengine.execute(q, asMapValue(params.toMap), graph.transactionalContext(query = q -> params.toMap))) RewindableExecutionResult(eengine.execute(q, asMapValue(params.toMap), graph.transactionalContext(query = q -> params.toMap)))


def execute(q: String, params: Map[String, Any]): InternalExecutionResult = def execute(q: String, params: Map[String, Any]): RewindableExecutionResult =
RewindableExecutionResult(eengine.execute(q, asMapValue(params), graph.transactionalContext(query = q -> params.toMap))) RewindableExecutionResult(eengine.execute(q, asMapValue(params), graph.transactionalContext(query = q -> params.toMap)))


def executeOfficial(q: String, params: (String, Any)*): Result = def executeOfficial(q: String, params: (String, Any)*): Result =
eengine.execute(q, asMapValue(params.toMap), graph.transactionalContext(query = q -> params.toMap)) eengine.execute(q, asMapValue(params.toMap), graph.transactionalContext(query = q -> params.toMap))


def profile(q: String, params: (String, Any)*): InternalExecutionResult = def profile(q: String, params: (String, Any)*): RewindableExecutionResult =
RewindableExecutionResult(eengine.profile(q, asMapValue(params.toMap), graph.transactionalContext(query = q -> params.toMap))) RewindableExecutionResult(eengine.profile(q, asMapValue(params.toMap), graph.transactionalContext(query = q -> params.toMap)))


def executeScalar[T](q: String, params: (String, Any)*): T = { def executeScalar[T](q: String, params: (String, Any)*): T = {
Expand All @@ -159,7 +159,7 @@ trait ExecutionEngineHelper {
val value: Any = m.head._2 val value: Any = m.head._2
value.asInstanceOf[T] value.asInstanceOf[T]
} }
case _ => throw new ScalarFailureException(s"expected to get a single row back") case x => throw new ScalarFailureException(s"expected to get a single row back, got: $x")
} }


protected class ScalarFailureException(msg: String) extends RuntimeException(msg) protected class ScalarFailureException(msg: String) extends RuntimeException(msg)
Expand Down
Expand Up @@ -33,13 +33,13 @@ class ExecutionResultTest extends ExecutionEngineFunSuite {
"where id(zero) = 0 AND id(one) = 1 AND id(two) = 2 AND id(three) = 3 AND id(four) = 4 AND id(five) = 5 AND id(six) = 6 AND id(seven) = 7 AND id(eight) = 8 AND id(nine) = 9 " + "where id(zero) = 0 AND id(one) = 1 AND id(two) = 2 AND id(three) = 3 AND id(four) = 4 AND id(five) = 5 AND id(six) = 6 AND id(seven) = 7 AND id(eight) = 8 AND id(nine) = 9 " +
"return zero, one, two, three, four, five, six, seven, eight, nine" "return zero, one, two, three, four, five, six, seven, eight, nine"


val result = execute(q) assert(execute(q).columns === columns)


assert( result.columns === columns )
val regex = "zero.*one.*two.*three.*four.*five.*six.*seven.*eight.*nine" val regex = "zero.*one.*two.*three.*four.*five.*six.*seven.*eight.*nine"
val pattern = Pattern.compile(regex) val pattern = Pattern.compile(regex)


assertTrue( "Columns did not appear in the expected order: \n" + result.dumpToString(), pattern.matcher(result.dumpToString()).find() ) val stringDump = graph.execute(q).resultAsString()
assertTrue( "Columns did not appear in the expected order: \n" + stringDump, pattern.matcher(stringDump).find() )
} }


test("correctLabelStatisticsForCreate") { test("correctLabelStatisticsForCreate") {
Expand Down
Expand Up @@ -60,7 +60,7 @@ class MergeConcurrencyIT extends ExecutionEngineFunSuite {
execute("match (a:Label) with a.id as id, count(*) as c where c > 1 return *") shouldBe empty execute("match (a:Label) with a.id as id, count(*) as c where c > 1 return *") shouldBe empty
execute("match (a)-[r1]->(b)<-[r2]-(a) where r1 <> r2 return *") shouldBe empty execute("match (a)-[r1]->(b)<-[r2]-(a) where r1 <> r2 return *") shouldBe empty


val details = "\n" + execute("match (a)-[r]->(b) return a.id, b.id, id(a), id(r), id(b)").dumpToString() val details = "\n" + graph.execute("match (a)-[r]->(b) return a.id, b.id, id(a), id(r), id(b)").resultAsString()


assert(execute(s"match p=(:Label {id:1})-[*..1000]->({id:$nodeCount}) return 1").size === 1, details) assert(execute(s"match p=(:Label {id:1})-[*..1000]->({id:$nodeCount}) return 1").size === 1, details)
} }
Expand Down

This file was deleted.

Expand Up @@ -160,9 +160,9 @@ class RootPlanAcceptanceTest extends ExecutionEngineFunSuite {
val runtimeString = runtime.map("runtime=" + _.name).getOrElse("") val runtimeString = runtime.map("runtime=" + _.name).getOrElse("")
s"CYPHER $version $plannerString $runtimeString" s"CYPHER $version $plannerString $runtimeString"
} }
val result = profile(s"$prepend $query") val result = executeOfficial(s"$prepend PROFILE $query")
result.dumpToString() result.resultAsString()
result.executionPlanDescription() result.getExecutionPlanDescription()
} }
} }
} }
Expand Up @@ -19,13 +19,13 @@
*/ */
package org.neo4j.cypher package org.neo4j.cypher


import org.neo4j.cypher.internal.runtime.InternalExecutionResult import org.neo4j.cypher.internal.RewindableExecutionResult
import org.opencypher.v9_0.util.test_helpers.{CypherFunSuite, CypherTestSupport} import org.opencypher.v9_0.util.test_helpers.{CypherFunSuite, CypherTestSupport}


trait TxCountsTrackingTestSupport extends CypherTestSupport { trait TxCountsTrackingTestSupport extends CypherTestSupport {
self: CypherFunSuite with GraphDatabaseTestSupport with ExecutionEngineTestSupport => self: CypherFunSuite with GraphDatabaseTestSupport with ExecutionEngineTestSupport =>


def executeAndTrackTxCounts(queryText: String, params: (String, Any)*): (InternalExecutionResult, TxCounts) = { def executeAndTrackTxCounts(queryText: String, params: (String, Any)*): (RewindableExecutionResult, TxCounts) = {
val (result, txCounts) = prepareAndTrackTxCounts(execute(queryText, params: _*)) val (result, txCounts) = prepareAndTrackTxCounts(execute(queryText, params: _*))
(result, txCounts) (result, txCounts)
} }
Expand Down
Expand Up @@ -25,6 +25,12 @@ case class SuboptimalIndexForConstainsQueryNotification(label: String, propertyK


case class SuboptimalIndexForEndsWithQueryNotification(label: String, propertyKeys: Seq[String]) extends InternalNotification case class SuboptimalIndexForEndsWithQueryNotification(label: String, propertyKeys: Seq[String]) extends InternalNotification


case object StartUnavailableFallback extends InternalNotification

case class CreateUniqueUnavailableFallback(position: InputPosition) extends InternalNotification

case object RulePlannerUnavailableFallbackNotification extends InternalNotification

case object PlannerUnsupportedNotification extends InternalNotification case object PlannerUnsupportedNotification extends InternalNotification


case object RuntimeUnsupportedNotification extends InternalNotification case object RuntimeUnsupportedNotification extends InternalNotification
Expand Down
Expand Up @@ -20,14 +20,14 @@
package org.neo4j.cypher.internal.compiler.v3_5 package org.neo4j.cypher.internal.compiler.v3_5


import org.neo4j.cypher.internal.compiler.v3_5.phases._ import org.neo4j.cypher.internal.compiler.v3_5.phases._
import org.opencypher.v9_0.ast._
import org.opencypher.v9_0.frontend.phases.{BaseState, Condition, Phase}
import org.opencypher.v9_0.ast.semantics.{SemanticCheckResult, SemanticState}
import org.opencypher.v9_0.util.attribution.SequentialIdGen
import org.neo4j.cypher.internal.v3_5.logical.plans import org.neo4j.cypher.internal.v3_5.logical.plans
import org.neo4j.cypher.internal.v3_5.logical.plans.{LogicalPlan, ResolvedCall} import org.neo4j.cypher.internal.v3_5.logical.plans.{LogicalPlan, ResolvedCall}
import org.opencypher.v9_0.ast._
import org.opencypher.v9_0.ast.semantics.{SemanticCheckResult, SemanticState}
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer.CompilationPhase import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer.CompilationPhase
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer.CompilationPhase.PIPE_BUILDING import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer.CompilationPhase.PIPE_BUILDING
import org.opencypher.v9_0.frontend.phases.{BaseState, Condition, Phase}
import org.opencypher.v9_0.util.attribution.SequentialIdGen


/** /**
* This planner takes on queries that requires no planning such as procedures and schema commands * This planner takes on queries that requires no planning such as procedures and schema commands
Expand Down
Expand Up @@ -23,26 +23,27 @@ import org.neo4j.cypher.CypherException
import org.neo4j.kernel.impl.query.TransactionalContext import org.neo4j.kernel.impl.query.TransactionalContext
import org.neo4j.values.virtual.MapValue import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer
import org.opencypher.v9_0.util.InternalNotification


/** /**
* Cypher compiler, which compiles pre-parsed queries into executable queries. * Cypher compiler, which compiles pre-parsed queries into executable queries.
*/ */
trait Compiler { trait Compiler {


/** /**
* Compile pre-parsed query into executable query. * Compile [[PreParsedQuery]] into [[ExecutableQuery]].
* *
* @param preParsedQuery pre-parsed query to convert * @param preParsedQuery pre-parsed query to convert
* @param tracer compilation tracer to which events of the compilation process are reported * @param tracer compilation tracer to which events of the compilation process are reported
* @param preParsingNotifications notifications from pre-parsing * @param preParsingNotifications notifications from pre-parsing
* @param transactionalContext transactional context to use during compilation (in logical and physical planning) * @param transactionalContext transactional context to use during compilation (in logical and physical planning)
* @throws CypherException public cypher exceptions on compilation problems * @throws CypherException public cypher exceptions on compilation problems
* @return a compiled and executable query * @return a compiled and executable query
*/ */
@throws[org.neo4j.cypher.CypherException] @throws[org.neo4j.cypher.CypherException]
def compile(preParsedQuery: PreParsedQuery, def compile(preParsedQuery: PreParsedQuery,
tracer: CompilationPhaseTracer, tracer: CompilationPhaseTracer,
preParsingNotifications: Set[org.neo4j.graphdb.Notification], preParsingNotifications: Set[InternalNotification],
transactionalContext: TransactionalContext, transactionalContext: TransactionalContext,
params: MapValue params: MapValue
): ExecutableQuery ): ExecutableQuery
Expand Down
Expand Up @@ -21,17 +21,17 @@ package org.neo4j.cypher.internal


import java.time.Clock import java.time.Clock


import org.neo4j.cypher.internal.compiler.v3_5.StatsDivergenceCalculator import org.neo4j.cypher.internal.compiler.v3_5.{StatsDivergenceCalculator, _}
import org.neo4j.cypher.{InvalidArgumentException, _} import org.neo4j.cypher.{InvalidArgumentException, _}
import org.neo4j.graphdb.impl.notification.NotificationCode._ import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.graphdb.impl.notification.NotificationDetail.Factory.message
import org.neo4j.kernel.GraphDatabaseQueryService import org.neo4j.kernel.GraphDatabaseQueryService
import org.neo4j.kernel.configuration.Config
import org.neo4j.kernel.impl.query.TransactionalContext import org.neo4j.kernel.impl.query.TransactionalContext
import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors} import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.values.virtual.MapValue
import org.neo4j.logging.LogProvider import org.neo4j.logging.LogProvider
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.util.{InputPosition, SyntaxException => InternalSyntaxException} import org.opencypher.v9_0.frontend.phases.{CompilationPhaseTracer, RecordingNotificationLogger}
import org.opencypher.v9_0.util.{DeprecatedStartNotification, SyntaxException => InternalSyntaxException}


object MasterCompiler { object MasterCompiler {
val DEFAULT_QUERY_CACHE_SIZE: Int = 128 val DEFAULT_QUERY_CACHE_SIZE: Int = 128
Expand All @@ -53,6 +53,8 @@ class MasterCompiler(graph: GraphDatabaseQueryService,
logProvider: LogProvider, logProvider: LogProvider,
compilerLibrary: CompilerLibrary) { compilerLibrary: CompilerLibrary) {


import org.neo4j.cypher.internal.MasterCompiler._

/** /**
* Clear all compiler caches. * Clear all compiler caches.
* *
Expand All @@ -76,7 +78,8 @@ class MasterCompiler(graph: GraphDatabaseQueryService,
params: MapValue params: MapValue
): ExecutableQuery = { ): ExecutableQuery = {


var notifications = Set.newBuilder[org.neo4j.graphdb.Notification] val logger = new RecordingNotificationLogger(Some(preParsedQuery.offset))

val supportedRuntimes3_1 = Seq(CypherRuntimeOption.interpreted, CypherRuntimeOption.default) val supportedRuntimes3_1 = Seq(CypherRuntimeOption.interpreted, CypherRuntimeOption.default)
val inputPosition = preParsedQuery.offset val inputPosition = preParsedQuery.offset


Expand All @@ -85,7 +88,7 @@ class MasterCompiler(graph: GraphDatabaseQueryService,
if (config.useErrorsOverWarnings) { if (config.useErrorsOverWarnings) {
throw new InvalidArgumentException("The given query is not currently supported in the selected runtime") throw new InvalidArgumentException("The given query is not currently supported in the selected runtime")
} else { } else {
notifications += runtimeUnsupportedNotification(ex, inputPosition) logger.log(RuntimeUnsupportedNotification)
} }
} }
} }
Expand All @@ -99,7 +102,7 @@ class MasterCompiler(graph: GraphDatabaseQueryService,
def innerCompile(preParsedQuery: PreParsedQuery, params: MapValue): ExecutableQuery = { def innerCompile(preParsedQuery: PreParsedQuery, params: MapValue): ExecutableQuery = {


if ((preParsedQuery.version == CypherVersion.v3_4 || preParsedQuery.version == CypherVersion.v3_5) && preParsedQuery.planner == CypherPlannerOption.rule) { if ((preParsedQuery.version == CypherVersion.v3_4 || preParsedQuery.version == CypherVersion.v3_5) && preParsedQuery.planner == CypherPlannerOption.rule) {
notifications += rulePlannerUnavailableFallbackNotification(preParsedQuery.offset) logger.log(RulePlannerUnavailableFallbackNotification)
innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1), params) innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1), params)


} else if (preParsedQuery.version == CypherVersion.v3_5) { } else if (preParsedQuery.version == CypherVersion.v3_5) {
Expand All @@ -109,18 +112,18 @@ class MasterCompiler(graph: GraphDatabaseQueryService,
preParsedQuery.updateStrategy) preParsedQuery.updateStrategy)


try { try {
compiler3_5.compile(preParsedQuery, tracer, notifications.result(), transactionalContext, params) compiler3_5.compile(preParsedQuery, tracer, logger.notifications, transactionalContext, params)
} catch { } catch {
case ex: SyntaxException if ex.getMessage.startsWith("CREATE UNIQUE") => case ex: SyntaxException if ex.getMessage.startsWith("CREATE UNIQUE") =>
val ex3_5 = ex.getCause.asInstanceOf[InternalSyntaxException] val ex3_5 = ex.getCause.asInstanceOf[InternalSyntaxException]
notifications += createUniqueNotification(ex3_5, inputPosition) logger.log(CreateUniqueUnavailableFallback(ex3_5.pos.get))
assertSupportedRuntime(ex3_5, preParsedQuery.runtime) assertSupportedRuntime(ex3_5, preParsedQuery.runtime)
innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1, runtime = CypherRuntimeOption.interpreted), params) innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1, runtime = CypherRuntimeOption.interpreted), params)


case ex: SyntaxException if ex.getMessage.startsWith("START is deprecated") => case ex: SyntaxException if ex.getMessage.startsWith("START is deprecated") =>
val ex3_5 = ex.getCause.asInstanceOf[InternalSyntaxException] val ex3_5 = ex.getCause.asInstanceOf[InternalSyntaxException]
notifications += createStartUnavailableNotification(ex3_5, inputPosition) logger.log(StartUnavailableFallback)
notifications += createStartDeprecatedNotification(ex3_5, inputPosition) logger.log(DeprecatedStartNotification(inputPosition, ex.getMessage))
assertSupportedRuntime(ex3_5, preParsedQuery.runtime) assertSupportedRuntime(ex3_5, preParsedQuery.runtime)
innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1, runtime = CypherRuntimeOption.interpreted), params) innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1, runtime = CypherRuntimeOption.interpreted), params)
} }
Expand All @@ -132,37 +135,41 @@ class MasterCompiler(graph: GraphDatabaseQueryService,
preParsedQuery.runtime, preParsedQuery.runtime,
preParsedQuery.updateStrategy) preParsedQuery.updateStrategy)


compiler.compile(preParsedQuery, tracer, notifications.result(), transactionalContext, params) compiler.compile(preParsedQuery, tracer, logger.notifications, transactionalContext, params)
} }
} }


// Do the compilation // Do the compilation
innerCompile(preParsedQuery, params) innerCompile(preParsedQuery, params)
} }


private def createStartUnavailableNotification(ex: InternalSyntaxException, inputPosition: InputPosition) = { private def getStatisticsDivergenceCalculator: StatsDivergenceCalculator = {
val pos = convertInputPosition(ex.pos.getOrElse(inputPosition)) val divergenceThreshold = getSetting(graph,
START_UNAVAILABLE_FALLBACK.notification(pos) config => config.get(GraphDatabaseSettings.query_statistics_divergence_threshold).doubleValue(),
DEFAULT_STATISTICS_DIVERGENCE_THRESHOLD)
val targetThreshold = getSetting(graph,
config => config.get(GraphDatabaseSettings.query_statistics_divergence_target).doubleValue(),
DEFAULT_STATISTICS_DIVERGENCE_TARGET)
val minReplanTime = getSetting(graph,
config => config.get(GraphDatabaseSettings.cypher_min_replan_interval).toMillis.longValue(),
DEFAULT_QUERY_PLAN_TTL)
val targetReplanTime = getSetting(graph,
config => config.get(GraphDatabaseSettings.cypher_replan_interval_target).toMillis.longValue(),
DEFAULT_QUERY_PLAN_TARGET)
val divergenceAlgorithm = getSetting(graph,
config => config.get(GraphDatabaseSettings.cypher_replan_algorithm),
DEFAULT_DIVERGENCE_ALGORITHM)
StatsDivergenceCalculator.divergenceCalculatorFor(divergenceAlgorithm, divergenceThreshold, targetThreshold, minReplanTime, targetReplanTime)
} }


private def createStartDeprecatedNotification(ex: InternalSyntaxException, inputPosition: InputPosition) = { private def getNonIndexedLabelWarningThreshold: Long = {
val pos = convertInputPosition(ex.pos.getOrElse(inputPosition)) val setting: (Config) => Long = config => config.get(GraphDatabaseSettings.query_non_indexed_label_warning_threshold).longValue()
START_DEPRECATED.notification(pos, message("START", ex.getMessage)) getSetting(graph, setting, DEFAULT_NON_INDEXED_LABEL_WARNING_THRESHOLD)
} }


private def runtimeUnsupportedNotification(ex: InternalSyntaxException, inputPosition: InputPosition) = { private def getSetting[A](gds: GraphDatabaseQueryService, configLookup: Config => A, default: A): A = gds match {
val pos = convertInputPosition(ex.pos.getOrElse(inputPosition)) // TODO: Cypher should not be pulling out components from casted interfaces, it should ask for Config as a dep
RUNTIME_UNSUPPORTED.notification(pos) case gdbApi:GraphDatabaseQueryService => configLookup(gdbApi.getDependencyResolver.resolveDependency(classOf[Config]))
case _ => default
} }

private def createUniqueNotification(ex: InternalSyntaxException, inputPosition: InputPosition) = {
val pos = convertInputPosition(ex.pos.getOrElse(inputPosition))
CREATE_UNIQUE_UNAVAILABLE_FALLBACK.notification(pos)
}

private def rulePlannerUnavailableFallbackNotification(offset: InputPosition) =
RULE_PLANNER_UNAVAILABLE_FALLBACK.notification(convertInputPosition(offset))

private def convertInputPosition(offset: InputPosition) =
new org.neo4j.graphdb.InputPosition(offset.offset, offset.line, offset.column)
} }
Expand Up @@ -76,7 +76,7 @@ abstract class BasePlanner[STATEMENT <: AnyRef, PARSED_STATE <: AnyRef](
} }


protected def createReusabilityState(logicalPlanState: LogicalPlanState, protected def createReusabilityState(logicalPlanState: LogicalPlanState,
planContext: PlanContext): ReusabilityState = { planContext: PlanContext): ReusabilityState = {


if (ProcedureCallOrSchemaCommandRuntime if (ProcedureCallOrSchemaCommandRuntime
.logicalToExecutable .logicalToExecutable
Expand Down

0 comments on commit db865a0

Please sign in to comment.