Skip to content

Commit

Permalink
Eagerize write procedure results in cypher
Browse files Browse the repository at this point in the history
  • Loading branch information
jakewins authored and boggle committed Feb 8, 2016
1 parent 3e82a2a commit 0f0a2af
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.neo4j.cypher.internal.compiler.v3_0.ast.convert.commands.ExpressionCo
import org.neo4j.cypher.internal.compiler.v3_0.executionplan.{ExecutionPlan, InternalExecutionResult, READ_ONLY}
import org.neo4j.cypher.internal.compiler.v3_0.pipes.{ExternalCSVResource, QueryState}
import org.neo4j.cypher.internal.compiler.v3_0.planDescription.{Id, NoChildren, PlanDescriptionImpl}
import org.neo4j.cypher.internal.compiler.v3_0.spi.{FieldSignature, GraphStatistics, ProcedureSignature, QueryContext}
import org.neo4j.cypher.internal.compiler.v3_0.spi._
import org.neo4j.cypher.internal.compiler.v3_0.{ExecutionContext, ExecutionMode, ExplainExecutionResult, ExplainMode, ProcedurePlannerName, ProcedureRuntimeName, TaskCloser}
import org.neo4j.cypher.internal.frontend.v3_0.{ParameterNotFoundException, InvalidArgumentException}
import org.neo4j.cypher.internal.frontend.v3_0.ast.Expression
Expand Down Expand Up @@ -58,8 +58,14 @@ case class CallProcedureExecutionPlan(signature: ProcedureSignature, providedArg
taskCloser.close(success = true)
new ExplainExecutionResult(signature.outputSignature.seq.map(_.name).toList,
description, READ_ONLY, Set.empty)
} else
ProcedureExecutionResult(taskCloser, ctx, signature, input, description, planType)
} else {
val result: ProcedureExecutionResult[Nothing] = ProcedureExecutionResult(taskCloser, ctx, signature, input, description, planType)
// Naive eagerization for now, refactor as need arises
if( signature.mode == ProcReadWrite )
result.toEagerIterableResult(ProcedurePlannerName, ProcedureRuntimeName)
else result

}
}

private def fail(f: FieldSignature, ctx: QueryContext) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ case class ProcedureExecutionResult[E <: Exception](taskCloser: TaskCloser,
override def javaColumns: java.util.List[String] = signature.outputSignature.seq.map(_.name).asJava

override def accept[EX <: Exception](visitor: InternalResultVisitor[EX]) = {
signature.mode.call(context, signature, args.map(asJavaCompatible)).foreach { res =>
val call: Iterator[Array[AnyRef]] = signature.mode.call(context, signature, args.map(asJavaCompatible))
call.foreach { res =>
var i = 0
val row = new ResultRowImpl
signature.outputSignature.foreach { f =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,18 @@ import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.neo4j.cypher.internal.compiler.v3_0.NormalMode
import org.neo4j.cypher.internal.compiler.v3_0.spi.{FieldSignature, ProcedureName, ProcedureSignature, QueryContext}
import org.neo4j.cypher.internal.compiler.v3_0.spi._
import org.neo4j.cypher.internal.frontend.v3_0.ast.{Add, Expression, SignedDecimalIntegerLiteral, StringLiteral}
import org.neo4j.cypher.internal.frontend.v3_0.test_helpers.CypherFunSuite
import org.neo4j.cypher.internal.frontend.v3_0.{DummyPosition, InvalidArgumentException, symbols}
import org.neo4j.cypher.internal.frontend.v3_0.{ParameterNotFoundException, DummyPosition, InvalidArgumentException, symbols}

import scala.collection.mutable

class CallProcedureExecutionPlanTest extends CypherFunSuite {

test("should be able to call procedure with single argument") {
// Given
val signature = ProcedureSignature(
ProcedureName(Seq.empty, "foo"), Seq(FieldSignature("a", symbols.CTInteger)),
Seq(FieldSignature("b", symbols.CTInteger)))

val proc = CallProcedureExecutionPlan(signature, Some(Seq(add(int(42), int(42)))))

val proc = CallProcedureExecutionPlan(readSignature, Some(Seq(add(int(42), int(42)))))
// When
val res = proc.run(ctx, NormalMode, Map.empty)

Expand All @@ -48,11 +45,7 @@ class CallProcedureExecutionPlanTest extends CypherFunSuite {

test("should be able to call procedure with single argument using parameters") {
// Given
val signature = ProcedureSignature(
ProcedureName(Seq.empty, "foo"), Seq(FieldSignature("a", symbols.CTInteger)),
Seq(FieldSignature("b", symbols.CTInteger)))

val proc = CallProcedureExecutionPlan(signature, None)
val proc = CallProcedureExecutionPlan(readSignature, None)

// When
val res = proc.run(ctx, NormalMode, Map("a" -> 84))
Expand All @@ -62,31 +55,67 @@ class CallProcedureExecutionPlanTest extends CypherFunSuite {
}

test("should fail if parameter is missing") {
// When
val proc = CallProcedureExecutionPlan(readSignature, None)

// Then
an [ParameterNotFoundException] should be thrownBy proc.run(ctx, NormalMode, Map())
}

test("should eagerize write procedure") {
// Given
val signature = ProcedureSignature(
ProcedureName(Seq.empty, "foo"), Seq(FieldSignature("a", symbols.CTInteger), FieldSignature("b", symbols.CTInteger)),
Seq(FieldSignature("c", symbols.CTInteger)))
val proc = CallProcedureExecutionPlan(writeSignature, Some(Seq(add(int(42), int(42)))))

// When
val proc = CallProcedureExecutionPlan(signature, None)
proc.run(ctx, NormalMode, Map.empty)

// Then
an [InvalidArgumentException] should be thrownBy proc.run(ctx, NormalMode, Map("a" -> 84))
// Then without touching the result, it should have been spooled out
iteratorExhausted should equal(true)
}

test("should not eagerize read procedure") {
// Given
val proc = CallProcedureExecutionPlan(readSignature, Some(Seq(add(int(42), int(42)))))

// When
proc.run(ctx, NormalMode, Map.empty)

// Then without touching the result, the Kernel iterator should not be touched
iteratorExhausted should equal(false)
}

override protected def beforeEach = iteratorExhausted = false

def add(lhs: Expression, rhs: Expression): Expression = Add(lhs, rhs)(pos)

def int(i: Int): Expression = SignedDecimalIntegerLiteral(i.toString)(pos)

def string(s: String): Expression = StringLiteral(s)(pos)

private val readSignature = ProcedureSignature( ProcedureName(Seq.empty, "foo"),
Seq(FieldSignature("a", symbols.CTInteger) ),
Seq(FieldSignature("b", symbols.CTInteger)))

private val writeSignature = ProcedureSignature( readSignature.name,
readSignature.inputSignature, readSignature.outputSignature, ProcReadWrite )

private val pos = DummyPosition(-1)
val ctx = mock[QueryContext]
when(ctx.callReadOnlyProcedure(any[ProcedureSignature], any[Seq[AnyRef]])).thenAnswer(new Answer[Iterator[Array[AnyRef]]] {
var iteratorExhausted = false

val procedureResult = new Answer[Iterator[Array[AnyRef]]] {
override def answer(invocationOnMock: InvocationOnMock) = {
val input = invocationOnMock.getArguments()(1).asInstanceOf[Seq[AnyRef]]
Iterator.single(input.toArray)
new Iterator[Array[AnyRef]] {
override def hasNext = !iteratorExhausted
override def next() = if(hasNext) {
iteratorExhausted = true
input.toArray
} else throw new IllegalStateException("Iterator exhausted")
}
}
})
}

when(ctx.callReadOnlyProcedure(any[ProcedureSignature], any[Seq[AnyRef]])).thenAnswer(procedureResult)
when(ctx.callReadWriteProcedure(any[ProcedureSignature], any[Seq[AnyRef]])).thenAnswer(procedureResult)
}

0 comments on commit 0f0a2af

Please sign in to comment.