Skip to content

Commit

Permalink
Merge pull request #5968 from pontusmelke/3.0-eager-switch
Browse files Browse the repository at this point in the history
switch for running queries eagerly
  • Loading branch information
systay committed Dec 1, 2015
2 parents d1b13d2 + 9a8d278 commit ee5454f
Show file tree
Hide file tree
Showing 34 changed files with 246 additions and 189 deletions.
Expand Up @@ -63,31 +63,31 @@ class PreParsingAcceptanceTest extends ExecutionEngineFunSuite {
}

test("specifying cost planner should provide IDP using old syntax") {
val query = "PROFILE CYPHER planner cost RETURN 1"
val query = "PROFILE CYPHER planner=cost RETURN 1"

eengine.execute(query) should havePlanner(IDPPlannerName)
}

test("specifying idp planner should provide IDP using old syntax") {
val query = "PROFILE CYPHER planner idp RETURN 1"
val query = "PROFILE CYPHER planner=idp RETURN 1"

eengine.execute(query) should havePlanner(IDPPlannerName)
}

test("specifying greedy planner should provide greedy using old syntax") {
val query = "PROFILE CYPHER planner greedy RETURN 1"
val query = "PROFILE CYPHER planner=greedy RETURN 1"

eengine.execute(query) should havePlanner(GreedyPlannerName)
}

test("specifying dp planner should provide DP using old syntax") {
val query = "PROFILE CYPHER planner dp RETURN 1"
val query = "PROFILE CYPHER planner=dp RETURN 1"

eengine.execute(query) should havePlanner(DPPlannerName)
}

test("specifying rule planner should provide RULE using old syntax") {
val query = "PROFILE CYPHER planner rule RETURN 1"
val query = "PROFILE CYPHER planner=rule RETURN 1"

eengine.execute(query) should havePlanner(RulePlannerName)
}
Expand Down
Expand Up @@ -72,7 +72,8 @@ object CypherCompilerFactory {
logger: InfoLogger,
rewriterSequencer: (String) => RewriterStepSequencer,
plannerName: Option[CostBasedPlannerName],
runtimeName: Option[RuntimeName]): CypherCompiler = {
runtimeName: Option[RuntimeName],
updateStrategy: Option[UpdateStrategy]): CypherCompiler = {
val parser = new CypherParser
val checker = new SemanticChecker
val rewriter = new ASTRewriter(rewriterSequencer)
Expand All @@ -94,7 +95,8 @@ object CypherCompilerFactory {
plannerName = plannerName,
runtimeBuilder = runtimeBuilder,
semanticChecker = checker,
useErrorsOverWarnings = config.useErrorsOverWarnings
useErrorsOverWarnings = config.useErrorsOverWarnings,
updateStrategy = updateStrategy
)
val rulePlanProducer = new LegacyExecutablePlanBuilder(monitors, rewriterSequencer)

Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.compiler.v3_0


sealed abstract class UpdateStrategy {
def name: String
def toTextOutput: String = name
def alwaysEager: Boolean
}

case object eagerUpdateStrategy extends UpdateStrategy {
override val name = "EAGER"

override def alwaysEager = true
}

case object defaultUpdateStrategy extends UpdateStrategy {
override val name = "DEFAULT"

override def alwaysEager = false
}

object UpdateStrategy {

def apply(name: String): UpdateStrategy = name.toUpperCase match {
case eagerUpdateStrategy.name => eagerUpdateStrategy
case defaultUpdateStrategy.name => defaultUpdateStrategy

case n => throw new IllegalArgumentException(
s"$n is not a valid update strategy, valid options are ${defaultUpdateStrategy.name} and ${eagerUpdateStrategy.name}")
}
}

This file was deleted.

Expand Up @@ -46,6 +46,7 @@ case class CostBasedExecutablePlanBuilder(monitors: Monitors,
semanticChecker: SemanticChecker,
plannerName: CostBasedPlannerName,
runtimeBuilder: RuntimeBuilder,
updateStrategy: UpdateStrategy,
useErrorsOverWarnings: Boolean)
extends ExecutablePlanBuilder {

Expand Down Expand Up @@ -83,7 +84,10 @@ case class CostBasedExecutablePlanBuilder(monitors: Monitors,
val unionQuery = toUnionQuery(ast, semanticTable)
val metrics = metricsFactory.newMetrics(planContext.statistics)
val logicalPlanProducer = LogicalPlanProducer(metrics.cardinality)
val context: LogicalPlanningContext = LogicalPlanningContext(planContext, logicalPlanProducer, metrics, semanticTable, queryGraphSolver, notificationLogger = notificationLogger, useErrorsOverWarnings = useErrorsOverWarnings, config = QueryPlannerConfiguration.default)

val context = LogicalPlanningContext(planContext, logicalPlanProducer, metrics, semanticTable,
queryGraphSolver, notificationLogger = notificationLogger, useErrorsOverWarnings = useErrorsOverWarnings,
config = QueryPlannerConfiguration.default.withUpdateStrategy(updateStrategy))

val plan = queryPlanner.plan(unionQuery)(context)

Expand Down Expand Up @@ -115,7 +119,6 @@ object CostBasedExecutablePlanBuilder {
CNFNormalizer()(monitor)
)

// val namespacedSemanticTable = namespacer.tableRewriter(semanticTable)
val state = semanticChecker.check(namespacedStatement.toString, namespacedStatement, mkException = (msg, pos) => throw new InternalException(s"Unexpected error during late semantic checking: $msg"))
val table = semanticTable.copy(types = state.typeTable, recordedScopes = state.recordedScopes)

Expand Down
Expand Up @@ -35,7 +35,8 @@ object CostBasedPipeBuilderFactory {
tokenResolver: SimpleTokenResolver = new SimpleTokenResolver(),
plannerName: Option[CostBasedPlannerName],
runtimeBuilder: RuntimeBuilder,
useErrorsOverWarnings: Boolean
useErrorsOverWarnings: Boolean,
updateStrategy: Option[UpdateStrategy]
) = {

def createQueryGraphSolver(n: CostBasedPlannerName): QueryGraphSolver = n match {
Expand All @@ -52,6 +53,10 @@ object CostBasedPipeBuilderFactory {
}

val actualPlannerName = plannerName.getOrElse(CostBasedPlannerName.default)
CostBasedExecutablePlanBuilder(monitors, metricsFactory, tokenResolver, queryPlanner, createQueryGraphSolver(actualPlannerName), rewriterSequencer, semanticChecker, actualPlannerName, runtimeBuilder, useErrorsOverWarnings)
val actualUpdateStrategy = updateStrategy.getOrElse(defaultUpdateStrategy)
CostBasedExecutablePlanBuilder(monitors, metricsFactory, tokenResolver, queryPlanner,
createQueryGraphSolver(actualPlannerName), rewriterSequencer, semanticChecker, actualPlannerName, runtimeBuilder,
actualUpdateStrategy,
useErrorsOverWarnings)
}
}
Expand Up @@ -93,6 +93,11 @@ case class UpdateGraph(mutatingPatterns: Seq[MutatingPattern] = Seq.empty) {
def updatesNodes: Boolean = createNodePatterns.nonEmpty || removeLabelPatterns.nonEmpty ||
mergeNodePatterns.nonEmpty || setLabelPatterns.nonEmpty || setNodePropertyPatterns.nonEmpty

/*
* Does this UpdateGraph contains merges
*/
def containsMerge: Boolean = mergeNodePatterns.nonEmpty

/*
* Checks if there is overlap between what's being read in the query graph
* and what is being written here
Expand Down
Expand Up @@ -367,12 +367,7 @@ case class ActualPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe)
case OuterHashJoin(nodes, l, r) =>
NodeOuterHashJoinPipe(nodes.map(_.name), lhs, rhs, (r.availableSymbols -- l.availableSymbols).map(_.name))()

case Apply(_, _) =>
lhs match {
case EagerPipe(inner) =>
EagerApplyPipe(inner, rhs)()
case _ => ApplyPipe(lhs, rhs)()
}
case Apply(_, _) => ApplyPipe(lhs, rhs)()

case AssertSameNode(node, _, _) =>
AssertSameNodePipe(lhs, rhs, node.name)()
Expand Down
Expand Up @@ -36,14 +36,16 @@ case class PlanSingleQuery(planPart: (PlannerQuery, LogicalPlanningContext, Opti

override def apply(in: PlannerQuery)(implicit context: LogicalPlanningContext): LogicalPlan = {
val partPlan = countStorePlanner(in).getOrElse(planPart(in, context, None))
//always use eager if configured to do so
val alwaysEager = context.config.updateStrategy.alwaysEager

val planWithEffect =
if (conflicts(partPlan, in))
if (alwaysEager || conflicts(partPlan, in))
context.logicalPlanProducer.planEager(partPlan)
else partPlan
val planWithUpdates = planUpdates(in, planWithEffect)(context)
val planWithUpdatesAndEffects =
if (in.updateGraph.mergeNodeDeleteOverlap)
if (alwaysEager || in.updateGraph.mergeNodeDeleteOverlap)
context.logicalPlanProducer.planEager(planWithUpdates)
else planWithUpdates

Expand Down
Expand Up @@ -69,10 +69,11 @@ case class PlanWithTail(expressionRewriterFactory: (LogicalPlanningContext => Re
case Some(query) =>
val lhsContext = context.recurse(lhs)
val partPlan = planPart(query, lhsContext, Some(context.logicalPlanProducer.planQueryArgumentRow(query.queryGraph)))

///use eager if configured to do so
val alwaysEager = context.config.updateStrategy.alwaysEager
//If reads interfere with writes, make it a RepeatableRead
val planWithEffects =
if (query.updateGraph overlaps query.queryGraph)
if (alwaysEager || (query.updateGraph overlaps query.queryGraph))
context.logicalPlanProducer.planRepeatableRead(partPlan)
else partPlan

Expand All @@ -81,7 +82,8 @@ case class PlanWithTail(expressionRewriterFactory: (LogicalPlanningContext => Re
//If previous update interferes with any of the reads here or in tail, make it an EagerApply
val applyPlan = {
val lastPlannerQuery = lhs.solved.last
val newLhs = if (!lastPlannerQuery.writeOnly && query.allQueryGraphs.exists(lastPlannerQuery.updateGraph.overlaps))
val newLhs = if (alwaysEager ||
(!lastPlannerQuery.writeOnly && query.allQueryGraphs.exists(lastPlannerQuery.updateGraph.overlaps)))
context.logicalPlanProducer.planEager(lhs)
else lhs

Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.cypher.internal.compiler.v3_0.planner.logical

import org.neo4j.cypher.internal.compiler.v3_0.{defaultUpdateStrategy, UpdateStrategy}
import org.neo4j.cypher.internal.compiler.v3_0.planner.QueryGraph
import org.neo4j.cypher.internal.compiler.v3_0.planner.logical.greedy.projectEndpoints
import org.neo4j.cypher.internal.compiler.v3_0.planner.logical.plans.LogicalPlan
Expand Down Expand Up @@ -62,15 +63,17 @@ object QueryPlannerConfiguration {

// Legacy indices
legacyHintLeafPlanner
)
),
updateStrategy = defaultUpdateStrategy
)
}

case class QueryPlannerConfiguration(leafPlanners: LeafPlannerIterable,
applySelections: PlanTransformer[QueryGraph],
projectAllEndpoints: PlanTransformer[QueryGraph],
optionalSolvers: Seq[OptionalSolver],
pickBestCandidate: LogicalPlanningFunction0[CandidateSelector]) {
pickBestCandidate: LogicalPlanningFunction0[CandidateSelector],
updateStrategy: UpdateStrategy) {

def toKit()(implicit context: LogicalPlanningContext): QueryPlannerKit =
QueryPlannerKit(
Expand All @@ -80,6 +83,8 @@ case class QueryPlannerConfiguration(leafPlanners: LeafPlannerIterable,
)

def withLeafPlanners(leafPlanners: LeafPlannerIterable) = copy(leafPlanners = leafPlanners)

def withUpdateStrategy(updateStrategy: UpdateStrategy) = copy(updateStrategy = updateStrategy)
}

case class QueryPlannerKit(select: (LogicalPlan, QueryGraph) => LogicalPlan,
Expand Down
Expand Up @@ -169,7 +169,8 @@ trait LogicalPlanningTestSupport extends CypherTestSupport with AstConstructionT
plannerName = None,
runtimeBuilder = InterpretedRuntimeBuilder(InterpretedPlanBuilder(Clock.SYSTEM_CLOCK, monitors)),
semanticChecker = semanticChecker,
useErrorsOverWarnings = false)
useErrorsOverWarnings = false,
updateStrategy = None)
}

def produceLogicalPlan(queryText: String)(implicit planner: CostBasedExecutablePlanBuilder, planContext: PlanContext): LogicalPlan = {
Expand Down
Expand Up @@ -157,14 +157,16 @@ trait LogicalPlanningTestSupport2 extends CypherTestSupport with AstConstruction
}

def planFor(queryString: String): SemanticPlan = {

val parsedStatement = parser.parse(queryString)
val mkException = new SyntaxExceptionCreator(queryString, Some(pos))
val cleanedStatement: Statement = parsedStatement.endoRewrite(inSequence(normalizeReturnClauses(mkException), normalizeWithClauses(mkException)))
val semanticState = semanticChecker.check(queryString, cleanedStatement, mkException)
val (rewrittenStatement, _, postConditions) = astRewriter.rewrite(queryString, cleanedStatement, semanticState)
val postRewriteSemanticState = semanticChecker.check(queryString, rewrittenStatement, mkException)
val semanticTable = SemanticTable(types = postRewriteSemanticState.typeTable)
CostBasedExecutablePlanBuilder.rewriteStatement(rewrittenStatement, postRewriteSemanticState.scopeTree, semanticTable, rewriterSequencer, semanticChecker, postConditions, mock[AstRewritingMonitor]) match {
CostBasedExecutablePlanBuilder.rewriteStatement(rewrittenStatement, postRewriteSemanticState.scopeTree,
semanticTable, rewriterSequencer, semanticChecker, postConditions, mock[AstRewritingMonitor]) match {
case (ast: Query, newTable) =>
tokenResolver.resolve(ast)(newTable, planContext)
val unionQuery = toUnionQuery(ast, semanticTable)
Expand Down
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher


sealed abstract class CypherUpdateStrategy(plannerName: String) extends CypherOption(plannerName)

case object CypherUpdateStrategy extends CypherOptionCompanion[CypherUpdateStrategy] {

case object default extends CypherUpdateStrategy("default")
case object eager extends CypherUpdateStrategy("eager")

val all: Set[CypherUpdateStrategy] = Set(default, eager)
}

0 comments on commit ee5454f

Please sign in to comment.