Skip to content

Commit

Permalink
Do not construct iterator tree as a side-effect of checking isExhausted
Browse files Browse the repository at this point in the history
  • Loading branch information
fickludd committed Aug 21, 2018
1 parent 33e4888 commit 2424df8
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 37 deletions.
Expand Up @@ -21,6 +21,7 @@ package org.neo4j.cypher.internal.compatibility

import java.io.PrintWriter
import java.util
import java.util.NoSuchElementException

import org.neo4j.cypher.exceptionHandler.RunSafely
import org.neo4j.cypher.internal.runtime.planDescription.InternalPlanDescription
Expand Down Expand Up @@ -69,14 +70,20 @@ class ClosingExecutionResult(val query: ExecutingQuery, val inner: InternalExecu

new graphdb.ResourceIterator[java.util.Map[String, AnyRef]] {
def next(): util.Map[String, AnyRef] = safely {
val result = innerIterator.next
closeIfEmpty(innerIterator)
result
if (inner.isClosed) throw new NoSuchElementException
else {
val result = innerIterator.next
closeIfEmpty(innerIterator)
result
}
}

def hasNext: Boolean = safely {
closeIfEmpty(innerIterator)
innerIterator.hasNext
if (inner.isClosed) false
else {
closeIfEmpty(innerIterator)
innerIterator.hasNext
}
}

def close(): Unit = self.close()
Expand Down
Expand Up @@ -37,10 +37,13 @@ class PipeExecutionResult(val result: IteratorBasedResult,
self =>

private val query = state.query
private var resultRequested = false

val javaValues = new RuntimeJavaValueConverter(isGraphKernelResultValue)
def isIterable: Boolean = true

def asIterator: ResourceIterator[java.util.Map[String, AnyRef]] =
def asIterator: ResourceIterator[java.util.Map[String, AnyRef]] = {
resultRequested = true
new WrappingResourceIterator[util.Map[String, AnyRef]] {
private val inner = result.mapIterator
def hasNext: Boolean = inner.hasNext
Expand All @@ -51,6 +54,7 @@ class PipeExecutionResult(val result: IteratorBasedResult,
javaRow.put(kv._1, query.asObject(kv._2))
javaRow
}
}
}

override def queryStatistics(): QueryStatistics = state.getStatistics
Expand All @@ -63,11 +67,13 @@ class PipeExecutionResult(val result: IteratorBasedResult,
}

override def accept[EX <: Exception](visitor: QueryResultVisitor[EX]): Unit = {
resultRequested = true
val maybeRecordIterator = result.recordIterator
if (maybeRecordIterator.isDefined)
javaValues.feedQueryResultRecordIteratorToVisitable(maybeRecordIterator.get).accept(visitor)
else
javaValues.feedIteratorToVisitable(result.mapIterator.map(r => fieldNames.map(r))).accept(visitor)
}
override def isExhausted: Boolean = !result.mapIterator.hasNext

override def isExhausted: Boolean = resultRequested && !result.mapIterator.hasNext
}
Expand Up @@ -83,7 +83,7 @@ class StandardInternalExecutionResult(context: QueryContext,
/**
* ...and if we do not return any rows, we close all resources.
*/
if (noRows || queryType == WRITE || queryType == SCHEMA_WRITE) {
if (noRows || queryType == WRITE || fieldNames().isEmpty) {
close(Success)
}

Expand Down
Expand Up @@ -61,6 +61,7 @@ class ProcedureCallRuntimeResult(context: QueryContext,
override val fieldNames: Array[String] = indexResultNameMappings.map(_._2).toArray

private final val executionResults: Iterator[Array[AnyRef]] = executeCall
private var resultRequested = false

// The signature mode is taking care of eagerization
protected def executeCall: Iterator[Array[AnyRef]] = {
Expand All @@ -74,7 +75,8 @@ class ProcedureCallRuntimeResult(context: QueryContext,
iterator
}

override def asIterator(): ResourceIterator[util.Map[String, AnyRef]] =
override def asIterator(): ResourceIterator[util.Map[String, AnyRef]] = {
resultRequested = true
new ResourceIterator[util.Map[String, AnyRef]]() {
override def next(): util.Map[String, AnyRef] =
resultAsMap(executionResults.next())
Expand All @@ -89,13 +91,15 @@ class ProcedureCallRuntimeResult(context: QueryContext,

override def close(): Unit = self.close()
}
}

private def transform[T](value: AnyRef, f: T => AnyValue): AnyValue = {
if (value == null) NO_VALUE
else f(value.asInstanceOf[T])
}

override def accept[EX <: Exception](visitor: QueryResultVisitor[EX]): Unit = {
resultRequested = true
executionResults.foreach { res =>
val fieldArray = new Array[AnyValue](indexResultNameMappings.size)
for (i <- indexResultNameMappings.indices) {
Expand Down Expand Up @@ -130,8 +134,6 @@ class ProcedureCallRuntimeResult(context: QueryContext,
close()
}

// TODO Look into having the kernel track updates, rather than cypher middle-layers, only sensible way I can think
// of to get accurate stats for procedure code
override def queryStatistics(): QueryStatistics = context.getOptStatistics.getOrElse(QueryStatistics())

private def resultAsMap(rowData: Array[AnyRef]): util.Map[String, AnyRef] = {
Expand All @@ -142,7 +144,7 @@ class ProcedureCallRuntimeResult(context: QueryContext,

override def isIterable: Boolean = true

override def isExhausted: Boolean = !executionResults.hasNext
override def isExhausted: Boolean = resultRequested && !executionResults.hasNext

override def close(): Unit = {}

Expand Down
Expand Up @@ -32,6 +32,8 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy

private val combinedCallconfiguration = Configs.Interpreted - Configs.AllRulePlanners - Configs.Version2_3

private val config = Configs.Procs + Configs.Cost3_1

test("should be able to filter as part of call") {
// Given
createLabeledNode("A")
Expand Down Expand Up @@ -60,7 +62,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
// When
// we cannot assert on the results because on each call
// the generated virtual nodes will have different IDs
val result = executeWith(Configs.Procs, "CALL db.schema()", expectedDifferentResults = Configs.Procs).toList
val result = executeWith(config, "CALL db.schema()", expectedDifferentResults = config).toList

// Then
result.size should equal(1)
Expand Down Expand Up @@ -139,7 +141,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
test("db.labels work on an empty database") {
// Given an empty database
//When
val result = executeWith(Configs.Procs, "CALL db.labels")
val result = executeWith(config, "CALL db.labels")

// Then
result.toList shouldBe empty
Expand All @@ -151,7 +153,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
execute("MATCH (a:A) REMOVE a:A")

//When
val result = executeWith(Configs.Procs, "CALL db.labels")
val result = executeWith(config, "CALL db.labels")

// Then
result shouldBe empty
Expand All @@ -163,7 +165,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
execute("MATCH (a) DETACH DELETE a")

//When
val result = executeWith(Configs.Procs, "CALL db.labels")
val result = executeWith(config, "CALL db.labels")

// Then
result shouldBe empty
Expand All @@ -176,7 +178,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
relate(createNode(), createNode(), "C")

// When
val result = executeWith(Configs.Procs, "CALL db.relationshipTypes")
val result = executeWith(config, "CALL db.relationshipTypes")

// Then
result.toList should equal(
Expand All @@ -189,7 +191,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
test("db.relationshipType work on an empty database") {
// Given an empty database
//When
val result = executeWith(Configs.Procs, "CALL db.relationshipTypes")
val result = executeWith(config, "CALL db.relationshipTypes")

// Then
result shouldBe empty
Expand All @@ -203,7 +205,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
execute("MATCH (a) DETACH DELETE a")

//When
val result = executeWith(Configs.Procs, "CALL db.relationshipTypes")
val result = executeWith(config, "CALL db.relationshipTypes")

// Then
result shouldBe empty
Expand All @@ -214,7 +216,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
createNode("A" -> 1, "B" -> 2, "C" -> 3)

// When
val result = executeWith(Configs.Procs, "CALL db.propertyKeys")
val result = executeWith(config, "CALL db.propertyKeys")

// Then
result.toList should equal(
Expand All @@ -228,7 +230,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
// Given an empty database

// When
val result = executeWith(Configs.Procs, "CALL db.propertyKeys")
val result = executeWith(config, "CALL db.propertyKeys")

// Then
result shouldBe empty
Expand All @@ -240,7 +242,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
execute("MATCH (a)-[r]-(b) REMOVE a.A, r.R, b.B")

// When
val result = executeWith(Configs.Procs, "CALL db.propertyKeys")
val result = executeWith(config, "CALL db.propertyKeys")

// Then
result.toList should equal(
Expand All @@ -256,7 +258,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
execute("MATCH (a) DETACH DELETE a")

// When
val result = executeWith(Configs.Procs, "CALL db.propertyKeys")
val result = executeWith(config, "CALL db.propertyKeys")

// Then
result.toList should equal(
Expand All @@ -271,7 +273,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
graph.createIndex("A", "prop")

//When
val result = executeWith(Configs.Procs, "CALL db.indexes")
val result = executeWith(config, "CALL db.indexes")


// Then
Expand All @@ -296,7 +298,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy

test("should create index from built-in-procedure") {
// when
val createResult = executeWith(Configs.Procs, "CALL db.createIndex(\":Person(name)\",\"lucene+native-1.0\")")
val createResult = executeWith(config, "CALL db.createIndex(\":Person(name)\",\"lucene+native-1.0\")")

// then
createResult.toList should equal(
Expand All @@ -309,7 +311,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
graph.execute("CALL db.awaitIndexes(10)")

// when
val listResult = executeWith(Configs.Procs, "CALL db.indexes()")
val listResult = executeWith(config, "CALL db.indexes()")

// Then
listResult.toList should equal(
Expand All @@ -327,7 +329,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy

test("should create unique property constraint from built-in-procedure") {
// when
val createResult = executeWith(Configs.Procs, "CALL db.createUniquePropertyConstraint(\":Person(name)\",\"lucene+native-1.0\")")
val createResult = executeWith(config, "CALL db.createUniquePropertyConstraint(\":Person(name)\",\"lucene+native-1.0\")")

// then
createResult.toList should equal(
Expand All @@ -340,7 +342,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
graph.execute("CALL db.awaitIndexes(10)")

// when
val listResult = executeWith(Configs.Procs, "CALL db.indexes()")
val listResult = executeWith(config, "CALL db.indexes()")

listResult.toList should equal(
List(Map("description" -> "INDEX ON :Person(name)",
Expand All @@ -357,7 +359,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy

test("should create node key constraint from built-in-procedure") {
// when
val createResult = executeWith(Configs.Procs, "CALL db.createNodeKey(\":Person(name)\",\"lucene+native-1.0\")")
val createResult = executeWith(config, "CALL db.createNodeKey(\":Person(name)\",\"lucene+native-1.0\")")

// then
createResult.toList should equal(
Expand All @@ -370,7 +372,7 @@ class BuiltInProcedureAcceptanceTest extends ProcedureCallAcceptanceTest with Cy
graph.execute("CALL db.awaitIndexes(10)")

// when
val listResult = executeWith(Configs.Procs, "CALL db.indexes()")
val listResult = executeWith(config, "CALL db.indexes()")

listResult.toList should equal(
List(Map("description" -> "INDEX ON :Person(name)",
Expand Down
Expand Up @@ -30,7 +30,7 @@ class CompositeUniquenessConstraintAcceptanceTest extends ExecutionEngineFunSuit

test("should be able to create and remove single property uniqueness constraint") {

val testconfiguration = Configs.Procs
val testconfiguration = Configs.Procs - Configs.Cost3_1
// When
executeWith(testconfiguration, "CREATE CONSTRAINT ON (n:Person) ASSERT (n.email) IS UNIQUE")

Expand Down Expand Up @@ -60,7 +60,7 @@ class CompositeUniquenessConstraintAcceptanceTest extends ExecutionEngineFunSuit

test("should fail to to drop composite uniqueness constraints") {
// When
failWithError(singlePropertyUniquenessFailConf + Configs.Morsel + Configs.Procs,
failWithError(singlePropertyUniquenessFailConf + Configs.Procs - Configs.Cost3_1,
"DROP CONSTRAINT ON (n:Person) ASSERT (n.firstname,n.lastname) IS UNIQUE",
List("Only single property uniqueness constraints are supported"))

Expand Down
Expand Up @@ -434,7 +434,7 @@ object CypherComparisonSupport {

implicit def plannerToPlanners(planner: Planner): Planners = Planners(planner)

object Cost extends Planner(Set("COST", "IDP"), "planner=cost")
object Cost extends Planner(Set("COST", "IDP", "PROCEDURE"), "planner=cost")

object Rule extends Planner(Set("RULE"), "planner=rule")

Expand Down
Expand Up @@ -34,7 +34,7 @@ import org.neo4j.graphdb._
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.internal.cypher.acceptance.CypherComparisonSupport.Runtimes.ProcedureOrSchema
import org.neo4j.internal.cypher.acceptance.CypherComparisonSupport.Versions.{V3_1, v3_5}
import org.neo4j.internal.cypher.acceptance.CypherComparisonSupport.Versions.{V3_1, V3_4, v3_5}
import org.neo4j.internal.cypher.acceptance.CypherComparisonSupport._
import org.neo4j.internal.kernel.api.Transaction.Type
import org.neo4j.io.fs.FileUtils
Expand Down Expand Up @@ -726,10 +726,10 @@ order by a.COL1""".format(a, b))

val testConfiguration =
TestConfiguration(
Versions(V3_1, v3_5, Versions.Default),
Planners.Default,
Versions(V3_1, V3_4, v3_5, Versions.Default),
Planners(Planners.Cost, Planners.Default),
Runtimes(ProcedureOrSchema, Runtimes.Default)
) + Configs.Rule2_3 + Configs.Cost3_4
) + Configs.Rule2_3

// WHEN
executeWith(testConfiguration, s"""CREATE INDEX ON :$labelName(${propertyKeys.reduce(_ ++ "," ++ _)})""")
Expand Down

0 comments on commit 2424df8

Please sign in to comment.