Skip to content

Commit

Permalink
Remove all usages of Id and replace them with LogicalPlanId
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Sep 15, 2017
1 parent 87fd0cc commit 119120e
Show file tree
Hide file tree
Showing 134 changed files with 537 additions and 640 deletions.
Expand Up @@ -57,6 +57,7 @@ trait LogicalPlanRewriter extends Phase[CompilerContext, LogicalPlanState, Logic

override def process(from: LogicalPlanState, context: CompilerContext): LogicalPlanState = {
val rewritten = from.logicalPlan.endoRewrite(instance(context))
rewritten.assignIds() // This should be the only place where ids are assigned.
from.copy(maybeLogicalPlan = Some(rewritten))
}
}
Expand Up @@ -19,10 +19,12 @@
*/
package org.neo4j.cypher.internal.compiler.v3_3.planner.logical.plans

import org.neo4j.cypher.internal.compiler.v3_3.ast.NestedPlanExpression
import org.neo4j.cypher.internal.compiler.v3_3.planner.LogicalPlanningTestSupport2
import org.neo4j.cypher.internal.frontend.v3_3.test_helpers.CypherFunSuite
import org.neo4j.cypher.internal.frontend.v3_3.{CypherException, Rewriter, topDown}
import org.neo4j.cypher.internal.v3_3.logical.plans.{Apply, Projection, SingleRow}
import org.neo4j.cypher.internal.ir.v3_3.IdName
import org.neo4j.cypher.internal.v3_3.logical.plans._

class LogicalPlanAssignedIdTest extends CypherFunSuite with LogicalPlanningTestSupport2 {
test("assignedId survives rewriting") {
Expand Down Expand Up @@ -63,13 +65,13 @@ class LogicalPlanAssignedIdTest extends CypherFunSuite with LogicalPlanningTestS

applyAll.assignIds()

applyAll.assignedId.underlying should equal(6)
applyA.assignedId.underlying should equal(5)
sr1A.assignedId.underlying should equal(4)
applyAll.assignedId.underlying should equal(0)
applyA.assignedId.underlying should equal(1)
sr1A.assignedId.underlying should equal(2)
sr2A.assignedId.underlying should equal(3)
applyB.assignedId.underlying should equal(2)
sr1B.assignedId.underlying should equal(1)
sr2B.assignedId.underlying should equal(0)
applyB.assignedId.underlying should equal(4)
sr1B.assignedId.underlying should equal(5)
sr2B.assignedId.underlying should equal(6)
}

test("cant assign ids twice") {
Expand All @@ -79,4 +81,14 @@ class LogicalPlanAssignedIdTest extends CypherFunSuite with LogicalPlanningTestS
intercept[CypherException](pr.assignIds())
}

test("can assign inside expressions as well") {
val singleRow = SingleRow()(solved)
val inner = AllNodesScan(IdName("x"), Set.empty)(solved)
val filter = Selection(Seq(NestedPlanExpression(inner, literalInt(42))(pos)), singleRow)(solved)

filter.assignIds()

val x = inner.assignedId // should not fail
}

}
Expand Up @@ -45,34 +45,29 @@ abstract class LogicalPlan
def solved: PlannerQuery with CardinalityEstimation
def availableSymbols: Set[IdName]

/*
A id for the logical plan operator, unique inside of the given query tree. These identifiers will be
copied to a rewritten version of the logical plan, as long as there is a one-to-one mapping between
rewritten plans. In other words - once ids have been assigned, plan rewriting should not collapse multiple
operators into one, or split a single one into multiple new ones.
*/
def assignedId: LogicalPlanId = _id.getOrElse(throw new InternalException("Plan has not had an id assigned yet"))

def assignIds(): Unit = {
if(_id.nonEmpty)
if (_id.nonEmpty)
throw new InternalException("Id has already been assigned")

val builder = new IdAssigner
builder.assignIds()
var count = 0
val plans = this.findByAllClass[LogicalPlan]
plans.foreach { lp =>
lp._id = Some(new LogicalPlanId(count))
count = count + 1
}
assignedId
}

private var _id: Option[LogicalPlanId] = None

private class IdAssigner extends TreeBuilder[Int] {
def assignIds() = create(self)

private var count = 0

override protected def build(plan: LogicalPlan) = {
plan._id = Some(new LogicalPlanId(count))
count = count + 1
count
}

override protected def build(plan: LogicalPlan, source: Int) = build(plan)

override protected def build(plan: LogicalPlan, lhs: Int, rhs: Int) = build(plan)
}

override def rememberMe(old: AnyRef): Unit = _id = old.asInstanceOf[LogicalPlan]._id

def leaves: Seq[LogicalPlan] = this.treeFold(Seq.empty[LogicalPlan]) {
Expand Down Expand Up @@ -207,6 +202,16 @@ final case class SchemaIndexScanUsage(identifier: String, labelId : Int, label:
final case class ExplicitNodeIndexUsage(identifier: String, index: String) extends IndexUsage
final case class ExplicitRelationshipIndexUsage(identifier: String, index: String) extends IndexUsage

object LogicalPlanId {
// This is probably a safe way of assigning ids, but should only be used in tests
private var counter = 0
def DEFAULT: LogicalPlanId = {
val id = new LogicalPlanId(counter)
counter += 1
id
}
}

class LogicalPlanId(val underlying: Int) extends AnyVal {
def ++ : LogicalPlanId = new LogicalPlanId(underlying + 1)
}
Expand Up @@ -24,7 +24,6 @@ import org.neo4j.cypher.internal.compatibility.v3_3.runtime.commands.convert.{Co
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.executionplan._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.phases.CompilationState
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.pipes.Pipe
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.LogicalPlanIdentificationBuilder
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.profiler.Profiler
import org.neo4j.cypher.internal.compiler.v3_3.CypherCompilerConfiguration
import org.neo4j.cypher.internal.compiler.v3_3.phases._
Expand All @@ -46,15 +45,14 @@ object BuildInterpretedExecutionPlan extends Phase[CommunityRuntimeContext, Logi

override def process(from: LogicalPlanState, context: CommunityRuntimeContext): CompilationState = {
val logicalPlan = from.logicalPlan
val idMap = LogicalPlanIdentificationBuilder(logicalPlan)
val converters = new ExpressionConverters(CommunityExpressionConverter)
val executionPlanBuilder = new PipeExecutionPlanBuilder(context.clock, context.monitors,
expressionConverters = converters, pipeBuilderFactory = CommunityPipeBuilderFactory)
val pipeBuildContext = PipeExecutionBuilderContext(context.metrics.cardinality, from.semanticTable(), from.plannerName)
val pipeInfo = executionPlanBuilder.build(from.periodicCommit, logicalPlan, idMap)(pipeBuildContext, context.planContext)
val pipeInfo = executionPlanBuilder.build(from.periodicCommit, logicalPlan)(pipeBuildContext, context.planContext)
val PipeInfo(pipe, updating, periodicCommitInfo, fp, planner) = pipeInfo
val columns = from.statement().returnColumns
val resultBuilderFactory = DefaultExecutionResultBuilderFactory(pipeInfo, columns, logicalPlan, idMap)
val resultBuilderFactory = DefaultExecutionResultBuilderFactory(pipeInfo, columns, logicalPlan)
val func = getExecutionPlanFunction(periodicCommitInfo, from.queryText, updating, resultBuilderFactory,
context.notificationLogger, InterpretedRuntimeName)
val execPlan = new ExecutionPlan {
Expand Down
Expand Up @@ -21,12 +21,11 @@ package org.neo4j.cypher.internal.compatibility.v3_3.runtime

import org.neo4j.cypher.internal.compatibility.v3_3.runtime.commands.convert.ExpressionConverters
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.commands.convert.PatternConverters._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.commands.expressions.{AggregationExpression, Literal, Expression => CommandExpression}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.commands.expressions.{AggregationExpression, Literal}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.commands.predicates.{Predicate, True}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.executionplan._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.executionplan.builders.prepare.KeyTokenResolver
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.pipes._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.Id
import org.neo4j.cypher.internal.compiler.v3_3.spi.PlanContext
import org.neo4j.cypher.internal.frontend.v3_3.ast._
import org.neo4j.cypher.internal.frontend.v3_3.helpers.Eagerly
Expand All @@ -43,7 +42,7 @@ import org.neo4j.values.virtual.{EdgeValue, NodeValue}
* When adding new Pipes and LogicalPlans, this is where you should be looking.
*/
case class CommunityPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe, readOnly: Boolean,
idMap: Map[LogicalPlan, Id], expressionConverters: ExpressionConverters,
expressionConverters: ExpressionConverters,
rewriteAstExpression: (frontEndAst.Expression) => frontEndAst.Expression)
(implicit context: PipeExecutionBuilderContext, planContext: PlanContext) extends PipeBuilder {

Expand All @@ -55,7 +54,7 @@ case class CommunityPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe


def build(plan: LogicalPlan): Pipe = {
val id = idMap.getOrElse(plan, new Id)
val id = plan.assignedId
plan match {
case SingleRow() =>
SingleRowPipe()(id)
Expand Down Expand Up @@ -104,7 +103,7 @@ case class CommunityPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe
}

def build(plan: LogicalPlan, source: Pipe): Pipe = {
val id = idMap.getOrElse(plan, new Id)
val id = plan.assignedId
plan match {
case Projection(_, expressions) =>
ProjectionPipe(source, Eagerly.immutableMapValues(expressions, buildExpression))(id = id)
Expand Down Expand Up @@ -337,7 +336,7 @@ case class CommunityPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe
}

def build(plan: LogicalPlan, lhs: Pipe, rhs: Pipe): Pipe = {
val id = idMap.getOrElse(plan, new Id)
val id = plan.assignedId
plan match {
case CartesianProduct(_, _) =>
CartesianProductPipe(lhs, rhs)(id = id)
Expand Down Expand Up @@ -403,7 +402,6 @@ case class CommunityPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe
}
}

private val resolver = new KeyTokenResolver
implicit val table: SemanticTable = context.semanticTable

private def buildPredicate(expr: frontEndAst.Expression)(implicit context: PipeExecutionBuilderContext, planContext: PlanContext): Predicate = {
Expand Down
Expand Up @@ -21,7 +21,6 @@ package org.neo4j.cypher.internal.compatibility.v3_3.runtime

import org.neo4j.cypher.internal.compatibility.v3_3.runtime.commands.convert.ExpressionConverters
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.pipes.{NestedPipeExpression, Pipe}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.Id
import org.neo4j.cypher.internal.compiler.v3_3.spi.PlanContext
import org.neo4j.cypher.internal.compiler.v3_3.{ast => compilerAst}
import org.neo4j.cypher.internal.frontend.v3_3.phases.Monitors
Expand All @@ -33,7 +32,6 @@ trait PipeBuilderFactory {
def apply(monitors: Monitors,
recurse: LogicalPlan => Pipe,
readOnly: Boolean,
idMap: Map[LogicalPlan, Id],
expressionConverters: ExpressionConverters)
(implicit context: PipeExecutionBuilderContext, planContext: PlanContext): PipeBuilder

Expand Down
Expand Up @@ -24,7 +24,6 @@ import java.time.Clock
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.commands.convert.ExpressionConverters
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.executionplan._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.pipes._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.Id
import org.neo4j.cypher.internal.compiler.v3_3.spi.{InstrumentedGraphStatistics, PlanContext}
import org.neo4j.cypher.internal.frontend.v3_3._
import org.neo4j.cypher.internal.frontend.v3_3.phases.Monitors
Expand All @@ -37,10 +36,10 @@ class PipeExecutionPlanBuilder(clock: Clock,
monitors: Monitors,
pipeBuilderFactory: PipeBuilderFactory,
expressionConverters: ExpressionConverters) {
def build(periodicCommit: Option[PeriodicCommit], plan: LogicalPlan, idMap: Map[LogicalPlan, Id])
def build(periodicCommit: Option[PeriodicCommit], plan: LogicalPlan)
(implicit context: PipeExecutionBuilderContext, planContext: PlanContext): PipeInfo = {

val topLevelPipe = buildPipe(plan, idMap)
val topLevelPipe = buildPipe(plan)

val fingerprint = planContext.statistics match {
case igs: InstrumentedGraphStatistics =>
Expand Down Expand Up @@ -73,12 +72,11 @@ class PipeExecutionPlanBuilder(clock: Clock,
Next we pop out 'a', and this time we are coming from the LHS, and we can now pop two pipes from the pipe stack to
build the pipe for 'a'. Thanks for reading this far - I didn't think we would make it!
*/
private def buildPipe(plan: LogicalPlan, idMap: Map[LogicalPlan, Id])(implicit context: PipeExecutionBuilderContext, planContext: PlanContext): Pipe = {
private def buildPipe(plan: LogicalPlan)(implicit context: PipeExecutionBuilderContext, planContext: PlanContext): Pipe = {
val pipeBuilder = pipeBuilderFactory(
monitors = monitors,
recurse = p => buildPipe(p, idMap),
recurse = p => buildPipe(p),
readOnly = plan.solved.all(_.queryGraph.readOnly),
idMap = idMap,
expressionConverters = expressionConverters)

val planStack = new mutable.Stack[LogicalPlan]()
Expand Down Expand Up @@ -149,10 +147,10 @@ class PipeExecutionPlanBuilder(clock: Clock,

object CommunityPipeBuilderFactory extends PipeBuilderFactory {
def apply(monitors: Monitors, recurse: LogicalPlan => Pipe,
readOnly: Boolean, idMap: Map[LogicalPlan, Id],
readOnly: Boolean,
expressionConverters: ExpressionConverters)
(implicit context: PipeExecutionBuilderContext, planContext: PlanContext): CommunityPipeBuilder = {
CommunityPipeBuilder(monitors, recurse, readOnly, idMap, expressionConverters, recursePipes(recurse, planContext))
CommunityPipeBuilder(monitors, recurse, readOnly, expressionConverters, recursePipes(recurse, planContext))
}

}
Expand Down
Expand Up @@ -24,7 +24,7 @@ import org.neo4j.cypher.internal.compatibility.v3_3.runtime._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.helpers.InternalWrapping._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.pipes._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.InternalPlanDescription.Arguments.{Runtime, RuntimeImpl}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.{Id, InternalPlanDescription, LogicalPlan2PlanDescription}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.{InternalPlanDescription, LogicalPlan2PlanDescription}
import org.neo4j.cypher.internal.frontend.v3_3.phases.InternalNotificationLogger
import org.neo4j.cypher.internal.frontend.v3_3.{CypherException, ProfilerStatisticsNotReadyException}
import org.neo4j.cypher.internal.spi.v3_3.{CSVResources, QueryContext}
Expand All @@ -35,8 +35,7 @@ import scala.collection.mutable

case class DefaultExecutionResultBuilderFactory(pipeInfo: PipeInfo,
columns: List[String],
logicalPlan: LogicalPlan,
idMap: Map[LogicalPlan, Id]) extends ExecutionResultBuilderFactory {
logicalPlan: LogicalPlan) extends ExecutionResultBuilderFactory {
def create(): ExecutionResultBuilder =
ExecutionWorkflowBuilder()

Expand Down Expand Up @@ -91,7 +90,7 @@ case class DefaultExecutionResultBuilderFactory(pipeInfo: PipeInfo,
runtimeName: RuntimeName): InternalExecutionResult = {
val queryType: InternalQueryType = getQueryType
val planDescription: InternalPlanDescription =
LogicalPlan2PlanDescription(logicalPlan, idMap, pipeInfo.plannerUsed)
LogicalPlan2PlanDescription(logicalPlan, pipeInfo.plannerUsed)
.addArgument(Runtime(runtimeName.toTextOutput))
.addArgument(RuntimeImpl(runtimeName.name))
if (planType == ExplainMode) {
Expand Down
Expand Up @@ -29,8 +29,9 @@ import org.neo4j.cypher.internal.compatibility.v3_3.runtime.executionplan.{Execu
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.helpers.Counter
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.pipes.{ExternalCSVResource, QueryState}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.InternalPlanDescription.Arguments._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.{Argument, Id, NoChildren, PlanDescriptionImpl}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.{Argument, NoChildren, PlanDescriptionImpl}
import org.neo4j.cypher.internal.compiler.v3_3.ProcedurePlannerName
import org.neo4j.cypher.internal.v3_3.logical.plans.LogicalPlanId
import org.neo4j.cypher.internal.compiler.v3_3.spi.{GraphStatistics, PlanContext}
import org.neo4j.cypher.internal.frontend.v3_3.ast.Expression
import org.neo4j.cypher.internal.frontend.v3_3.notification.InternalNotification
Expand Down Expand Up @@ -115,13 +116,13 @@ case class ProcedureCallExecutionPlan(signature: ProcedureSignature,
}

private def createNormalPlan =
PlanDescriptionImpl(new Id, "ProcedureCall", NoChildren,
PlanDescriptionImpl(LogicalPlanId.DEFAULT, "ProcedureCall", NoChildren,
arguments,
resultSymbols.map(_._1).toSet
)

private def createProfilePlanGenerator(rowCounter: Counter) = () =>
PlanDescriptionImpl(new Id, "ProcedureCall", NoChildren,
PlanDescriptionImpl(LogicalPlanId.DEFAULT, "ProcedureCall", NoChildren,
Seq(createSignatureArgument, DbHits(1), Rows(rowCounter.counted)) ++ arguments,
resultSymbols.map(_._1).toSet
)
Expand Down
Expand Up @@ -24,8 +24,9 @@ import org.neo4j.cypher.internal.InternalExecutionResult
import org.neo4j.cypher.internal.compatibility.v3_3.runtime._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.executionplan.{ExecutionPlan, InternalQueryType, SCHEMA_WRITE}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.InternalPlanDescription.Arguments._
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.{Id, NoChildren, PlanDescriptionImpl}
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.{NoChildren, PlanDescriptionImpl}
import org.neo4j.cypher.internal.compiler.v3_3._
import org.neo4j.cypher.internal.v3_3.logical.plans.LogicalPlanId
import org.neo4j.cypher.internal.compiler.v3_3.spi.{GraphStatistics, PlanContext}
import org.neo4j.cypher.internal.frontend.v3_3.PlannerName
import org.neo4j.cypher.internal.frontend.v3_3.notification.InternalNotification
Expand Down Expand Up @@ -58,7 +59,7 @@ case class PureSideEffectExecutionPlan(name: String, queryType: InternalQueryTyp
}
}

private def description = PlanDescriptionImpl(new Id, name, NoChildren,
private def description = PlanDescriptionImpl(LogicalPlanId.DEFAULT, name, NoChildren,
Seq(Planner(plannerUsed.toTextOutput),
PlannerImpl(plannerUsed.name),
Runtime(runtimeUsed.toTextOutput),
Expand Down
Expand Up @@ -20,10 +20,10 @@
package org.neo4j.cypher.internal.compatibility.v3_3.runtime.pipes

import org.neo4j.cypher.internal.compatibility.v3_3.runtime.ExecutionContext
import org.neo4j.cypher.internal.compatibility.v3_3.runtime.planDescription.Id
import org.neo4j.cypher.internal.v3_3.logical.plans.LogicalPlanId
import org.neo4j.helpers.ValueUtils

case class AllNodesScanPipe(ident: String)(val id: Id = new Id) extends Pipe {
case class AllNodesScanPipe(ident: String)(val id: LogicalPlanId = LogicalPlanId.DEFAULT) extends Pipe {

protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
val baseContext = state.createOrGetInitialContext()
Expand Down

0 comments on commit 119120e

Please sign in to comment.