Skip to content

Commit

Permalink
Fixed closing of compiled execution result
Browse files Browse the repository at this point in the history
Problem occurred when closing ExecutionResult that had underlying result
object from compiled runtime. Close method ensured that iterator over
underlying result was initialized regardless whether it was consumed via
iterator or visitor. For compiled execution result this caused visitor to be
called twice - once for consumption and once for closing.

Issue fixed by always closing underlying execution result and closing
iterator, obtained from underlying result, only if it is initialized.
  • Loading branch information
lutovich committed Jul 31, 2015
1 parent e401fe0 commit 713dcc7
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 44 deletions.
Expand Up @@ -21,11 +21,10 @@ package org.neo4j.cypher.internal.compiler.v2_3.executionplan


import java.util import java.util


import org.neo4j.cypher.internal.compiler.v2_3.{NormalMode, ExecutionMode}
import org.neo4j.cypher.internal.compiler.v2_3.TaskCloser
import org.neo4j.cypher.internal.compiler.v2_3.codegen.ResultRowImpl import org.neo4j.cypher.internal.compiler.v2_3.codegen.ResultRowImpl
import org.neo4j.cypher.internal.compiler.v2_3.planDescription.InternalPlanDescription import org.neo4j.cypher.internal.compiler.v2_3.planDescription.InternalPlanDescription
import org.neo4j.cypher.internal.compiler.v2_3.test_helpers.CypherFunSuite import org.neo4j.cypher.internal.compiler.v2_3.test_helpers.CypherFunSuite
import org.neo4j.cypher.internal.compiler.v2_3.{ExecutionMode, NormalMode, TaskCloser}
import org.neo4j.graphdb.Result.{ResultRow, ResultVisitor} import org.neo4j.graphdb.Result.{ResultRow, ResultVisitor}
import org.neo4j.helpers.collection.Iterables._ import org.neo4j.helpers.collection.Iterables._


Expand Down Expand Up @@ -131,6 +130,23 @@ class CompiledExecutionResultTest extends CypherFunSuite {
intercept[IllegalStateException](result.javaIterator.hasNext) intercept[IllegalStateException](result.javaIterator.hasNext)
} }


test("close should work after result is consumed") {
// given
val result = newCompiledExecutionResult(javaMap("a" -> "1", "b" -> "2"))

// when
result.accept(new ResultVisitor[Exception] {
override def visit(row: ResultRow): Boolean = {
true
}
})

result.close()

// then
// call of close actually worked
}

private def newCompiledExecutionResult(row: util.Map[String, Any] = new util.HashMap(), private def newCompiledExecutionResult(row: util.Map[String, Any] = new util.HashMap(),
taskCloser: TaskCloser = new TaskCloser, taskCloser: TaskCloser = new TaskCloser,
assertion: () => Unit = () => {}) = { assertion: () => Unit = () => {}) = {
Expand Down
Expand Up @@ -19,11 +19,12 @@
*/ */
package org.neo4j.cypher.javacompat; package org.neo4j.cypher.javacompat;


import scala.collection.JavaConversions;

import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;

import java.util.Objects;
import scala.collection.JavaConversions;


import org.neo4j.cypher.CypherException; import org.neo4j.cypher.CypherException;
import org.neo4j.graphdb.ExecutionPlanDescription; import org.neo4j.graphdb.ExecutionPlanDescription;
Expand Down Expand Up @@ -55,20 +56,25 @@
public class ExecutionResult implements ResourceIterable<Map<String,Object>>, Result public class ExecutionResult implements ResourceIterable<Map<String,Object>>, Result
{ {
private final org.neo4j.cypher.ExtendedExecutionResult inner; private final org.neo4j.cypher.ExtendedExecutionResult inner;
private ResourceIterator<Map<String, Object>> iter = null;
/**
* Initialized lazily and should be accessed with {@link #innerIterator()} method
* because {@link #accept(ResultVisitor)} does not require iterator.
*/
private ResourceIterator<Map<String,Object>> innerIterator;


/** /**
* Constructor used by the Cypher framework. End-users should not * Constructor used by the Cypher framework. End-users should not
* create an ExecutionResult directly, but instead use the result * create an ExecutionResult directly, but instead use the result
* returned from calling {@link ExecutionEngine#execute(String)}. * returned from calling {@link ExecutionEngine#execute(String)}.
*/ */
public ExecutionResult( org.neo4j.cypher.ExtendedExecutionResult projection) public ExecutionResult( org.neo4j.cypher.ExtendedExecutionResult projection )
{ {
inner = projection; inner = Objects.requireNonNull( projection );
//if updating query we must fetch the iterator right away in order to eagerly perform updates //if updating query we must fetch the iterator right away in order to eagerly perform updates
if ( projection.executionType().queryType() == QueryType.WRITE ) if ( projection.executionType().queryType() == QueryType.WRITE )
{ {
ensureIterator(); innerIterator();
} }
} }


Expand Down Expand Up @@ -201,17 +207,15 @@ public void toString( PrintWriter writer )
@Override @Override
public ResourceIterator<Map<String, Object>> iterator() public ResourceIterator<Map<String, Object>> iterator()
{ {
ensureIterator(); return innerIterator(); // legacy method - don't convert exceptions...
return iter; // legacy method - don't convert exceptions...
} }


@Override @Override
public boolean hasNext() public boolean hasNext()
{ {
ensureIterator();
try try
{ {
return iter.hasNext(); return innerIterator().hasNext();
} }
catch ( CypherException e ) catch ( CypherException e )
{ {
Expand All @@ -222,10 +226,9 @@ public boolean hasNext()
@Override @Override
public Map<String, Object> next() public Map<String, Object> next()
{ {
ensureIterator();
try try
{ {
return iter.next(); return innerIterator().next();
} }
catch ( CypherException e ) catch ( CypherException e )
{ {
Expand All @@ -236,25 +239,22 @@ public Map<String, Object> next()
@Override @Override
public void close() public void close()
{ {
ensureIterator();
try try
{ {
iter.close(); // inner iterator might be null if this result was consumed using visitor
if ( innerIterator != null )
{
innerIterator.close();
}
// but we still need to close the underlying exetended execution result
inner.close();
} }
catch ( CypherException e ) catch ( CypherException e )
{ {
throw converted( e ); throw converted( e );
} }
} }


private void ensureIterator()
{
if (iter == null)
{
iter = inner.javaIterator();
}
}

@Override @Override
public ExecutionPlanDescription getExecutionPlanDescription() public ExecutionPlanDescription getExecutionPlanDescription()
{ {
Expand Down Expand Up @@ -313,6 +313,15 @@ public Iterable<Notification> getNotifications()
return JavaConversions.asJavaIterable( inner.notifications() ); return JavaConversions.asJavaIterable( inner.notifications() );
} }


private ResourceIterator<Map<String,Object>> innerIterator()
{
if ( innerIterator == null )
{
innerIterator = inner.javaIterator();
}
return innerIterator;
}

private static class ExceptionConversion<T> implements ResourceIterator<T> private static class ExceptionConversion<T> implements ResourceIterator<T>
{ {
private final ResourceIterator<T> inner; private final ResourceIterator<T> inner;
Expand Down
Expand Up @@ -20,33 +20,37 @@
package org.neo4j.cypher.javacompat; package org.neo4j.cypher.javacompat;


import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.neo4j.cypher.ArithmeticException; import org.neo4j.cypher.ArithmeticException;
import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.GraphDatabaseAPI; import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.ImpermanentDatabaseRule;

import java.util.Map;


import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsNull.notNullValue; import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue; import static org.hamcrest.core.IsNull.nullValue;


public class ExecutionResultTest public class ExecutionResultTest
{ {
private GraphDatabaseAPI db; @Rule
public final ImpermanentDatabaseRule db = new ImpermanentDatabaseRule();
private ExecutionEngine engine; private ExecutionEngine engine;


@Before @Before
public void setUp() throws Exception public void initializeExecutionEngine() throws Exception
{ {
db = (GraphDatabaseAPI) new TestGraphDatabaseFactory().newImpermanentDatabaseBuilder()
.newGraphDatabase();
engine = new ExecutionEngine( db ); engine = new ExecutionEngine( db );
} }


Expand All @@ -58,7 +62,7 @@ public void shouldCloseTransactionsWhenIteratingResults() throws Exception
createNode(); createNode();
createNode(); createNode();
ExecutionResult executionResult = engine.execute( "CYPHER runtime=interpreted MATCH (n) RETURN n" ); ExecutionResult executionResult = engine.execute( "CYPHER runtime=interpreted MATCH (n) RETURN n" );
ResourceIterator<Map<String, Object>> resultIterator = executionResult.iterator(); ResourceIterator<Map<String,Object>> resultIterator = executionResult.iterator();
resultIterator.next(); resultIterator.next();
assertThat( activeTransaction(), is( notNullValue() ) ); assertThat( activeTransaction(), is( notNullValue() ) );


Expand Down Expand Up @@ -88,13 +92,13 @@ public void shouldCloseTransactionsWhenIteratingOverSingleColumn() throws Except
assertThat( activeTransaction(), is( nullValue() ) ); assertThat( activeTransaction(), is( nullValue() ) );
} }


@Test(expected = ArithmeticException.class) @Test( expected = ArithmeticException.class )
public void shouldThrowAppropriateException() throws Exception public void shouldThrowAppropriateException() throws Exception
{ {
engine.execute( "RETURN rand()/0" ).iterator().next(); engine.execute( "RETURN rand()/0" ).iterator().next();
} }


@Test(expected = ArithmeticException.class) @Test( expected = ArithmeticException.class )
public void shouldThrowAppropriateExceptionAlsoWhenVisiting() throws Exception public void shouldThrowAppropriateExceptionAlsoWhenVisiting() throws Exception
{ {
engine.execute( "RETURN rand()/0" ).accept( new Result.ResultVisitor() engine.execute( "RETURN rand()/0" ).accept( new Result.ResultVisitor()
Expand All @@ -107,6 +111,112 @@ public boolean visit( Result.ResultRow row )
} ); } );
} }


@Test
public void shouldBePossibleToConsumeCompiledExecutionResultsWithIterator()
{
// Given
createNode();
createNode();

// When
List<Map<String,Object>> listResult;
try ( Result result = db.execute( "CYPHER runtime=compiled MATCH (n) RETURN n" ) )
{
listResult = Iterables.toList( result );
}

// Then
assertThat( listResult, hasSize( 2 ) );
}

@Test
public void shouldBePossibleToCloseNotFullyConsumedCompiledExecutionResults()
{
// Given
createNode();
createNode();

// When
Map<String,Object> firstRow = null;
try ( Result result = db.execute( "CYPHER runtime=compiled MATCH (n) RETURN n" ) )
{
if ( result.hasNext() )
{
firstRow = result.next();
}
}

// Then
assertThat( firstRow, notNullValue() );
}

@Test
public void shouldBePossibleToConsumeCompiledExecutionResultsWithVisitor()
{
// Given
createNode();
createNode();

// When
final List<Result.ResultRow> listResult = new ArrayList<>();
try ( Result result = db.execute( "CYPHER runtime=compiled MATCH (n) RETURN n" ) )
{
result.accept( new Result.ResultVisitor<RuntimeException>()
{
@Override
public boolean visit( Result.ResultRow row ) throws RuntimeException
{
listResult.add( row );
return true;
}
} );
}

// Then
assertThat( listResult, hasSize( 2 ) );
}

@Test
public void shouldBePossibleToCloseNotFullyVisitedCompiledExecutionResult()
{
// Given
createNode();
createNode();

// When
final List<Result.ResultRow> listResult = new ArrayList<>();
try ( Result result = db.execute( "CYPHER runtime=compiled MATCH (n) RETURN n" ) )
{
result.accept( new Result.ResultVisitor<RuntimeException>()
{
@Override
public boolean visit( Result.ResultRow row ) throws RuntimeException
{
listResult.add( row );
// return false so that no more result rows would be visited
return false;
}
} );
}

// Then
assertThat( listResult, hasSize( 1 ) );
}

@Test
public void shouldBePossibleToCloseNotConsumedCompiledExecutionResult()
{
// Given
createNode();

// When
try ( Result ignore = db.execute( "CYPHER runtime=compiled MATCH (n) RETURN n" ) )
{
// Then
// just close result without consuming it
}
}

private void createNode() private void createNode()
{ {
try ( Transaction tx = db.beginTx() ) try ( Transaction tx = db.beginTx() )
Expand Down
Expand Up @@ -22,6 +22,7 @@ package org.neo4j.cypher
import org.neo4j.cypher.internal.compiler.v2_3.CostBasedPlannerName import org.neo4j.cypher.internal.compiler.v2_3.CostBasedPlannerName
import org.neo4j.cypher.internal.compiler.v2_3.test_helpers.CypherFunSuite import org.neo4j.cypher.internal.compiler.v2_3.test_helpers.CypherFunSuite
import org.neo4j.graphdb.GraphDatabaseService import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.Result.{ResultRow, ResultVisitor}
import org.neo4j.graphdb.factory.GraphDatabaseSettings import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.kernel.GraphDatabaseAPI import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
Expand Down Expand Up @@ -205,6 +206,22 @@ class ExecutionEngineIT extends CypherFunSuite {
txBridge(db).hasTransaction shouldBe false txBridge(db).hasTransaction shouldBe false
} }


test("should be possible to close compiled result after it is consumed") {
// given
val db = new TestGraphDatabaseFactory().newImpermanentDatabase()

// when
val result = db.execute("CYPHER runtime=compiled MATCH (n) RETURN n")
result.accept(new ResultVisitor[RuntimeException] {
def visit(row: ResultRow) = true
})

result.close()

// then
// call to close actually worked
}

private implicit class RichDb(db: GraphDatabaseService) { private implicit class RichDb(db: GraphDatabaseService) {
def planDescriptionForQuery(query: String) = { def planDescriptionForQuery(query: String) = {
val res = db.execute(query) val res = db.execute(query)
Expand All @@ -213,7 +230,6 @@ class ExecutionEngineIT extends CypherFunSuite {
} }
} }



private def txBridge(db: GraphDatabaseService) = { private def txBridge(db: GraphDatabaseService) = {
db.asInstanceOf[GraphDatabaseAPI].getDependencyResolver.resolveDependency(classOf[ThreadToStatementContextBridge]) db.asInstanceOf[GraphDatabaseAPI].getDependencyResolver.resolveDependency(classOf[ThreadToStatementContextBridge])
} }
Expand Down

0 comments on commit 713dcc7

Please sign in to comment.