Skip to content

Commit

Permalink
Close procedure streams on failures
Browse files Browse the repository at this point in the history
Make sure we call any close handlers attached to procedure
streams on exceptions. Exceptions that occur during close are
collected as ResourceCloseFailureException and attached as suppressed.

Introduces a ResourceManager/ResourceTracker on KernelStatement to
keep track of active resources so that any non-exhausted procedure
streams are always closed if the transaction is closed during query
execution without closing the query result first.

The ResourceManager implementation is separated into a
class CloseableResourceManager so we can easily move it if we
will not keep KernelStatement in the future.

Also improves the error message.
  • Loading branch information
henriknyman committed Feb 16, 2018
1 parent 3f2c162 commit 8554f4e
Show file tree
Hide file tree
Showing 41 changed files with 850 additions and 90 deletions.
@@ -0,0 +1,174 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.internal.cypher.acceptance;

import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.kernel.impl.proc.ComponentRegistry;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.UserFunction;

public class TestResourceProcedure
{
public static abstract class SimulateFailureBaseException extends RuntimeException
{
}

public static class SimulateFailureException extends SimulateFailureBaseException
{
}

public static class SimulateFailureOnCloseException extends SimulateFailureBaseException
{
}

@Context
public GraphDatabaseService db;

@Context
public Counters counters;

public static class Counters
{
public int closeCountTestResourceProcedure = 0;
public int closeCountTestFailingResourceProcedure = 0;
public int closeCountTestOnCloseFailingResourceProcedure = 0;

public int openCountTestResourceProcedure = 0;
public int openCountTestFailingResourceProcedure = 0;
public int openCountTestOnCloseFailingResourceProcedure = 0;

public int liveCountTestResourceProcedure()
{
return openCountTestResourceProcedure - closeCountTestResourceProcedure;
}

public int liveCountTestFailingResourceProcedure()
{
return openCountTestFailingResourceProcedure - closeCountTestFailingResourceProcedure;
}

public int liveCountTestOnCloseFailingResourceProcedure()
{
return openCountTestOnCloseFailingResourceProcedure - closeCountTestOnCloseFailingResourceProcedure;
}

public void reset()
{
closeCountTestResourceProcedure = 0;
closeCountTestFailingResourceProcedure = 0;
closeCountTestOnCloseFailingResourceProcedure = 0;
openCountTestResourceProcedure = 0;
openCountTestFailingResourceProcedure = 0;
openCountTestOnCloseFailingResourceProcedure = 0;
}
}

public static ComponentRegistry.Provider<Counters> countersProvider(Counters counters1)
{
return context -> counters1;
}

public class Output
{
public Long value;

public Output( Long value )
{
this.value = value;
}
}

@Procedure( name = "org.neo4j.test.testResourceProcedure", mode = Mode.READ )
@Description( "Returns a stream of integers from 1 to the given argument" )
public Stream<Output> testResourceProcedure( @Name( value = "resultCount", defaultValue = "4" ) long resultCount ) throws Exception
{
Stream<Output> stream = Stream.iterate( 1L, (i) -> i + 1 ).limit( resultCount ).map( Output::new );
stream.onClose( () ->
{
counters.closeCountTestResourceProcedure++;
} );
counters.openCountTestResourceProcedure++;
return stream;
}

@Procedure( name = "org.neo4j.test.testFailingResourceProcedure", mode = Mode.READ )
@Description( "Returns a stream of integers from 1 to the given argument, but throws an exception when reaching the last element" )
public Stream<Output> testFailingResourceProcedure( @Name( value = "failCount", defaultValue = "3" ) long failCount ) throws Exception
{
Iterator<Output> failingIterator = new Iterator<Output>()
{
private long step = 1;

@Override
public boolean hasNext()
{
return step <= failCount;
}

@Override
public Output next()
{
if ( step == failCount )
{
throw new SimulateFailureException();
}
step++;
return new Output(step);
}
};
Iterable<Output> failingIterable = () -> failingIterator;
Stream<Output> stream = StreamSupport.stream(failingIterable.spliterator(), false);
stream.onClose( () ->
{
counters.closeCountTestFailingResourceProcedure++;
} );
counters.openCountTestFailingResourceProcedure++;
return stream;
}

@Procedure( name = "org.neo4j.test.testOnCloseFailingResourceProcedure", mode = Mode.READ )
@Description( "Returns a stream of integers from 1 to the given argument. Throws an exception on close." )
public Stream<Output> testOnCloseFailingResourceProcedure( @Name( value = "resultCount", defaultValue = "4" ) long resultCount ) throws Exception
{
Stream<Output> stream = Stream.iterate( 1L, (i) -> i + 1 ).limit( resultCount ).map( Output::new );
stream.onClose( () ->
{
counters.closeCountTestOnCloseFailingResourceProcedure++;
throw new SimulateFailureOnCloseException();
} );
counters.openCountTestOnCloseFailingResourceProcedure++;
return stream;
}

@UserFunction( name = "org.neo4j.test.fail" )
@Description( "org.neo4j.test.fail" )
public String fail( @Name( value = "input" ) String input )
{
throw new SimulateFailureException();
}
}
Expand Up @@ -26,7 +26,7 @@ import org.neo4j.cypher.internal.compiler.v3_1.test_helpers.CreateTempFileTestSu
import org.neo4j.cypher.{ExecutionEngineFunSuite, NewPlannerTestSupport, QueryStatisticsTestSupport} import org.neo4j.cypher.{ExecutionEngineFunSuite, NewPlannerTestSupport, QueryStatisticsTestSupport}
import org.neo4j.graphdb.Node import org.neo4j.graphdb.Node
import org.neo4j.kernel.api.exceptions.ProcedureException import org.neo4j.kernel.api.exceptions.ProcedureException
import org.neo4j.kernel.api.proc import org.neo4j.kernel.api.{ResourceTracker, proc}
import org.neo4j.kernel.api.proc.CallableProcedure.BasicProcedure import org.neo4j.kernel.api.proc.CallableProcedure.BasicProcedure
import org.neo4j.kernel.api.proc.{Context, Neo4jTypes} import org.neo4j.kernel.api.proc.{Context, Neo4jTypes}
import org.neo4j.procedure.Mode import org.neo4j.procedure.Mode
Expand Down Expand Up @@ -180,7 +180,8 @@ class EagerizationAcceptanceTest
builder.out("relId", Neo4jTypes.NTInteger) builder.out("relId", Neo4jTypes.NTInteger)
builder.mode(Mode.WRITE) builder.mode(Mode.WRITE)
new BasicProcedure(builder.build) { new BasicProcedure(builder.build) {
override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { override def apply(ctx: Context, input: Array[AnyRef],
resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = {
val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION) val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION)
val statement = transaction.acquireStatement() val statement = transaction.acquireStatement()
try { try {
Expand Down Expand Up @@ -217,7 +218,8 @@ class EagerizationAcceptanceTest
builder.out(org.neo4j.kernel.api.proc.ProcedureSignature.VOID) builder.out(org.neo4j.kernel.api.proc.ProcedureSignature.VOID)
builder.mode(Mode.WRITE) builder.mode(Mode.WRITE)
new BasicProcedure(builder.build) { new BasicProcedure(builder.build) {
override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { override def apply(ctx: Context, input: Array[AnyRef],
resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = {
val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION) val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION)
val statement = transaction.acquireStatement() val statement = transaction.acquireStatement()
try { try {
Expand Down Expand Up @@ -253,7 +255,8 @@ class EagerizationAcceptanceTest
builder.in("y", Neo4jTypes.NTNode) builder.in("y", Neo4jTypes.NTNode)
builder.out("relId", Neo4jTypes.NTInteger) builder.out("relId", Neo4jTypes.NTInteger)
new BasicProcedure(builder.build) { new BasicProcedure(builder.build) {
override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { override def apply(ctx: Context, input: Array[AnyRef],
resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = {
val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION) val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION)
val statement = transaction.acquireStatement() val statement = transaction.acquireStatement()
try { try {
Expand Down Expand Up @@ -301,7 +304,8 @@ class EagerizationAcceptanceTest
builder.in("y", Neo4jTypes.NTNode) builder.in("y", Neo4jTypes.NTNode)
builder.out(org.neo4j.kernel.api.proc.ProcedureSignature.VOID) builder.out(org.neo4j.kernel.api.proc.ProcedureSignature.VOID)
new BasicProcedure(builder.build) { new BasicProcedure(builder.build) {
override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { override def apply(ctx: Context, input: Array[AnyRef],
resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = {
val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION) val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION)
val statement = transaction.acquireStatement() val statement = transaction.acquireStatement()
try { try {
Expand Down
Expand Up @@ -21,6 +21,7 @@ package org.neo4j.internal.cypher.acceptance


import org.neo4j.collection.RawIterator import org.neo4j.collection.RawIterator
import org.neo4j.cypher._ import org.neo4j.cypher._
import org.neo4j.kernel.api.ResourceTracker
import org.neo4j.kernel.api.exceptions.ProcedureException import org.neo4j.kernel.api.exceptions.ProcedureException
import org.neo4j.kernel.api.proc.CallableProcedure.BasicProcedure import org.neo4j.kernel.api.proc.CallableProcedure.BasicProcedure
import org.neo4j.kernel.api.proc.CallableUserFunction.BasicUserFunction import org.neo4j.kernel.api.proc.CallableUserFunction.BasicUserFunction
Expand All @@ -40,7 +41,8 @@ abstract class ProcedureCallAcceptanceTest extends ExecutionEngineFunSuite {
} }


new BasicProcedure(builder.build) { new BasicProcedure(builder.build) {
override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = override def apply(ctx: Context, input: Array[AnyRef],
resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] =
RawIterator.of[Array[AnyRef], ProcedureException](input) RawIterator.of[Array[AnyRef], ProcedureException](input)
} }
} }
Expand All @@ -51,7 +53,8 @@ abstract class ProcedureCallAcceptanceTest extends ExecutionEngineFunSuite {
builder.out("out", Neo4jTypes.NTAny) builder.out("out", Neo4jTypes.NTAny)


new BasicProcedure(builder.build) { new BasicProcedure(builder.build) {
override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = override def apply(ctx: Context, input: Array[AnyRef],
resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] =
RawIterator.of[Array[AnyRef], ProcedureException](Array(value)) RawIterator.of[Array[AnyRef], ProcedureException](Array(value))
} }
} }
Expand All @@ -71,15 +74,17 @@ abstract class ProcedureCallAcceptanceTest extends ExecutionEngineFunSuite {
builder.out(ProcedureSignature.VOID) builder.out(ProcedureSignature.VOID)


new BasicProcedure(builder.build) { new BasicProcedure(builder.build) {
override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = override def apply(ctx: Context, input: Array[AnyRef],
resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] =
RawIterator.empty() RawIterator.empty()
} }
} }


protected def registerProcedureReturningNoRowsOrColumns() = protected def registerProcedureReturningNoRowsOrColumns() =
registerProcedure("dbms.return_nothing") { builder => registerProcedure("dbms.return_nothing") { builder =>
new BasicProcedure(builder.build) { new BasicProcedure(builder.build) {
override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = override def apply(ctx: Context, input: Array[AnyRef],
resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] =
RawIterator.empty() RawIterator.empty()
} }
} }
Expand Down

0 comments on commit 8554f4e

Please sign in to comment.