Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/3.0' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Feb 28, 2017
2 parents 9dd893f + f6ef545 commit 03a7a84
Show file tree
Hide file tree
Showing 34 changed files with 206 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,33 @@ class ProfilerAcceptanceTest extends ExecutionEngineFunSuite with CreateTempFile
assertDbHits(2)(result)("Distinct")
}

test("profile with filter using nested expressions pipe should report dbhits correctly") {
// GIVEN
createLabeledNode(Map("category_type"-> "cat"), "Category")
createLabeledNode(Map("category_type"-> "cat"), "Category")
val e1 = createLabeledNode(Map("domain_id"-> "1"), "Entity")
val e2 = createLabeledNode(Map("domain_id"-> "2"), "Entity")
val aNode = createNode()
relate(aNode, e1)
val anotherNode = createNode()
relate(anotherNode, e2)

relate(aNode, createNode(), "HAS_CATEGORY")
relate(anotherNode, createNode(), "HAS_CATEGORY")

// WHEN
val result = profileWithAllPlanners("""MATCH (cat:Category)
|WITH collect(cat) as categories
|MATCH (m:Entity)
|WITH m, categories
|MATCH (m)<-[r]-(n)
|WHERE ANY(x IN categories WHERE (n)-[:HAS_CATEGORY]->(x))
|RETURN count(n)""".stripMargin)

// THEN
assertDbHits(14)(result)("Filter")
}

private def assertRows(expectedRows: Int)(result: InternalExecutionResult)(names: String*) {
getPlanDescriptions(result, names).foreach {
plan => assert(expectedRows === getArgument[Rows](plan).value, s" wrong row count for plan: ${plan.name}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.neo4j.cypher.internal.compiler.v3_1.commands._
import org.neo4j.cypher.internal.compiler.v3_1.commands.predicates.{CoercedPredicate, Predicate}
import org.neo4j.cypher.internal.compiler.v3_1.executionplan.Effects
import org.neo4j.cypher.internal.compiler.v3_1.helpers.TypeSafeMathSupport
import org.neo4j.cypher.internal.compiler.v3_1.pipes.QueryState
import org.neo4j.cypher.internal.compiler.v3_1.pipes.{Pipe, QueryState}
import org.neo4j.cypher.internal.compiler.v3_1.symbols.{SymbolTable, TypeSafe, Typed}
import org.neo4j.cypher.internal.frontend.v3_1.CypherTypeException
import org.neo4j.cypher.internal.frontend.v3_1.symbols.{CypherType, _}
Expand Down Expand Up @@ -54,6 +54,15 @@ abstract class Expression extends Typed with TypeSafe with EffectfulAstNode[Expr

def apply(ctx: ExecutionContext)(implicit state: QueryState):Any

private var _owningPipe: Option[Pipe] = None

def owningPipe: Pipe = _owningPipe.get

def registerOwningPipe(pipe: Pipe): Unit = rewrite( expr => {
expr._owningPipe = Some(pipe)
expr
})

/*When calculating the type of an expression, the expression should also
make sure to check the types of any downstream expressions*/
protected def calculateType(symbols: SymbolTable): CypherType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ the result of the NestedPipeExpression evaluation is a collection containing the
*/
case class NestedPipeExpression(pipe: Pipe, inner: Expression) extends Expression {
override def apply(ctx: ExecutionContext)(implicit state: QueryState): Any = {
val innerState = state.withInitialContext(ctx).withDecorator(state.decorator.innerDecorator )
val innerState = state.withInitialContext(ctx).withDecorator(state.decorator.innerDecorator(owningPipe))
pipe.createResults(innerState).map(ctx => inner(ctx)).toIndexedSeq
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ case class DirectedRelationshipByIdSeekPipe(ident: String, relIdExpr: SeekArgs,
extends Pipe
with ListSupport
with RonjaPipe {
protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
val ctx = state.initialContext.getOrElse(ExecutionContext.empty)
val relIds = relIdExpr.expressions(ctx, state).flatMap(Option(_))
new DirectedRelationshipIdSeekIterator(ident, fromNode, toNode, ctx, state.query.relationshipOps, relIds.iterator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ case class DistinctPipe(source: Pipe, expressions: Map[String, Expression])(val

val keyNames: Seq[String] = expressions.keys.toIndexedSeq

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
expressions.values.foreach(_.registerOwningPipe(this))

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
// Run the return item expressions, and replace the execution context's with their values
val result = input.map(ctx => {
val newMap = Eagerly.mutableMapValues(expressions, (expression: Expression) => expression(ctx)(state))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ case class EagerAggregationPipe(source: Pipe, keyExpressions: Set[String], aggre

val symbols: SymbolTable = createSymbols()

aggregations.values.foreach(_.registerOwningPipe(this))

private def createSymbols() = {
val keyVariables = keyExpressions.map(id => id -> source.symbols.evaluateType(id, CTAny)).toMap
val aggrVariables = aggregations.map {
Expand All @@ -48,10 +50,7 @@ case class EagerAggregationPipe(source: Pipe, keyExpressions: Set[String], aggre
SymbolTable(keyVariables ++ aggrVariables)
}

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState) = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
// This is the temporary storage used while the aggregation is going on
val result = MutableMap[Equals, Seq[AggregationFunction]]()
val keyNames = keyExpressions.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ object ExtractPipe {

case class ExtractPipe(source: Pipe, expressions: Map[String, Expression], hack_remove_this:Boolean)
(implicit pipeMonitor: PipeMonitor) extends PipeWithSource(source, pipeMonitor) {

expressions.values.foreach(_.registerOwningPipe(this))

val symbols: SymbolTable = {
val newVariables = expressions.map {
case (name, expression) => name -> expression.getType(source.symbols)
Expand Down Expand Up @@ -98,9 +101,6 @@ case class ExtractPipe(source: Pipe, expressions: Map[String, Expression], hack_
}

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState) = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

input.map( ctx => applyExpressions(ctx, state) )
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ case class FilterPipe(source: Pipe, predicate: Predicate)(val estimatedCardinali
(implicit pipeMonitor: PipeMonitor) extends PipeWithSource(source, pipeMonitor) with RonjaPipe {
val symbols = source.symbols

protected def internalCreateResults(input: Iterator[ExecutionContext],state: QueryState) = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
predicate.registerOwningPipe(this)

protected def internalCreateResults(input: Iterator[ExecutionContext],state: QueryState) = {
input.filter(ctx => predicate.isTrue(ctx)(state))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ case class LegacySortPipe(source: Pipe, sortDescription: List[SortItem])
(implicit pipeMonitor: PipeMonitor) extends PipeWithSource(source, pipeMonitor) with ExecutionContextComparer with NoEffectsPipe {
def symbols = source.symbols

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState) = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
sortDescription.foreach(_.expression.registerOwningPipe(this))

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState) = {
input.toList.
sortWith((a, b) => compareBy(a, b, sortDescription)(state)).iterator
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ case class LetSelectOrSemiApplyPipe(source: Pipe, inner: Pipe, letVarName: Strin
(val estimatedCardinality: Option[Double] = None)
(implicit pipeMonitor: PipeMonitor)
extends PipeWithSource(source, pipeMonitor) with RonjaPipe {
def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

predicate.registerOwningPipe(this)

def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
input.map {
(outerContext) =>
val holds = predicate.isTrue(outerContext)(state) || {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import org.neo4j.cypher.internal.compiler.v3_1.symbols.SymbolTable
case class LimitPipe(source: Pipe, exp: Expression)
(val estimatedCardinality: Option[Double] = None)(implicit pipeMonitor: PipeMonitor)
extends PipeWithSource(source, pipeMonitor) with NumericHelper with RonjaPipe {

exp.registerOwningPipe(this)

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
if(input.isEmpty)
return Iterator.empty

//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

implicit val s = state

val first: ExecutionContext = input.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ case class LoadCSVPipe(source: Pipe,
(val estimatedCardinality: Option[Double] = None)(implicit pipeMonitor: PipeMonitor)
extends PipeWithSource(source, pipeMonitor) with RonjaPipe {

urlExpression.registerOwningPipe(this)

protected def getImportURL(urlString: String, context: QueryContext): URL = {
val url: URL = try {
new URL(urlString)
Expand Down Expand Up @@ -97,9 +99,6 @@ case class LoadCSVPipe(source: Pipe,
}

override protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

input.flatMap(context => {
implicit val s = state
val urlString: String = urlExpression(context).asInstanceOf[String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ case class MatchPipe(source: Pipe,
val symbols = matchingContext.symbols
val variablesBoundInSource = variablesInClause intersect source.symbols.keys.toSet

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState) = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
predicates.foreach(_.registerOwningPipe(this))

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState) = {
input.flatMap {
ctx =>
if (variablesBoundInSource.exists(i => ctx(i) == null))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ import org.neo4j.cypher.internal.frontend.v3_1.symbols.CTNode

sealed trait SeekArgs {
def expressions(ctx: ExecutionContext, state: QueryState): Iterable[Any]
def registerOwningPipe(pipe: Pipe): Unit
}

object SeekArgs {
object empty extends SeekArgs {
def expressions(ctx: ExecutionContext, state: QueryState): Iterable[Any] = Iterable.empty

override def registerOwningPipe(pipe: Pipe){}
}
}

Expand All @@ -42,6 +45,8 @@ case class SingleSeekArg(expr: Expression) extends SeekArgs {
expr(ctx)(state) match {
case value => Iterable(value)
}

override def registerOwningPipe(pipe: Pipe): Unit = expr.registerOwningPipe(pipe)
}

case class ManySeekArgs(coll: Expression) extends SeekArgs {
Expand All @@ -50,6 +55,8 @@ case class ManySeekArgs(coll: Expression) extends SeekArgs {
case IsList(values) => values
}
}

override def registerOwningPipe(pipe: Pipe): Unit = coll.registerOwningPipe(pipe)
}

case class NodeByIdSeekPipe(ident: String, nodeIdsExpr: SeekArgs)
Expand All @@ -58,10 +65,9 @@ case class NodeByIdSeekPipe(ident: String, nodeIdsExpr: SeekArgs)
with ListSupport
with RonjaPipe {

protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
nodeIdsExpr.registerOwningPipe(this)

protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
val ctx = state.initialContext.getOrElse(ExecutionContext.empty)
val nodeIds = nodeIdsExpr.expressions(ctx, state)
new NodeIdSeekIterator(ident, ctx, state.query.nodeOps, nodeIds.iterator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ abstract class AbstractNodeIndexStringScanPipe(ident: String,

private val descriptor = IndexDescriptor(label.nameId.id, propertyKey.nameId.id)

override protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
valueExpr.registerOwningPipe(this)

override protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
val baseContext = state.initialContext.getOrElse(ExecutionContext.empty)
val value = valueExpr(baseContext)(state)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ case class NodeIndexScanPipe(ident: String,
private val descriptor = IndexDescriptor(label.nameId.id, propertyKey.nameId.id)

protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

val baseContext = state.initialContext.getOrElse(ExecutionContext.empty)
val resultNodes = state.query.indexScan(descriptor)
resultNodes.map(node => baseContext.newWith1(ident, node))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@ case class NodeIndexSeekPipe(ident: String,
(val estimatedCardinality: Option[Double] = None)(implicit pipeMonitor: PipeMonitor)
extends Pipe with RonjaPipe {

valueExpr.expression.registerOwningPipe(this)

private val descriptor = IndexDescriptor(label.nameId.id, propertyKey.nameId.id)

private val indexFactory = indexMode.indexFactory(descriptor)

protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

val index = indexFactory(state)
val baseContext = state.initialContext.getOrElse(ExecutionContext.empty)
val resultNodes = indexQuery(valueExpr, baseContext, state, index, label.name, propertyKey.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ case class OptionalExpandAllPipe(source: Pipe, fromName: String, relName: String
(val estimatedCardinality: Option[Double] = None)(implicit pipeMonitor: PipeMonitor)
extends PipeWithSource(source, pipeMonitor) with RonjaPipe {

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
predicate.registerOwningPipe(this)

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
implicit val s = state

input.flatMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,7 @@ trait PipeDecorator {
* Returns the inner decorator of this decorator. The inner decorator is used for nested expressions
* where the `decorate` should refer to the parent pipe instead of the calling pipe.
*/
def innerDecorator: PipeDecorator

/*
* Registers a parent pipe. Used by the innerDecorator to associate data with the parent pipe instead of
* the calling pipe.
*/
def registerParentPipe(pipe: Pipe): Unit
def innerDecorator(pipe: Pipe): PipeDecorator
}

object NullPipeDecorator extends PipeDecorator {
Expand All @@ -52,7 +46,5 @@ object NullPipeDecorator extends PipeDecorator {

def decorate(pipe: Pipe, state: QueryState): QueryState = state

def innerDecorator: PipeDecorator = NullPipeDecorator

def registerParentPipe(pipe: Pipe) {}
def innerDecorator(pipe: Pipe): PipeDecorator = NullPipeDecorator
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ case class ProcedureCallPipe(source: Pipe,
(implicit monitor: PipeMonitor)
extends PipeWithSource(source, monitor) with ListSupport with RonjaPipe {

argExprs.foreach(_.registerOwningPipe(this))

private val rowProcessor = rowProcessing match {
case FlatMapAndAppendToRow => internalCreateResultsByAppending _
case PassThroughRow => internalCreateResultsByPassingThrough _
}

override protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
val converter = new RuntimeJavaValueConverter(state.query.isGraphKernelResultValue, state.typeConverter.asPublicType)

rowProcessor(input, state, converter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ It's an additive operation - nothing is lost in the execution context, the pipe
*/
case class ProjectionPipe(source: Pipe, expressions: Map[String, Expression])(val estimatedCardinality: Option[Double] = None)
(implicit pipeMonitor: PipeMonitor) extends PipeWithSource(source, pipeMonitor) with RonjaPipe {

expressions.values.foreach(_.registerOwningPipe(this))

val symbols = {
val newVariables = expressions.map {
case (name, expression) => name -> expression.getType(source.symbols)
Expand All @@ -39,8 +42,6 @@ case class ProjectionPipe(source: Pipe, expressions: Map[String, Expression])(va
}

protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState) = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)
input.map {
ctx =>
expressions.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ case class SelectOrSemiApplyPipe(source: Pipe, inner: Pipe, predicate: Predicate
(val estimatedCardinality: Option[Double] = None)
(implicit pipeMonitor: PipeMonitor)
extends PipeWithSource(source, pipeMonitor) with RonjaPipe {
def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
//register as parent so that stats are associated with this pipe
state.decorator.registerParentPipe(this)

predicate.registerOwningPipe(this)

def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
input.filter {
(outerContext) =>
predicate.isTrue(outerContext)(state) || {
Expand Down

0 comments on commit 03a7a84

Please sign in to comment.