Skip to content

Commit

Permalink
Solve problem where closing compiled execution result did not close u…
Browse files Browse the repository at this point in the history
…nderlying transaction correctly
  • Loading branch information
systay committed Feb 13, 2017
1 parent 7155aa1 commit 573897a
Show file tree
Hide file tree
Showing 16 changed files with 202 additions and 142 deletions.
Expand Up @@ -352,7 +352,7 @@ public void accept( ExpressionVisitor visitor )
} }


/** get static field */ /** get static field */
public static Expression get( final FieldReference field ) public static Expression getStatic(final FieldReference field )
{ {
return new Expression( field.type() ) return new Expression( field.type() )
{ {
Expand Down
Expand Up @@ -278,7 +278,7 @@ public void shouldGenerateStaticPrimitiveField() throws Throwable
FieldReference foo = simple.staticField( int.class, "FOO", constant( 42 ) ); FieldReference foo = simple.staticField( int.class, "FOO", constant( 42 ) );
try ( CodeBlock get = simple.generateMethod( int.class, "get" ) ) try ( CodeBlock get = simple.generateMethod( int.class, "get" ) )
{ {
get.returns( Expression.get( foo ) ); get.returns( Expression.getStatic( foo ) );
} }
handle = simple.handle(); handle = simple.handle();
} }
Expand All @@ -300,7 +300,7 @@ public void shouldGenerateStaticReferenceTypeField() throws Throwable
FieldReference foo = simple.staticField( String.class, "FOO", constant( "42" ) ); FieldReference foo = simple.staticField( String.class, "FOO", constant( "42" ) );
try ( CodeBlock get = simple.generateMethod( String.class, "get" ) ) try ( CodeBlock get = simple.generateMethod( String.class, "get" ) )
{ {
get.returns( Expression.get( foo ) ); get.returns( Expression.getStatic( foo ) );
} }
handle = simple.handle(); handle = simple.handle();
} }
Expand All @@ -326,7 +326,7 @@ public void shouldGenerateStaticParameterizedTypeField() throws Throwable
constant( "FOO" ), constant( "BAR" ), constant( "BAZ" ) ) ) ); constant( "FOO" ), constant( "BAR" ), constant( "BAZ" ) ) ) );
try ( CodeBlock get = simple.generateMethod( stringList, "get" ) ) try ( CodeBlock get = simple.generateMethod( stringList, "get" ) )
{ {
get.returns( Expression.get( foo ) ); get.returns( Expression.getStatic( foo ) );
} }
handle = simple.handle(); handle = simple.handle();
} }
Expand Down
Expand Up @@ -19,7 +19,10 @@
*/ */
package org.neo4j.cypher.internal.compiler.v3_2.executionplan package org.neo4j.cypher.internal.compiler.v3_2.executionplan


trait SuccessfulCloseable { /**
def success(): Unit * This trait is used to signal up the abstraction levels that running a query is completed, either
def close(): Unit * because the results have been exhausted, or because a failure has terminated the execution of the query.
*/
trait Completable {
def completed(success: Boolean): Unit
} }
Expand Up @@ -36,7 +36,7 @@ import scala.collection.{Map, mutable}
abstract class StandardInternalExecutionResult(context: QueryContext, abstract class StandardInternalExecutionResult(context: QueryContext,
taskCloser: Option[TaskCloser] = None) taskCloser: Option[TaskCloser] = None)
extends InternalExecutionResult extends InternalExecutionResult
with SuccessfulCloseable { with Completable {


self => self =>


Expand All @@ -45,8 +45,6 @@ abstract class StandardInternalExecutionResult(context: QueryContext,
protected val isGraphKernelResultValue = context.isGraphKernelResultValue _ protected val isGraphKernelResultValue = context.isGraphKernelResultValue _
private val scalaValues = new RuntimeScalaValueConverter(isGraphKernelResultValue, identity) private val scalaValues = new RuntimeScalaValueConverter(isGraphKernelResultValue, identity)


private var successful = false

protected def isOpen = !isClosed protected def isOpen = !isClosed
protected def isClosed = taskCloser.exists(_.isClosed) protected def isClosed = taskCloser.exists(_.isClosed)


Expand Down Expand Up @@ -82,15 +80,15 @@ abstract class StandardInternalExecutionResult(context: QueryContext,
formatOutput(writer, columns, builder.result(), queryStatistics()) formatOutput(writer, columns, builder.result(), queryStatistics())
} }


def success() = {
successful = true
}

override def planDescriptionRequested: Boolean = executionMode == ExplainMode || executionMode == ProfileMode override def planDescriptionRequested: Boolean = executionMode == ExplainMode || executionMode == ProfileMode
override def notifications = Iterable.empty[InternalNotification] override def notifications = Iterable.empty[InternalNotification]


override def close() = { override def close(): Unit = {
taskCloser.foreach(_.close(success = successful)) completed(success = true)
}

override def completed(success: Boolean): Unit = {
taskCloser.foreach(_.close(success = success))
} }


/* /*
Expand Down
Expand Up @@ -58,15 +58,25 @@ class ProcedureExecutionResult[E <: Exception](context: QueryContext,


override protected def createInner = new util.Iterator[util.Map[String, Any]]() { override protected def createInner = new util.Iterator[util.Map[String, Any]]() {
override def next(): util.Map[String, Any] = override def next(): util.Map[String, Any] =
try { resultAsMap( executionResults.next( ) ) } resultAsMap( executionResults.next( ) ) //TODO!!!¡¡¡¡
catch { case e: NoSuchElementException => success(); throw e } /*
override def next(): util.Map[String, Any] =
try { resultAsMap( executionResults.next( ) ) }
catch { case e: NoSuchElementException => success(); throw e }
override def hasNext: Boolean = if (executionResults.hasNext) true else { success(); close(); false } */

override def hasNext: Boolean = {
val moreToCome = executionResults.hasNext
if (!moreToCome) {
close()
}
moreToCome
}
} }


override def accept[EX <: Exception](visitor: InternalResultVisitor[EX]) = { override def accept[EX <: Exception](visitor: InternalResultVisitor[EX]) = {
executionResults.foreach { res => visitor.visit(new ResultRowImpl(resultAsRefMap(res))) } executionResults.foreach { res => visitor.visit(new ResultRowImpl(resultAsRefMap(res))) }
success()
close() close()
} }


Expand Down
Expand Up @@ -267,7 +267,7 @@ case class GeneratedMethodStructure(fields: Fields, generator: CodeBlock, aux: A


private def traceEvent(planStepId: String) = private def traceEvent(planStepId: String) =
invoke(tracer, executeOperator, invoke(tracer, executeOperator,
get(FieldReference.staticField(generator.owner(), typeRef[Id], planStepId))) getStatic(FieldReference.staticField(generator.owner(), typeRef[Id], planStepId)))


override def incrementDbHits() = if (tracing) generator.expression(invoke(loadEvent, Methods.dbHit)) override def incrementDbHits() = if (tracing) generator.expression(invoke(loadEvent, Methods.dbHit))


Expand Down Expand Up @@ -471,7 +471,7 @@ case class GeneratedMethodStructure(fields: Fields, generator: CodeBlock, aux: A
pop( pop(
invoke(generator.load(tableVar), countingTablePut, generator.load(keyVar), invoke(generator.load(tableVar), countingTablePut, generator.load(keyVar),
ternary( ternary(
equal(generator.load(countName), get(staticField[LongKeyIntValueTable, Int]("NULL"))), equal(generator.load(countName), getStatic(staticField[LongKeyIntValueTable, Int]("NULL"))),
constant(1), add(generator.load(countName), constant(1)))))) constant(1), add(generator.load(countName), constant(1))))))


case LongsToCountTable => case LongsToCountTable =>
Expand Down
Expand Up @@ -94,9 +94,9 @@ object Templates {
}, exception) }, exception)
} }


val incoming = Expression.get(staticField[Direction, Direction](Direction.INCOMING.name())) val incoming = Expression.getStatic(staticField[Direction, Direction](Direction.INCOMING.name()))
val outgoing = Expression.get(staticField[Direction, Direction](Direction.OUTGOING.name())) val outgoing = Expression.getStatic(staticField[Direction, Direction](Direction.OUTGOING.name()))
val both = Expression.get(staticField[Direction, Direction](Direction.BOTH.name())) val both = Expression.getStatic(staticField[Direction, Direction](Direction.BOTH.name()))
val newResultRow = Expression val newResultRow = Expression
.invoke(Expression.newInstance(typeRef[ResultRowImpl]), .invoke(Expression.newInstance(typeRef[ResultRowImpl]),
MethodReference.constructorReference(typeRef[ResultRowImpl])) MethodReference.constructorReference(typeRef[ResultRowImpl]))
Expand Down
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.internal.cypher.acceptance

import org.neo4j.cypher.internal.ExecutionResult
import org.neo4j.cypher.{ExecutionEngineFunSuite, NewPlannerTestSupport}
import org.neo4j.kernel.api.KernelTransaction
import org.neo4j.kernel.api.security.SecurityContext._

class ExecutionResultAcceptanceTest extends ExecutionEngineFunSuite with NewPlannerTestSupport {

test("closing the result without exhausting it should not fail the transaction") {
val query = "UNWIND [1, 2, 3] as x RETURN x"

Seq(
s"CYPHER runtime=compiled $query",
s"CYPHER runtime=interpreted $query",
s"CYPHER 2.3 $query"
).foreach(q => {
val tx = graph.beginTransaction(KernelTransaction.Type.`explicit`, AUTH_DISABLED)
val result: ExecutionResult = eengine.execute(q, Map.empty[String, Object], graph.transactionalContext(query = q -> Map.empty))
tx.success()
result.close()
tx.close()
})
}
}
Expand Up @@ -19,13 +19,13 @@
*/ */
package org.neo4j.cypher.internal.compiled_runtime.v3_2.executionplan; package org.neo4j.cypher.internal.compiled_runtime.v3_2.executionplan;


import java.util.List;

import org.neo4j.cypher.internal.compiler.v3_2.ExecutionMode; import org.neo4j.cypher.internal.compiler.v3_2.ExecutionMode;
import org.neo4j.cypher.internal.compiler.v3_2.executionplan.SuccessfulCloseable; import org.neo4j.cypher.internal.compiler.v3_2.executionplan.Completable;
import org.neo4j.cypher.internal.compiler.v3_2.planDescription.InternalPlanDescription; import org.neo4j.cypher.internal.compiler.v3_2.planDescription.InternalPlanDescription;
import org.neo4j.cypher.internal.compiler.v3_2.spi.InternalResultVisitor; import org.neo4j.cypher.internal.compiler.v3_2.spi.InternalResultVisitor;


import java.util.List;

public interface GeneratedQueryExecution public interface GeneratedQueryExecution
{ {
List<String> javaColumns(); List<String> javaColumns();
Expand All @@ -36,5 +36,5 @@ public interface GeneratedQueryExecution


InternalPlanDescription executionPlanDescription(); InternalPlanDescription executionPlanDescription();


void setSuccessfulCloseable( SuccessfulCloseable closeable ); void setCompletable( Completable completable );
} }
Expand Up @@ -39,7 +39,7 @@ class CompiledExecutionResult(taskCloser: TaskCloser,
extends StandardInternalExecutionResult(context, Some(taskCloser)) extends StandardInternalExecutionResult(context, Some(taskCloser))
with StandardInternalExecutionResult.IterateByAccepting { with StandardInternalExecutionResult.IterateByAccepting {


compiledCode.setSuccessfulCloseable(this) compiledCode.setCompletable(this)


// *** Delegate to compiled code // *** Delegate to compiled code
def executionMode: ExecutionMode = compiledCode.executionMode() def executionMode: ExecutionMode = compiledCode.executionMode()
Expand Down
Expand Up @@ -24,7 +24,7 @@ import java.util
import org.mockito.Matchers._ import org.mockito.Matchers._
import org.mockito.Mockito._ import org.mockito.Mockito._
import org.neo4j.cypher.internal.compiled_runtime.v3_2.executionplan.GeneratedQueryExecution import org.neo4j.cypher.internal.compiled_runtime.v3_2.executionplan.GeneratedQueryExecution
import org.neo4j.cypher.internal.compiler.v3_2.executionplan.SuccessfulCloseable import org.neo4j.cypher.internal.compiler.v3_2.executionplan.Completable
import org.neo4j.cypher.internal.compiler.v3_2.planDescription.InternalPlanDescription import org.neo4j.cypher.internal.compiler.v3_2.planDescription.InternalPlanDescription
import org.neo4j.cypher.internal.compiler.v3_2.spi.{InternalResultRow, InternalResultVisitor, QueryContext} import org.neo4j.cypher.internal.compiler.v3_2.spi.{InternalResultRow, InternalResultVisitor, QueryContext}
import org.neo4j.cypher.internal.compiler.v3_2.{ExecutionMode, NormalMode, ResultRowImpl, TaskCloser} import org.neo4j.cypher.internal.compiler.v3_2.{ExecutionMode, NormalMode, ResultRowImpl, TaskCloser}
Expand Down Expand Up @@ -158,7 +158,7 @@ class CompiledExecutionResultTest extends CypherFunSuite {
taskCloser: TaskCloser = new TaskCloser, taskCloser: TaskCloser = new TaskCloser,
assertion: () => Unit = () => {}) = { assertion: () => Unit = () => {}) = {
val noCompiledCode: GeneratedQueryExecution = new GeneratedQueryExecution { val noCompiledCode: GeneratedQueryExecution = new GeneratedQueryExecution {
override def setSuccessfulCloseable(closeable: SuccessfulCloseable){} override def setCompletable(closeable: Completable){}
override def javaColumns(): util.List[String] = new util.ArrayList(row.keySet()) override def javaColumns(): util.List[String] = new util.ArrayList(row.keySet())
override def executionMode(): ExecutionMode = NormalMode override def executionMode(): ExecutionMode = NormalMode
override def accept[E <: Exception](visitor: InternalResultVisitor[E]): Unit = { override def accept[E <: Exception](visitor: InternalResultVisitor[E]): Unit = {
Expand Down
Expand Up @@ -19,7 +19,7 @@
*/ */
package org.neo4j.cypher.internal.spi.v3_2.codegen package org.neo4j.cypher.internal.spi.v3_2.codegen


import org.neo4j.codegen.{FieldReference, MethodReference} import org.neo4j.codegen.FieldReference


case class Fields(closer: FieldReference, case class Fields(closer: FieldReference,
ro: FieldReference, ro: FieldReference,
Expand All @@ -29,6 +29,4 @@ case class Fields(closer: FieldReference,
tracer: FieldReference, tracer: FieldReference,
params: FieldReference, params: FieldReference,
closeable: FieldReference, closeable: FieldReference,
success: MethodReference,
close: MethodReference,
queryContext: FieldReference) queryContext: FieldReference)
Expand Up @@ -45,30 +45,35 @@ import org.neo4j.kernel.api.ReadOperations
import org.neo4j.kernel.api.schema.{IndexDescriptor, IndexDescriptorFactory, NodePropertyDescriptor} import org.neo4j.kernel.api.schema.{IndexDescriptor, IndexDescriptorFactory, NodePropertyDescriptor}
import org.neo4j.kernel.impl.api.RelationshipDataExtractor import org.neo4j.kernel.impl.api.RelationshipDataExtractor
import org.neo4j.kernel.impl.api.store.RelationshipIterator import org.neo4j.kernel.impl.api.store.RelationshipIterator

import GeneratedMethodStructure.CompletableFinalizer
import scala.collection.mutable import scala.collection.mutable


object GeneratedMethodStructure {
type CompletableFinalizer = Boolean => CodeBlock => Unit
}


class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux: AuxGenerator, tracing: Boolean = true, class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux: AuxGenerator, tracing: Boolean = true,
events: List[String] = List.empty, events: List[String] = List.empty,
onClose: Seq[CodeBlock => Unit] = Seq.empty, onClose: Seq[CompletableFinalizer] = Seq.empty,
locals: mutable.Map[String, LocalVariable] = mutable.Map.empty locals: mutable.Map[String, LocalVariable] = mutable.Map.empty
)(implicit context: CodeGenContext) )(implicit context: CodeGenContext)
extends MethodStructure[Expression] { extends MethodStructure[Expression] {


import GeneratedQueryStructure._ import GeneratedQueryStructure._
import TypeReference.parameterizedType import TypeReference.parameterizedType


private val _finalizers: mutable.ArrayBuffer[CodeBlock => Unit] = mutable.ArrayBuffer() private val _finalizers: mutable.ArrayBuffer[CompletableFinalizer] = mutable.ArrayBuffer()
_finalizers.appendAll(onClose) _finalizers.appendAll(onClose)


def finalizers: Seq[CodeBlock => Unit] = _finalizers def finalizers: Seq[CompletableFinalizer] = _finalizers


private def copy(fields: Fields = fields, private def copy(fields: Fields = fields,
generator: CodeBlock = generator, generator: CodeBlock = generator,
aux: AuxGenerator = aux, aux: AuxGenerator = aux,
tracing: Boolean = tracing, tracing: Boolean = tracing,
events: List[String] = events, events: List[String] = events,
onClose: Seq[CodeBlock => Unit] = _finalizers, onClose: Seq[CompletableFinalizer] = _finalizers,
locals: mutable.Map[String, LocalVariable] = locals): GeneratedMethodStructure = new GeneratedMethodStructure( locals: mutable.Map[String, LocalVariable] = locals): GeneratedMethodStructure = new GeneratedMethodStructure(
fields, generator, aux, tracing, events, onClose, locals) fields, generator, aux, tracing, events, onClose, locals)


Expand Down Expand Up @@ -200,8 +205,7 @@ class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux
invoke(generator.load(event), invoke(generator.load(event),
method[QueryExecutionEvent, Unit]("close"))) method[QueryExecutionEvent, Unit]("close")))
} }
generator.expression(invoke(generator.self(), fields.success)) _finalizers.foreach(codeBlock => codeBlock(true)(generator))
_finalizers.foreach(_ (generator))
generator.returns() generator.returns()
} }


Expand Down Expand Up @@ -246,8 +250,7 @@ class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux
body.expression(invoke(generator.load(event), body.expression(invoke(generator.load(event),
method[QueryExecutionEvent, Unit]("close"))) method[QueryExecutionEvent, Unit]("close")))
} }
body.expression(invoke(body.self(), fields.success)) _finalizers.foreach(block => block(true)(body))
_finalizers.foreach(_ (body))
body.returns() body.returns()
} }
}(exception = param[Throwable]("e")) { onError => }(exception = param[Throwable]("e")) { onError =>
Expand All @@ -256,7 +259,7 @@ class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux
invoke(onError.load(event), invoke(onError.load(event),
method[QueryExecutionEvent, Unit]("close"))) method[QueryExecutionEvent, Unit]("close")))
} }
_finalizers.foreach(_ (onError)) _finalizers.foreach(block => block(false)(onError))
onError.throwException(onError.load("e")) onError.throwException(onError.load("e"))
} }


Expand Down Expand Up @@ -314,7 +317,7 @@ class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux


private def traceEvent(planStepId: String) = private def traceEvent(planStepId: String) =
invoke(tracer, executeOperator, invoke(tracer, executeOperator,
get(FieldReference.staticField(generator.owner(), typeRef[Id], planStepId))) getStatic(FieldReference.staticField(generator.owner(), typeRef[Id], planStepId)))


override def incrementDbHits() = if (tracing) generator.expression(invoke(loadEvent, Methods.dbHit)) override def incrementDbHits() = if (tracing) generator.expression(invoke(loadEvent, Methods.dbHit))


Expand Down Expand Up @@ -567,7 +570,7 @@ class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux
if (codeGenTypes.size == 1 && codeGenTypes.head.repr == IntType) { if (codeGenTypes.size == 1 && codeGenTypes.head.repr == IntType) {
generator.assign(generator.declare(typeRef[PrimitiveLongSet], name), generator.assign(generator.declare(typeRef[PrimitiveLongSet], name),
invoke(method[Primitive, PrimitiveLongSet]("offHeapLongSet"))) invoke(method[Primitive, PrimitiveLongSet]("offHeapLongSet")))
_finalizers.append((block) => _finalizers.append((_: Boolean) => (block) =>
block.expression( block.expression(
invoke(block.load(name), method[PrimitiveLongSet, Unit]("close")))) invoke(block.load(name), method[PrimitiveLongSet, Unit]("close"))))


Expand Down Expand Up @@ -664,7 +667,7 @@ class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux
if (keyTypes.size == 1 && keyTypes.head.repr == IntType) { if (keyTypes.size == 1 && keyTypes.head.repr == IntType) {
generator.assign(generator.declare(typeRef[PrimitiveLongLongMap], name), generator.assign(generator.declare(typeRef[PrimitiveLongLongMap], name),
invoke(method[Primitive, PrimitiveLongLongMap]("offHeapLongLongMap"))) invoke(method[Primitive, PrimitiveLongLongMap]("offHeapLongLongMap")))
_finalizers.append((block) => _finalizers.append((_: Boolean) => (block) =>
block.expression( block.expression(
invoke(block.load(name), method[PrimitiveLongLongMap, Unit]("close")))) invoke(block.load(name), method[PrimitiveLongLongMap, Unit]("close"))))
} else { } else {
Expand Down Expand Up @@ -1003,7 +1006,7 @@ class GeneratedMethodStructure(val fields: Fields, val generator: CodeBlock, aux
pop( pop(
invoke(generator.load(tableVar), countingTablePut, generator.load(keyVar), invoke(generator.load(tableVar), countingTablePut, generator.load(keyVar),
ternary( ternary(
equal(generator.load(countName), get(staticField[LongKeyIntValueTable, Int]("NULL"))), equal(generator.load(countName), getStatic(staticField[LongKeyIntValueTable, Int]("NULL"))),
constant(1), add(generator.load(countName), constant(1)))))) constant(1), add(generator.load(countName), constant(1))))))


case LongsToCountTable => case LongsToCountTable =>
Expand Down

0 comments on commit 573897a

Please sign in to comment.