diff --git a/community/cypher/acceptance-spec-suite/src/test/java/org/neo4j/internal/cypher/acceptance/TestResourceProcedure.java b/community/cypher/acceptance-spec-suite/src/test/java/org/neo4j/internal/cypher/acceptance/TestResourceProcedure.java new file mode 100644 index 0000000000000..bee468d25b2df --- /dev/null +++ b/community/cypher/acceptance-spec-suite/src/test/java/org/neo4j/internal/cypher/acceptance/TestResourceProcedure.java @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.internal.cypher.acceptance; + +import java.util.Iterator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.kernel.impl.proc.ComponentRegistry; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Mode; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.UserFunction; + +public class TestResourceProcedure +{ + public static abstract class SimulateFailureBaseException extends RuntimeException + { + } + + public static class SimulateFailureException extends SimulateFailureBaseException + { + } + + public static class SimulateFailureOnCloseException extends SimulateFailureBaseException + { + } + + @Context + public GraphDatabaseService db; + + @Context + public Counters counters; + + public static class Counters + { + public int closeCountTestResourceProcedure = 0; + public int closeCountTestFailingResourceProcedure = 0; + public int closeCountTestOnCloseFailingResourceProcedure = 0; + + public int openCountTestResourceProcedure = 0; + public int openCountTestFailingResourceProcedure = 0; + public int openCountTestOnCloseFailingResourceProcedure = 0; + + public int liveCountTestResourceProcedure() + { + return openCountTestResourceProcedure - closeCountTestResourceProcedure; + } + + public int liveCountTestFailingResourceProcedure() + { + return openCountTestFailingResourceProcedure - closeCountTestFailingResourceProcedure; + } + + public int liveCountTestOnCloseFailingResourceProcedure() + { + return openCountTestOnCloseFailingResourceProcedure - closeCountTestOnCloseFailingResourceProcedure; + } + + public void reset() + { + closeCountTestResourceProcedure = 0; + closeCountTestFailingResourceProcedure = 0; + closeCountTestOnCloseFailingResourceProcedure = 0; + openCountTestResourceProcedure = 0; + openCountTestFailingResourceProcedure = 0; + openCountTestOnCloseFailingResourceProcedure = 0; + } + } + + public static ComponentRegistry.Provider countersProvider(Counters counters1) + { + return context -> counters1; + } + + public class Output + { + public Long value; + + public Output( Long value ) + { + this.value = value; + } + } + + @Procedure( name = "org.neo4j.test.testResourceProcedure", mode = Mode.READ ) + @Description( "Returns a stream of integers from 1 to the given argument" ) + public Stream testResourceProcedure( @Name( value = "resultCount", defaultValue = "4" ) long resultCount ) throws Exception + { + Stream stream = Stream.iterate( 1L, (i) -> i + 1 ).limit( resultCount ).map( Output::new ); + stream.onClose( () -> + { + counters.closeCountTestResourceProcedure++; + } ); + counters.openCountTestResourceProcedure++; + return stream; + } + + @Procedure( name = "org.neo4j.test.testFailingResourceProcedure", mode = Mode.READ ) + @Description( "Returns a stream of integers from 1 to the given argument, but throws an exception when reaching the last element" ) + public Stream testFailingResourceProcedure( @Name( value = "failCount", defaultValue = "3" ) long failCount ) throws Exception + { + Iterator failingIterator = new Iterator() + { + private long step = 1; + + @Override + public boolean hasNext() + { + return step <= failCount; + } + + @Override + public Output next() + { + if ( step == failCount ) + { + throw new SimulateFailureException(); + } + step++; + return new Output(step); + } + }; + Iterable failingIterable = () -> failingIterator; + Stream stream = StreamSupport.stream(failingIterable.spliterator(), false); + stream.onClose( () -> + { + counters.closeCountTestFailingResourceProcedure++; + } ); + counters.openCountTestFailingResourceProcedure++; + return stream; + } + + @Procedure( name = "org.neo4j.test.testOnCloseFailingResourceProcedure", mode = Mode.READ ) + @Description( "Returns a stream of integers from 1 to the given argument. Throws an exception on close." ) + public Stream testOnCloseFailingResourceProcedure( @Name( value = "resultCount", defaultValue = "4" ) long resultCount ) throws Exception + { + Stream stream = Stream.iterate( 1L, (i) -> i + 1 ).limit( resultCount ).map( Output::new ); + stream.onClose( () -> + { + counters.closeCountTestOnCloseFailingResourceProcedure++; + throw new SimulateFailureOnCloseException(); + } ); + counters.openCountTestOnCloseFailingResourceProcedure++; + return stream; + } + + @UserFunction( name = "org.neo4j.test.fail" ) + @Description( "org.neo4j.test.fail" ) + public String fail( @Name( value = "input" ) String input ) + { + throw new SimulateFailureException(); + } +} diff --git a/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/EagerizationAcceptanceTest.scala b/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/EagerizationAcceptanceTest.scala index e7d25d0f96efe..9f8af54a46f98 100644 --- a/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/EagerizationAcceptanceTest.scala +++ b/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/EagerizationAcceptanceTest.scala @@ -26,7 +26,7 @@ import org.neo4j.cypher.internal.compiler.v3_1.test_helpers.CreateTempFileTestSu import org.neo4j.cypher.{ExecutionEngineFunSuite, NewPlannerTestSupport, QueryStatisticsTestSupport} import org.neo4j.graphdb.Node import org.neo4j.kernel.api.exceptions.ProcedureException -import org.neo4j.kernel.api.proc +import org.neo4j.kernel.api.{ResourceTracker, proc} import org.neo4j.kernel.api.proc.CallableProcedure.BasicProcedure import org.neo4j.kernel.api.proc.{Context, Neo4jTypes} import org.neo4j.procedure.Mode @@ -180,7 +180,8 @@ class EagerizationAcceptanceTest builder.out("relId", Neo4jTypes.NTInteger) builder.mode(Mode.WRITE) new BasicProcedure(builder.build) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = { val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION) val statement = transaction.acquireStatement() try { @@ -217,7 +218,8 @@ class EagerizationAcceptanceTest builder.out(org.neo4j.kernel.api.proc.ProcedureSignature.VOID) builder.mode(Mode.WRITE) new BasicProcedure(builder.build) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = { val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION) val statement = transaction.acquireStatement() try { @@ -253,7 +255,8 @@ class EagerizationAcceptanceTest builder.in("y", Neo4jTypes.NTNode) builder.out("relId", Neo4jTypes.NTInteger) new BasicProcedure(builder.build) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = { val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION) val statement = transaction.acquireStatement() try { @@ -301,7 +304,8 @@ class EagerizationAcceptanceTest builder.in("y", Neo4jTypes.NTNode) builder.out(org.neo4j.kernel.api.proc.ProcedureSignature.VOID) new BasicProcedure(builder.build) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = { val transaction = ctx.get(proc.Context.KERNEL_TRANSACTION) val statement = transaction.acquireStatement() try { diff --git a/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/ProcedureCallAcceptanceTest.scala b/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/ProcedureCallAcceptanceTest.scala index cac493f6a67a8..117eef7273750 100644 --- a/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/ProcedureCallAcceptanceTest.scala +++ b/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/ProcedureCallAcceptanceTest.scala @@ -21,6 +21,7 @@ package org.neo4j.internal.cypher.acceptance import org.neo4j.collection.RawIterator import org.neo4j.cypher._ +import org.neo4j.kernel.api.ResourceTracker import org.neo4j.kernel.api.exceptions.ProcedureException import org.neo4j.kernel.api.proc.CallableProcedure.BasicProcedure import org.neo4j.kernel.api.proc.CallableUserFunction.BasicUserFunction @@ -40,7 +41,8 @@ abstract class ProcedureCallAcceptanceTest extends ExecutionEngineFunSuite { } new BasicProcedure(builder.build) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = RawIterator.of[Array[AnyRef], ProcedureException](input) } } @@ -51,7 +53,8 @@ abstract class ProcedureCallAcceptanceTest extends ExecutionEngineFunSuite { builder.out("out", Neo4jTypes.NTAny) new BasicProcedure(builder.build) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = RawIterator.of[Array[AnyRef], ProcedureException](Array(value)) } } @@ -71,7 +74,8 @@ abstract class ProcedureCallAcceptanceTest extends ExecutionEngineFunSuite { builder.out(ProcedureSignature.VOID) new BasicProcedure(builder.build) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = RawIterator.empty() } } @@ -79,7 +83,8 @@ abstract class ProcedureCallAcceptanceTest extends ExecutionEngineFunSuite { protected def registerProcedureReturningNoRowsOrColumns() = registerProcedure("dbms.return_nothing") { builder => new BasicProcedure(builder.build) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = RawIterator.empty() } } diff --git a/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/ReflectiveProcedureCallAcceptanceTest.scala b/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/ReflectiveProcedureCallAcceptanceTest.scala new file mode 100644 index 0000000000000..a778c3c851faf --- /dev/null +++ b/community/cypher/acceptance-spec-suite/src/test/scala/org/neo4j/internal/cypher/acceptance/ReflectiveProcedureCallAcceptanceTest.scala @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.internal.cypher.acceptance + +import java.io.PrintWriter + +import org.apache.commons.lang3.exception.ExceptionUtils +import org.neo4j.cypher._ +import org.neo4j.cypher.internal.compiler.v3_1.test_helpers.CreateTempFileTestSupport +import org.neo4j.graphdb.{QueryExecutionException, TransactionFailureException} +import org.neo4j.internal.cypher.acceptance.TestResourceProcedure.{SimulateFailureException, SimulateFailureOnCloseException} +import org.neo4j.kernel.api.KernelTransaction +import org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED +import org.neo4j.kernel.impl.proc.Procedures + +import scala.collection.mutable.ArrayBuffer + +class ReflectiveProcedureCallAcceptanceTest extends ExecutionEngineFunSuite with CreateTempFileTestSupport { + + def query(resultCount: Long, failCount: Long) = + s""" + |CALL org.neo4j.test.testOnCloseFailingResourceProcedure($resultCount) YIELD value as v1 + |WITH v1 + |CALL org.neo4j.test.testResourceProcedure($resultCount) YIELD value as v2 + |WITH v1, v2 + |CALL org.neo4j.test.testOnCloseFailingResourceProcedure($resultCount) YIELD value as v3 + |WITH v1, v2, v3 + |CALL org.neo4j.test.testFailingResourceProcedure($failCount) YIELD value as v4 + |WITH v1, v2, v3, v4 + |CALL org.neo4j.test.testResourceProcedure($resultCount) YIELD value as v5 + |RETURN v1, v2, v3, v4, v5 + """.stripMargin + + val defaultQuery = query(resultCount = 4, failCount = 3) + + private def setUpProcedures(): TestResourceProcedure.Counters = { + val procedures = graph.getDependencyResolver.resolveDependency(classOf[Procedures]) + val counters = new TestResourceProcedure.Counters + procedures.registerComponent(classOf[TestResourceProcedure.Counters], TestResourceProcedure.countersProvider(counters)) + + procedures.registerProcedure(classOf[TestResourceProcedure]) + procedures.registerFunction(classOf[TestResourceProcedure]) + + counters + } + + test("should close resources on failure") { + val counters = setUpProcedures() + + val caught = intercept[QueryExecutionException] { + val result = graph.execute(defaultQuery) + result.resultAsString() + } + val rootCause = ExceptionUtils.getRootCause(caught) + rootCause shouldBe a[SimulateFailureException] + + val suppressed = collectSuppressed(caught) + suppressed.head shouldBe a[CypherExecutionException] + ExceptionUtils.getRootCause(suppressed.head) shouldBe a[SimulateFailureOnCloseException] + + counters.liveCountTestResourceProcedure() shouldEqual 0 + counters.liveCountTestFailingResourceProcedure() shouldEqual 0 + counters.liveCountTestOnCloseFailingResourceProcedure() shouldEqual 0 + + counters.closeCountTestResourceProcedure shouldEqual 3 // 1 close from exhausting into v5 + 2 closes on failure + counters.closeCountTestFailingResourceProcedure shouldEqual 1 + counters.closeCountTestOnCloseFailingResourceProcedure shouldEqual 2 + } + + test("should close resources on mid-stream transaction close") { + val counters = setUpProcedures() + + val tx = graph.beginTransaction(KernelTransaction.Type.`implicit`, AUTH_DISABLED) + val result = graph.execute(defaultQuery) + + // Pull one row and then close the transaction + result.next() + + // Close transaction while the result is still open + val caught = intercept[TransactionFailureException] { + tx.close() + } + val rootCause = ExceptionUtils.getRootCause(caught) + rootCause shouldBe a[SimulateFailureOnCloseException] + + val suppressed = collectSuppressed(caught) + suppressed.head shouldBe a[SimulateFailureOnCloseException] + + counters.liveCountTestResourceProcedure() shouldEqual 0 + counters.liveCountTestFailingResourceProcedure() shouldEqual 0 + counters.liveCountTestOnCloseFailingResourceProcedure() shouldEqual 0 + + counters.closeCountTestResourceProcedure shouldEqual 2 + counters.closeCountTestFailingResourceProcedure shouldEqual 1 + counters.closeCountTestOnCloseFailingResourceProcedure shouldEqual 2 + } + + test("should not leave any resources open on transaction close before pulling on the result") { + val counters = setUpProcedures() + + val tx = graph.beginTransaction(KernelTransaction.Type.`implicit`, AUTH_DISABLED) + val result = graph.execute(defaultQuery) + + // Close the transaction directly without pulling the result + tx.close() + + counters.liveCountTestResourceProcedure() shouldEqual 0 + counters.liveCountTestFailingResourceProcedure() shouldEqual 0 + counters.liveCountTestOnCloseFailingResourceProcedure() shouldEqual 0 + } + + test("should handle tracking many closeable resources without stockpiling them until the end of the transaction") { + val counters = setUpProcedures() + val numberOfRows = 100 + + val tx = graph.beginTransaction(KernelTransaction.Type.`implicit`, AUTH_DISABLED) + val result = graph.execute(s"UNWIND range(1,$numberOfRows) as i CALL org.neo4j.test.testResourceProcedure(1) YIELD value RETURN value") + + // Pull one row and then close the transaction + var i = 0 + while (i < numberOfRows - 1 && result.hasNext) { + result.next() + i += 1 + } + val counterBeforeTxClose = counters.closeCountTestResourceProcedure + tx.close() + val counterAfterTxClose = counters.closeCountTestResourceProcedure + + counters.liveCountTestResourceProcedure() shouldEqual 0 + counters.closeCountTestResourceProcedure shouldEqual numberOfRows + (counterAfterTxClose - counterBeforeTxClose) shouldEqual 1 // Only the last open stream should need to be closed at transaction closure + } + + test("should close resources on failure with periodic commit") { + val counters = setUpProcedures() + + val url = createTempCSVFile(10) + val periodicCommitQuery = + s"""USING PERIODIC COMMIT 3 + |LOAD CSV FROM '$url' AS line + |CREATE () + |WITH line + |CALL org.neo4j.test.testResourceProcedure(4) YIELD value as v1 + |WITH line, v1, + | CASE line + | WHEN ['8'] THEN org.neo4j.test.fail("8") + | ELSE 'ok' + | END as ok + |RETURN line, v1, ok + |""".stripMargin + + val caught = intercept[QueryExecutionException] { + val result = graph.execute(periodicCommitQuery) + result.resultAsString() + } + val rootCause = ExceptionUtils.getRootCause(caught) + rootCause shouldBe a[SimulateFailureException] + + counters.liveCountTestResourceProcedure() shouldEqual 0 + counters.liveCountTestFailingResourceProcedure() shouldEqual 0 + counters.liveCountTestOnCloseFailingResourceProcedure() shouldEqual 0 + + counters.closeCountTestResourceProcedure shouldEqual 8 + counters.closeCountTestFailingResourceProcedure shouldEqual 0 // Unused + counters.closeCountTestOnCloseFailingResourceProcedure shouldEqual 0 // Unused + } + + private def collectSuppressed(t: Throwable): Seq[Throwable] = { + val suppressed = ArrayBuffer[Throwable](t.getSuppressed:_*) + var cause = t.getCause + while (cause != null) { + suppressed ++= cause.getSuppressed + val nextCause = cause.getCause + cause = if (nextCause == null || cause == nextCause) null else nextCause + } + suppressed + } + + def createTempCSVFile(numberOfLines: Int): String = + createTempFileURL("file", ".csv") { writer: PrintWriter => + 1.to(numberOfLines).foreach { n: Int => writer.println(n.toString) } + } + +} diff --git a/community/cypher/cypher-compiler-3.1/src/main/scala/org/neo4j/cypher/internal/compiler/v3_1/ResultIterator.scala b/community/cypher/cypher-compiler-3.1/src/main/scala/org/neo4j/cypher/internal/compiler/v3_1/ResultIterator.scala index eab9b4e72ef32..59713dde34257 100644 --- a/community/cypher/cypher-compiler-3.1/src/main/scala/org/neo4j/cypher/internal/compiler/v3_1/ResultIterator.scala +++ b/community/cypher/cypher-compiler-3.1/src/main/scala/org/neo4j/cypher/internal/compiler/v3_1/ResultIterator.scala @@ -94,7 +94,16 @@ class ClosingIterator(inner: Iterator[collection.Map[String, Any]], f } catch { case t: Throwable => - close(success = false) + try { + close(success = false) + } catch { + case thrownDuringClose: Throwable => + try { + t.addSuppressed(thrownDuringClose) + } catch { + case _: Throwable => // Ignore + } + } throw t } }) diff --git a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/TransactionalContextWrapperv3_0.scala b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/TransactionalContextWrapperv3_0.scala index 6171b50edfd76..f2a47f3962aa5 100644 --- a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/TransactionalContextWrapperv3_0.scala +++ b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/TransactionalContextWrapperv3_0.scala @@ -26,7 +26,7 @@ import org.neo4j.kernel.api.KernelTransaction.Revertable import org.neo4j.kernel.api.dbms.DbmsOperations import org.neo4j.kernel.api.security.{AccessMode, SecurityContext} import org.neo4j.kernel.api.txstate.TxStateHolder -import org.neo4j.kernel.api.{ReadOperations, Statement} +import org.neo4j.kernel.api.{ReadOperations, ResourceTracker, Statement} import org.neo4j.kernel.impl.query.TransactionalContext case class TransactionalContextWrapperv3_0(tc: TransactionalContext) extends QueryTransactionalContext { @@ -63,4 +63,6 @@ case class TransactionalContextWrapperv3_0(tc: TransactionalContext) extends Que def restrictCurrentTransaction(context: SecurityContext): Revertable = tc.restrictCurrentTransaction(context) def securityContext: SecurityContext = tc.securityContext + + def resourceTracker: ResourceTracker[_<:AutoCloseable] = tc.resourceTracker } diff --git a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/TransactionalContextWrapperv3_1.scala b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/TransactionalContextWrapperv3_1.scala index d46490db6c65f..5158044d72f05 100644 --- a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/TransactionalContextWrapperv3_1.scala +++ b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/TransactionalContextWrapperv3_1.scala @@ -26,7 +26,7 @@ import org.neo4j.kernel.api.KernelTransaction.Revertable import org.neo4j.kernel.api.dbms.DbmsOperations import org.neo4j.kernel.api.security.{AccessMode, SecurityContext} import org.neo4j.kernel.api.txstate.TxStateHolder -import org.neo4j.kernel.api.{ReadOperations, Statement} +import org.neo4j.kernel.api.{ReadOperations, ResourceTracker, Statement} import org.neo4j.kernel.impl.query.TransactionalContext case class TransactionalContextWrapperv3_1(tc: TransactionalContext) extends QueryTransactionalContext { @@ -63,4 +63,6 @@ case class TransactionalContextWrapperv3_1(tc: TransactionalContext) extends Que def restrictCurrentTransaction(context: SecurityContext): Revertable = tc.restrictCurrentTransaction(context) def securityContext: SecurityContext = tc.securityContext + + def resourceTracker: ResourceTracker[_<:AutoCloseable] = tc.resourceTracker } diff --git a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_0/ExceptionTranslationSupport.scala b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_0/ExceptionTranslationSupport.scala index 09ce1006a5cb1..ef01e13fab6e3 100644 --- a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_0/ExceptionTranslationSupport.scala +++ b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_0/ExceptionTranslationSupport.scala @@ -23,7 +23,7 @@ import org.neo4j.cypher.internal.compiler.v3_0.spi.TokenContext import org.neo4j.cypher.{ConstraintValidationException, CypherExecutionException} import org.neo4j.graphdb.{ConstraintViolationException => KernelConstraintViolationException} import org.neo4j.kernel.api.TokenNameLookup -import org.neo4j.kernel.api.exceptions.KernelException +import org.neo4j.kernel.api.exceptions.{KernelException, ResourceCloseFailureException} trait ExceptionTranslationSupport { inner: TokenContext => @@ -39,6 +39,7 @@ trait ExceptionTranslationSupport { def relationshipTypeGetName(relTypeId: Int): String = inner.getRelTypeName(relTypeId) }), e) case e : KernelConstraintViolationException => throw new ConstraintValidationException(e.getMessage, e) + case e : ResourceCloseFailureException => throw new CypherExecutionException(e.getMessage, e) } protected def translateIterator[A](iteratorFactory: => Iterator[A]): Iterator[A] = { diff --git a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_0/TransactionBoundQueryContext.scala b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_0/TransactionBoundQueryContext.scala index af69d83567664..6a5086dcf93ba 100644 --- a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_0/TransactionBoundQueryContext.scala +++ b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_0/TransactionBoundQueryContext.scala @@ -625,7 +625,9 @@ final class TransactionBoundQueryContext(val transactionalContext: Transactional callProcedure(name, args, transactionalContext.statement.procedureCallOperations().procedureCallWrite) override def callDbmsProcedure(name: QualifiedProcedureName, args: Seq[Any]) = - callProcedure(name, args, transactionalContext.dbmsOperations.procedureCallDbms(_,_,transactionalContext.securityContext)) + callProcedure(name, args, + transactionalContext.dbmsOperations.procedureCallDbms(_,_,transactionalContext.securityContext, + transactionalContext.resourceTracker)) private def callProcedure(name: QualifiedProcedureName, args: Seq[Any], call: (QualifiedName, Array[AnyRef]) => RawIterator[Array[AnyRef], ProcedureException]) = { diff --git a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_1/ExceptionTranslationSupport.scala b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_1/ExceptionTranslationSupport.scala index b3e98fb18a59e..9f1f29161d1c8 100644 --- a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_1/ExceptionTranslationSupport.scala +++ b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_1/ExceptionTranslationSupport.scala @@ -23,7 +23,7 @@ import org.neo4j.cypher.internal.compiler.v3_1.spi.TokenContext import org.neo4j.cypher.{ConstraintValidationException, CypherExecutionException} import org.neo4j.graphdb.{ConstraintViolationException => KernelConstraintViolationException} import org.neo4j.kernel.api.TokenNameLookup -import org.neo4j.kernel.api.exceptions.KernelException +import org.neo4j.kernel.api.exceptions.{KernelException, ResourceCloseFailureException} trait ExceptionTranslationSupport { inner: TokenContext => @@ -39,6 +39,7 @@ trait ExceptionTranslationSupport { def relationshipTypeGetName(relTypeId: Int): String = inner.getRelTypeName(relTypeId) }), e) case e : KernelConstraintViolationException => throw new ConstraintValidationException(e.getMessage, e) + case e : ResourceCloseFailureException => throw new CypherExecutionException(e.getMessage, e) } protected def translateIterator[A](iteratorFactory: => Iterator[A]): Iterator[A] = { diff --git a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_1/TransactionBoundQueryContext.scala b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_1/TransactionBoundQueryContext.scala index a688c23c46c2b..69e512eaa5630 100644 --- a/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_1/TransactionBoundQueryContext.scala +++ b/community/cypher/cypher/src/main/scala/org/neo4j/cypher/internal/spi/v3_1/TransactionBoundQueryContext.scala @@ -626,7 +626,9 @@ final class TransactionBoundQueryContext(val transactionalContext: Transactional } override def callDbmsProcedure(name: QualifiedName, args: Seq[Any], allowed: Array[String]) = { - callProcedure(name, args, transactionalContext.dbmsOperations.procedureCallDbms(_,_,transactionalContext.securityContext)) + callProcedure(name, args, + transactionalContext.dbmsOperations.procedureCallDbms(_,_,transactionalContext.securityContext, + transactionalContext.resourceTracker)) } private def callProcedure(name: QualifiedName, args: Seq[Any], call: KernelProcedureCall) = { diff --git a/community/cypher/cypher/src/test/scala/org/neo4j/cypher/ExecutionEngineIT.scala b/community/cypher/cypher/src/test/scala/org/neo4j/cypher/ExecutionEngineIT.scala index 8cbd82bce238b..bcffa20bbe825 100644 --- a/community/cypher/cypher/src/test/scala/org/neo4j/cypher/ExecutionEngineIT.scala +++ b/community/cypher/cypher/src/test/scala/org/neo4j/cypher/ExecutionEngineIT.scala @@ -29,7 +29,7 @@ import org.neo4j.graphdb.Result.{ResultRow, ResultVisitor} import org.neo4j.graphdb.factory.GraphDatabaseSettings import org.neo4j.kernel.GraphDatabaseQueryService import org.neo4j.kernel.api.proc._ -import org.neo4j.kernel.api.Statement +import org.neo4j.kernel.api.{ResourceTracker, Statement} import org.neo4j.kernel.api.exceptions.ProcedureException import Context.KERNEL_TRANSACTION import org.neo4j.kernel.api.proc._ @@ -699,7 +699,8 @@ class ExecutionEngineIT extends CypherFunSuite with GraphIcing { fields :+ new FieldSignature(entry, results(entry).asInstanceOf[Neo4jTypes.AnyType]) }.asJava - override def apply(context: Context, objects: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { + override def apply(context: Context, objects: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = { val statement: Statement = context.get(KERNEL_TRANSACTION).acquireStatement val readOperations = statement.readOperations val nodes = readOperations.nodesGetAll() diff --git a/community/cypher/spec-suite-tools/src/test/scala/cypher/feature/steps/SpecSuiteSteps.scala b/community/cypher/spec-suite-tools/src/test/scala/cypher/feature/steps/SpecSuiteSteps.scala index 8eea796aaf2a6..f98dfe948c395 100644 --- a/community/cypher/spec-suite-tools/src/test/scala/cypher/feature/steps/SpecSuiteSteps.scala +++ b/community/cypher/spec-suite-tools/src/test/scala/cypher/feature/steps/SpecSuiteSteps.scala @@ -30,7 +30,7 @@ import org.neo4j.collection.RawIterator import org.neo4j.cypher.internal.frontend.v3_1.symbols.{CypherType, _} import org.neo4j.graphdb.factory.{GraphDatabaseFactory, GraphDatabaseSettings} import org.neo4j.graphdb.{GraphDatabaseService, QueryStatistics, Result, Transaction} -import org.neo4j.kernel.api.KernelAPI +import org.neo4j.kernel.api.{KernelAPI, ResourceTracker} import org.neo4j.kernel.api.exceptions.ProcedureException import org.neo4j.kernel.api.proc.CallableProcedure.BasicProcedure import org.neo4j.kernel.api.proc.{Context, Neo4jTypes} @@ -201,7 +201,8 @@ trait SpecSuiteSteps extends FunSuiteLike with Matchers with TCKCucumberTemplate ) val kernelSignature = asKernelSignature(parsedSignature) val kernelProcedure = new BasicProcedure(kernelSignature) { - override def apply(ctx: Context, input: Array[AnyRef]): RawIterator[Array[AnyRef], ProcedureException] = { + override def apply(ctx: Context, input: Array[AnyRef], + resourceTracker: ResourceTracker[_<:AutoCloseable]): RawIterator[Array[AnyRef], ProcedureException] = { val scalaIterator = tableValues .filter { row => input.indices.forall { index => row(index) == input(index) } } .map { row => row.drop(input.length).clone() } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/ResourceManager.java b/community/kernel/src/main/java/org/neo4j/kernel/api/ResourceManager.java new file mode 100644 index 0000000000000..5a2d7469de672 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/ResourceManager.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.api; + +public interface ResourceManager extends ResourceTracker +{ + /** + * Closes and unregisters all the registered resources + */ + void closeAllCloseableResources(); +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/ResourceTracker.java b/community/kernel/src/main/java/org/neo4j/kernel/api/ResourceTracker.java new file mode 100644 index 0000000000000..81e48d5229498 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/ResourceTracker.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.api; + +public interface ResourceTracker +{ + /** + * Register a closeable resource that needs to be closed on statement cleanup. + * + * If the given resource can be closed elsewhere, e.g. by exhausting an iterator, + * the close() method of the resource should be idempotent. + */ + void registerCloseableResource( T closeableResource ); + + /** + * @see #registerCloseableResource + */ + void unregisterCloseableResource( T closeableResource ); +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/Statement.java b/community/kernel/src/main/java/org/neo4j/kernel/api/Statement.java index 2418b6308ec4a..5bcfa0449b18e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/Statement.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/Statement.java @@ -33,7 +33,7 @@ * or {@link #schemaWriteOperations()}, otherwise if already decided, verified so that it's * of the same type. */ -public interface Statement extends Resource +public interface Statement extends Resource, ResourceManager { /** * @return interface exposing all read operations. diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/dbms/DbmsOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/api/dbms/DbmsOperations.java index f2c8639c5be37..272ff97aa5bef 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/dbms/DbmsOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/dbms/DbmsOperations.java @@ -21,6 +21,7 @@ import org.neo4j.collection.RawIterator; import org.neo4j.kernel.api.KernelAPI; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.QualifiedName; import org.neo4j.kernel.api.security.SecurityContext; @@ -39,7 +40,8 @@ public interface DbmsOperations RawIterator procedureCallDbms( QualifiedName name, Object[] input, - SecurityContext securityContext + SecurityContext securityContext, + ResourceTracker resourceTracker ) throws ProcedureException; /** Invoke a DBMS function by name */ diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/exceptions/ResourceCloseFailureException.java b/community/kernel/src/main/java/org/neo4j/kernel/api/exceptions/ResourceCloseFailureException.java new file mode 100644 index 0000000000000..49702fe964b2d --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/exceptions/ResourceCloseFailureException.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.api.exceptions; + +import org.neo4j.graphdb.Resource; + +/** + * This exception is thrown when a checked exception occurs inside {@link Resource#close()}. + * It is a RuntimeException since {@link Resource#close()} is not allowed to throw checked exceptions. + */ +public class ResourceCloseFailureException extends RuntimeException +{ + public ResourceCloseFailureException( String message, Throwable cause ) + { + super( message, cause ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/proc/CallableProcedure.java b/community/kernel/src/main/java/org/neo4j/kernel/api/proc/CallableProcedure.java index 9c4b9e4d6def5..3e3c450f5947f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/proc/CallableProcedure.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/proc/CallableProcedure.java @@ -20,12 +20,13 @@ package org.neo4j.kernel.api.proc; import org.neo4j.collection.RawIterator; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.exceptions.ProcedureException; public interface CallableProcedure { ProcedureSignature signature(); - RawIterator apply( Context ctx, Object[] input ) throws ProcedureException; + RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException; abstract class BasicProcedure implements CallableProcedure { @@ -43,6 +44,6 @@ public ProcedureSignature signature() } @Override - public abstract RawIterator apply( Context ctx, Object[] input ) throws ProcedureException; + public abstract RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException; } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/JmxQueryProcedure.java b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/JmxQueryProcedure.java index 6b5a2339670de..b2636f0c3e305 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/JmxQueryProcedure.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/JmxQueryProcedure.java @@ -37,6 +37,7 @@ import org.neo4j.collection.RawIterator; import org.neo4j.helpers.collection.Pair; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.proc.CallableProcedure; @@ -66,7 +67,7 @@ public JmxQueryProcedure( QualifiedName name, MBeanServer jmxServer ) } @Override - public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException + public RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException { String query = input[0].toString(); try diff --git a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/ListComponentsProcedure.java b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/ListComponentsProcedure.java index 20efedf1ed8af..a04d08736e98e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/ListComponentsProcedure.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/ListComponentsProcedure.java @@ -20,6 +20,7 @@ package org.neo4j.kernel.builtinprocs; import org.neo4j.collection.RawIterator; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.Context; @@ -65,7 +66,7 @@ public ListComponentsProcedure( QualifiedName name, String neo4jVersion, String } @Override - public RawIterator apply( Context ctx, Object[] input ) + public RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException { return asRawIterator( singletonList( diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CloseableResourceManager.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CloseableResourceManager.java new file mode 100644 index 0000000000000..4baeaacc9a051 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CloseableResourceManager.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api; + +import java.util.ArrayList; +import java.util.Collection; + +import org.neo4j.io.IOUtils; +import org.neo4j.kernel.api.ResourceManager; +import org.neo4j.kernel.api.exceptions.ResourceCloseFailureException; + +public class CloseableResourceManager implements ResourceManager +{ + private Collection closeableResources; + + // ResourceTracker + + @Override + public final void registerCloseableResource( AutoCloseable closeable ) + { + if ( closeableResources == null ) + { + closeableResources = new ArrayList<>( 8 ); + } + closeableResources.add( closeable ); + } + + @Override + public final void unregisterCloseableResource( AutoCloseable closeable ) + { + if ( closeableResources != null ) + { + closeableResources.remove( closeable ); + } + } + + // ResourceManager + + @Override + public final void closeAllCloseableResources() + { + if ( closeableResources != null ) + { + // Make sure we reset closeableResource before doing anything which may throw an exception that + // _may_ result in a recursive call to this close-method + Collection resourcesToClose = closeableResources; + closeableResources = null; + + IOUtils.closeAll( ResourceCloseFailureException.class, resourcesToClose.toArray( new AutoCloseable[0] ) ); + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelStatement.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelStatement.java index caa7f6d64699d..1e754967d0253 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelStatement.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelStatement.java @@ -62,7 +62,7 @@ * instance again, when it's initialized. * */ -public class KernelStatement implements TxStateHolder, Statement, AssertOpen +public class KernelStatement extends CloseableResourceManager implements TxStateHolder, Statement, AssertOpen { private final TxStateHolder txStateHolder; private final StorageStatement storeStatement; @@ -243,6 +243,7 @@ private void cleanupResources() // closing is done by KTI storeStatement.release(); executingQueryList = ExecutingQueryList.EMPTY; + closeAllCloseableResources(); } public KernelTransactionImplementation getTransaction() diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java index 76933840b9928..74882ddb9e0fe 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java @@ -1585,7 +1585,7 @@ private RawIterator callProcedure( ctx.put( Context.KERNEL_TRANSACTION, tx ); ctx.put( Context.THREAD, Thread.currentThread() ); ctx.put( Context.SECURITY_CONTEXT, procedureSecurityContext ); - procedureCall = procedures.callProcedure( ctx, name, input ); + procedureCall = procedures.callProcedure( ctx, name, input, statement ); } return new RawIterator() { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/dbms/NonTransactionalDbmsOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/dbms/NonTransactionalDbmsOperations.java index dae12d43092a1..2af0bb5ec8592 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/dbms/NonTransactionalDbmsOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/dbms/NonTransactionalDbmsOperations.java @@ -20,6 +20,7 @@ package org.neo4j.kernel.impl.api.dbms; import org.neo4j.collection.RawIterator; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.dbms.DbmsOperations; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.BasicContext; @@ -43,12 +44,13 @@ public NonTransactionalDbmsOperations( Procedures procedures ) public RawIterator procedureCallDbms( QualifiedName name, Object[] input, - SecurityContext securityContext + SecurityContext securityContext, + ResourceTracker resourceTracker ) throws ProcedureException { BasicContext ctx = new BasicContext(); ctx.put( Context.SECURITY_CONTEXT, securityContext ); - return procedures.callProcedure( ctx, name, input ); + return procedures.callProcedure( ctx, name, input, resourceTracker ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/ProcedureRegistry.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/ProcedureRegistry.java index d03b2b2328677..6366042f6834f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/ProcedureRegistry.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/ProcedureRegistry.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import org.neo4j.collection.RawIterator; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.proc.CallableProcedure; @@ -148,7 +149,7 @@ public Optional function( QualifiedName name ) return Optional.of( func.signature() ); } - public RawIterator callProcedure( Context ctx, QualifiedName name, Object[] input ) + public RawIterator callProcedure( Context ctx, QualifiedName name, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException { CallableProcedure proc = procedures.get( name ); @@ -156,7 +157,7 @@ public RawIterator callProcedure( Context ctx, Qual { throw noSuchProcedure( name ); } - return proc.apply( ctx, input ); + return proc.apply( ctx, input, resourceTracker ); } public Object callFunction( Context ctx, QualifiedName name, Object[] input ) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/Procedures.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/Procedures.java index 7a88fdf1de18b..9ceede5371532 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/Procedures.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/Procedures.java @@ -25,6 +25,7 @@ import org.neo4j.collection.RawIterator; import org.neo4j.function.ThrowingConsumer; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.CallableProcedure; @@ -187,9 +188,9 @@ public Set getAllFunctions() } public RawIterator callProcedure( Context ctx, QualifiedName name, - Object[] input ) throws ProcedureException + Object[] input, ResourceTracker resourceTracker ) throws ProcedureException { - return registry.callProcedure( ctx, name, input ); + return registry.callProcedure( ctx, name, input, resourceTracker ); } public Object callFunction( Context ctx, QualifiedName name, diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureCompiler.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureCompiler.java index ee02a0f76aaaa..52166c99d640b 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureCompiler.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureCompiler.java @@ -34,8 +34,12 @@ import java.util.stream.Stream; import org.neo4j.collection.RawIterator; +import org.neo4j.graphdb.Resource; +import org.neo4j.io.IOUtils; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.ProcedureException; +import org.neo4j.kernel.api.exceptions.ResourceCloseFailureException; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.CallableUserFunction; @@ -338,7 +342,7 @@ public ProcedureSignature signature() } @Override - public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException + public RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException { // For now, create a new instance of the class for each invocation. In the future, we'd like to keep // instances local to @@ -370,33 +374,27 @@ public RawIterator apply( Context ctx, Object[] inp } else { - return new MappingIterator( ((Stream) rs).iterator(), () -> ((Stream) rs).close() ); + return new MappingIterator( ((Stream) rs).iterator(), ((Stream) rs)::close, resourceTracker ); } } catch ( Throwable throwable ) { - if ( throwable instanceof Status.HasStatus ) - { - throw new ProcedureException( ((Status.HasStatus) throwable).status(), throwable, - throwable.getMessage() ); - } - else - { - throw new ProcedureException( Status.Procedure.ProcedureCallFailed, throwable, - "Failed to invoke procedure `%s`: %s", signature.name(), "Caused by: " + throwable ); - } + throw newProcedureException( throwable ); } } - private class MappingIterator implements RawIterator, AutoCloseable + private class MappingIterator implements RawIterator, Resource { private final Iterator out; - private Closeable closeable; + private Resource closeableResource; + private ResourceTracker resourceTracker; - MappingIterator( Iterator out, Closeable closeable ) + MappingIterator( Iterator out, Resource closeableResource, ResourceTracker resourceTracker ) { this.out = out; - this.closeable = closeable; + this.closeableResource = closeableResource; + this.resourceTracker = resourceTracker; + resourceTracker.registerCloseableResource( closeableResource ); } @Override @@ -407,14 +405,13 @@ public boolean hasNext() throws ProcedureException boolean hasNext = out.hasNext(); if ( !hasNext ) { - closeable.close(); + close(); } return hasNext; } - catch ( RuntimeException | IOException e ) + catch ( Throwable throwable ) { - throw new ProcedureException( Status.Procedure.ProcedureCallFailed, e, - "Failed to call procedure `%s`: %s", signature, e.getMessage() ); + throw closeAndCreateProcedureException( throwable ); } } @@ -426,22 +423,66 @@ public Object[] next() throws ProcedureException Object record = out.next(); return outputMapper.apply( record ); } - catch ( RuntimeException e ) + catch ( Throwable throwable ) { - throw new ProcedureException( Status.Procedure.ProcedureCallFailed, e, - "Failed to call procedure `%s`: %s", signature, e.getMessage() ); + throw closeAndCreateProcedureException( throwable ); } } @Override - public void close() throws Exception + public void close() + { + if ( closeableResource != null ) + { + // Make sure we reset closeableResource before doing anything which may throw an exception that may + // result in a recursive call to this close-method + Resource resourceToClose = closeableResource; + closeableResource = null; + + IOUtils.closeAll( ResourceCloseFailureException.class, + () -> resourceTracker.unregisterCloseableResource( resourceToClose ), + resourceToClose::close ); + } + } + + private ProcedureException closeAndCreateProcedureException( Throwable t ) { - if ( closeable != null ) + ProcedureException procedureException = newProcedureException( t ); + + try { - closeable.close(); - closeable = null; + close(); } + catch ( Exception exceptionDuringClose ) + { + try + { + procedureException.addSuppressed( exceptionDuringClose ); + } + catch ( Throwable ignore ) + { + } + } + return procedureException; + } + } + + private ProcedureException newProcedureException( Throwable throwable ) + { + ProcedureException procedureException; + + if ( throwable instanceof Status.HasStatus ) + { + procedureException = new ProcedureException( ((Status.HasStatus) throwable).status(), throwable, + throwable.getMessage() ); } + else + { + procedureException = new ProcedureException( Status.Procedure.ProcedureCallFailed, throwable, + "Failed to invoke procedure `%s`: %s", signature.name(), "Caused by: " + throwable ); + } + + return procedureException; } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java index ab683e972b1c4..cd7181551f2ea 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java @@ -267,6 +267,14 @@ public SecurityContext securityContext() return securityContext; } + @Override + public ResourceTracker resourceTracker() + { + // We use the current statement as resourceTracker since it is attached to the KernelTransaction + // and is guaranteed to be cleaned up on transaction failure. + return statement; + } + interface Creator { Neo4jTransactionalContext create( Supplier statementSupplier, diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java index 5ef94126c023e..2499c6263d20c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java @@ -25,6 +25,7 @@ import org.neo4j.kernel.api.ExecutingQuery; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.ReadOperations; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.Statement; import org.neo4j.kernel.api.dbms.DbmsOperations; import org.neo4j.kernel.api.security.SecurityContext; @@ -78,4 +79,6 @@ public interface TransactionalContext SecurityContext securityContext(); KernelTransaction.Revertable restrictCurrentTransaction( SecurityContext context ); + + ResourceTracker resourceTracker(); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/StubResourceManager.java b/community/kernel/src/test/java/org/neo4j/kernel/api/StubResourceManager.java new file mode 100644 index 0000000000000..b95264c414a4a --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/StubResourceManager.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.api; + +public class StubResourceManager implements ResourceManager +{ + @Override + public void registerCloseableResource( AutoCloseable closeable ) + { + } + + @Override + public void unregisterCloseableResource( AutoCloseable closeable ) + { + } + + @Override + public void closeAllCloseableResources() + { + } +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/BuiltInProceduresTest.java b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/BuiltInProceduresTest.java index 938e8610f6788..8da75ba1366dc 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/BuiltInProceduresTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/BuiltInProceduresTest.java @@ -37,7 +37,9 @@ import org.neo4j.helpers.collection.Iterators; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.ReadOperations; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.Statement; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.constraints.NodePropertyExistenceConstraint; import org.neo4j.kernel.api.constraints.PropertyConstraint; import org.neo4j.kernel.api.constraints.UniquenessConstraint; @@ -89,6 +91,7 @@ public class BuiltInProceduresTest private final GraphDatabaseAPI graphDatabaseAPI = mock(GraphDatabaseAPI.class); private final Procedures procs = new Procedures(); + private final ResourceTracker resourceTracker = new StubResourceManager(); @Test public void shouldListAllIndexes() throws Throwable @@ -408,7 +411,7 @@ private List call(String name, Object ... args) throws ProcedureExcept ctx.put( SECURITY_CONTEXT, SecurityContext.AUTH_DISABLED ); when( graphDatabaseAPI.getDependencyResolver() ).thenReturn( resolver ); when( resolver.resolveDependency( Procedures.class ) ).thenReturn( procs ); - return Iterators.asList( procs.callProcedure( ctx, ProcedureSignature.procedureName( name.split( "\\." ) ), args ) ); + return Iterators.asList( procs.callProcedure( ctx, ProcedureSignature.procedureName( name.split( "\\." ) ), args, resourceTracker ) ); } private static final Key DEPENDENCY_RESOLVER = diff --git a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/JmxQueryProcedureTest.java b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/JmxQueryProcedureTest.java index e589412d6124a..798185e2339b0 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/JmxQueryProcedureTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/JmxQueryProcedureTest.java @@ -34,6 +34,8 @@ import javax.management.openmbean.SimpleType; import org.neo4j.collection.RawIterator; +import org.neo4j.kernel.api.ResourceTracker; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.ProcedureSignature; @@ -52,6 +54,7 @@ public class JmxQueryProcedureTest private MBeanServer jmxServer; private ObjectName beanName; private String attributeName; + private final ResourceTracker resourceTracker = new StubResourceManager(); @Test public void shouldHandleBasicMBean() throws Throwable @@ -61,7 +64,7 @@ public void shouldHandleBasicMBean() throws Throwable JmxQueryProcedure procedure = new JmxQueryProcedure( ProcedureSignature.procedureName( "bob" ), jmxServer ); // when - RawIterator result = procedure.apply( null, new Object[]{"*:*"} ); + RawIterator result = procedure.apply( null, new Object[]{"*:*"}, resourceTracker ); // then assertThat( asList( result ), contains( @@ -89,7 +92,7 @@ public void shouldHandleMBeanThatThrowsOnGetAttribute() throws Throwable JmxQueryProcedure procedure = new JmxQueryProcedure( ProcedureSignature.procedureName( "bob" ), jmxServer ); // when - RawIterator result = procedure.apply( null, new Object[]{"*:*"} ); + RawIterator result = procedure.apply( null, new Object[]{"*:*"}, resourceTracker ); // then assertThat( asList( result ), contains( @@ -133,7 +136,7 @@ public void shouldHandleCompositeAttributes() throws Throwable JmxQueryProcedure procedure = new JmxQueryProcedure( ProcedureSignature.procedureName( "bob" ), jmxServer ); // when - RawIterator result = procedure.apply( null, new Object[]{"*:*"} ); + RawIterator result = procedure.apply( null, new Object[]{"*:*"}, resourceTracker ); // then assertThat( asList( result ), contains( @@ -158,7 +161,7 @@ public void shouldConvertAllStandardBeansWithoutError() throws Throwable JmxQueryProcedure procedure = new JmxQueryProcedure( ProcedureSignature.procedureName( "bob" ), jmxServer ); // when - RawIterator result = procedure.apply( null, new Object[]{"*:*"} ); + RawIterator result = procedure.apply( null, new Object[]{"*:*"}, resourceTracker ); // then we verify that we respond with the expected number of beans without error // .. we don't assert more than this, this is more of a smoke test to ensure diff --git a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/StubStatement.java b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/StubStatement.java index 1f516d73ec436..10c84bae4d031 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/StubStatement.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/StubStatement.java @@ -25,10 +25,11 @@ import org.neo4j.kernel.api.ReadOperations; import org.neo4j.kernel.api.SchemaWriteOperations; import org.neo4j.kernel.api.Statement; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.TokenWriteOperations; import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException; -public class StubStatement implements Statement +public class StubStatement extends StubResourceManager implements Statement { private final ReadOperations readOperations; diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/BuiltInProceduresIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/BuiltInProceduresIT.java index d802941529eaf..fa9a0d7b55a51 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/BuiltInProceduresIT.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/BuiltInProceduresIT.java @@ -25,7 +25,9 @@ import org.neo4j.collection.RawIterator; import org.neo4j.graphdb.Transaction; import org.neo4j.kernel.api.DataWriteOperations; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.SchemaWriteOperations; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.security.AnonymousContext; import org.neo4j.kernel.internal.Version; @@ -45,6 +47,8 @@ public class BuiltInProceduresIT extends KernelIntegrationTest @Rule public ExpectedException exception = ExpectedException.none(); + private final ResourceTracker resourceTracker = new StubResourceManager(); + @Test public void listAllLabels() throws Throwable { @@ -150,7 +154,7 @@ public void failWhenCallingNonExistingProcedures() throws Throwable { // When dbmsOperations().procedureCallDbms( procedureName( "dbms", "iDoNotExist" ), new Object[0], - AnonymousContext.none() ); + AnonymousContext.none(), resourceTracker ); fail( "This should never get here" ); } catch ( Exception e ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ProceduresKernelIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ProceduresKernelIT.java index 8f38bbd911505..1c84ff3a79b63 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ProceduresKernelIT.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ProceduresKernelIT.java @@ -27,6 +27,7 @@ import org.neo4j.collection.RawIterator; import org.neo4j.helpers.collection.Iterables; +import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.Context; @@ -131,7 +132,7 @@ public void registeredProcedureShouldGetReadOperations() throws Throwable kernel.registerProcedure( new CallableProcedure.BasicProcedure( signature ) { @Override - public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException + public RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException { return RawIterator.of( new Object[]{ ctx.get( Context.KERNEL_TRANSACTION ).acquireStatement().readOperations() } ); } @@ -149,7 +150,7 @@ private static CallableProcedure procedure( final ProcedureSignature signature ) return new CallableProcedure.BasicProcedure( signature ) { @Override - public RawIterator apply( Context ctx, Object[] input ) + public RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) { return RawIterator.of( input ); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ProcedureJarLoaderTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ProcedureJarLoaderTest.java index 4a29b59487152..db05065c8eed6 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ProcedureJarLoaderTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ProcedureJarLoaderTest.java @@ -32,6 +32,8 @@ import java.util.Random; import java.util.stream.Stream; +import org.neo4j.kernel.api.ResourceTracker; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.BasicContext; import org.neo4j.kernel.api.proc.CallableProcedure; @@ -58,6 +60,7 @@ public class ProcedureJarLoaderTest private final ProcedureJarLoader jarloader = new ProcedureJarLoader( new ReflectiveProcedureCompiler( new TypeMappers(), new ComponentRegistry(), NullLog.getInstance(), ProcedureAllowedConfig.DEFAULT ), NullLog.getInstance() ); + private final ResourceTracker resourceTracker = new StubResourceManager(); @Test public void shouldLoadProcedureFromJar() throws Throwable @@ -73,7 +76,7 @@ public void shouldLoadProcedureFromJar() throws Throwable assertThat( signatures, contains( procedureSignature( "org","neo4j", "kernel", "impl", "proc", "myProcedure" ).out( "someNumber", NTInteger ).build() )); - assertThat( asList( procedures.get( 0 ).apply( new BasicContext(), new Object[0] ) ), + assertThat( asList( procedures.get( 0 ).apply( new BasicContext(), new Object[0], resourceTracker ) ), contains( IsEqual.equalTo( new Object[]{1337L} )) ); } @@ -94,7 +97,7 @@ public void shouldLoadProcedureWithArgumentFromJar() throws Throwable .out( "someNumber", NTInteger ) .build() )); - assertThat( asList(procedures.get( 0 ).apply( new BasicContext(), new Object[]{42L} ) ), + assertThat( asList(procedures.get( 0 ).apply( new BasicContext(), new Object[]{42L}, resourceTracker ) ), contains( IsEqual.equalTo( new Object[]{42L} )) ); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ProceduresTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ProceduresTest.java index 7106a4f28b05e..eadbdade4a750 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ProceduresTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ProceduresTest.java @@ -27,6 +27,8 @@ import org.neo4j.collection.RawIterator; import org.neo4j.helpers.collection.Iterables; +import org.neo4j.kernel.api.ResourceTracker; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.BasicContext; import org.neo4j.kernel.api.proc.CallableProcedure; @@ -56,6 +58,7 @@ public class ProceduresTest private final Procedures procs = new Procedures(); private final ProcedureSignature signature = procedureSignature( "org", "myproc" ).out( "name", NTString ).build(); private final CallableProcedure procedure = procedure( signature ); + private final ResourceTracker resourceTracker = new StubResourceManager(); @Test public void shouldGetRegisteredProcedure() throws Throwable @@ -90,7 +93,8 @@ public void shouldCallRegisteredProcedure() throws Throwable procs.register( procedure ); // When - RawIterator result = procs.callProcedure( new BasicContext(), signature.name(), new Object[]{1337} ); + RawIterator result = + procs.callProcedure( new BasicContext(), signature.name(), new Object[]{1337}, resourceTracker ); // Then assertThat( asList( result ), contains( equalTo( new Object[]{1337} ) ) ); @@ -106,7 +110,7 @@ public void shouldNotAllowCallingNonExistingProcedure() throws Throwable "procedure name correctly and that the procedure is properly deployed." ); // When - procs.callProcedure( new BasicContext(), signature.name(), new Object[]{1337} ); + procs.callProcedure( new BasicContext(), signature.name(), new Object[]{1337}, resourceTracker ); } @Test @@ -171,7 +175,7 @@ public void shouldMakeContextAvailable() throws Throwable procs.register( new CallableProcedure.BasicProcedure(signature) { @Override - public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException + public RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException { return RawIterator.of( new Object[]{ctx.get( someKey )} ); } @@ -181,7 +185,7 @@ public RawIterator apply( Context ctx, Object[] in ctx.put( someKey, "hello, world" ); // When - RawIterator result = procs.callProcedure( ctx, signature.name(), new Object[0] ); + RawIterator result = procs.callProcedure( ctx, signature.name(), new Object[0], resourceTracker ); // Then assertThat( asList( result ), contains( equalTo( new Object[]{ "hello, world" } ) ) ); @@ -260,7 +264,7 @@ private CallableProcedure.BasicProcedure procedureWithSignature( final Procedure return new CallableProcedure.BasicProcedure(signature) { @Override - public RawIterator apply( Context ctx, Object[] input ) throws ProcedureException + public RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException { return null; } @@ -272,7 +276,7 @@ private CallableProcedure procedure( ProcedureSignature signature ) return new CallableProcedure.BasicProcedure( signature ) { @Override - public RawIterator apply( Context ctx, Object[] input ) + public RawIterator apply( Context ctx, Object[] input, ResourceTracker resourceTracker ) { return RawIterator.of( input ); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureTest.java index bf0f5e37a27f8..b0f8f0a49c93b 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureTest.java @@ -19,6 +19,8 @@ */ package org.neo4j.kernel.impl.proc; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Rule; @@ -30,8 +32,11 @@ import org.neo4j.collection.RawIterator; import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.kernel.api.ResourceTracker; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.ProcedureException; +import org.neo4j.kernel.api.exceptions.ResourceCloseFailureException; import org.neo4j.kernel.api.proc.BasicContext; import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.Neo4jTypes; @@ -61,6 +66,7 @@ public class ReflectiveProcedureTest private ReflectiveProcedureCompiler procedureCompiler; private ComponentRegistry components; + private final ResourceTracker resourceTracker = new StubResourceManager(); @Before public void setUp() throws Exception @@ -78,7 +84,7 @@ public void shouldInjectLogging() throws KernelException CallableProcedure procedure = procedureCompiler.compileProcedure( LoggingProcedure.class ).get( 0 ); // When - procedure.apply( new BasicContext(), new Object[0] ); + procedure.apply( new BasicContext(), new Object[0], resourceTracker ); // Then verify( log ).debug( "1" ); @@ -108,7 +114,7 @@ public void shouldRunSimpleReadOnlyProcedure() throws Throwable CallableProcedure proc = compile( SingleReadOnlyProcedure.class ).get( 0 ); // When - RawIterator out = proc.apply( new BasicContext(), new Object[0] ); + RawIterator out = proc.apply( new BasicContext(), new Object[0], resourceTracker ); // Then assertThat( asList( out ), contains( @@ -136,8 +142,8 @@ public void shouldRunClassWithMultipleProceduresDeclared() throws Throwable CallableProcedure coolPeople = compiled.get( 1 ); // When - RawIterator coolOut = coolPeople.apply( new BasicContext(), new Object[0] ); - RawIterator bananaOut = bananaPeople.apply( new BasicContext(), new Object[0] ); + RawIterator coolOut = coolPeople.apply( new BasicContext(), new Object[0], resourceTracker ); + RawIterator bananaOut = bananaPeople.apply( new BasicContext(), new Object[0], resourceTracker ); // Then assertThat( asList( coolOut ), contains( @@ -185,7 +191,7 @@ public void shouldAllowVoidOutput() throws Throwable // Then assertEquals( 0, proc.signature().outputSignature().size() ); - assertFalse( proc.apply( null, new Object[0] ).hasNext() ); + assertFalse( proc.apply( null, new Object[0], resourceTracker ).hasNext() ); } @Test @@ -263,7 +269,55 @@ public void shouldGiveHelpfulErrorOnNullMessageException() throws Throwable "Caused by: java.lang.IndexOutOfBoundsException" ); // When - proc.apply( new BasicContext(), new Object[0] ); + proc.apply( new BasicContext(), new Object[0], resourceTracker ); + } + + @Test + public void shouldCloseResourcesAndGiveHelpfulErrorOnMidStreamException() throws Throwable + { + // Given + CallableProcedure proc = compile( ProcedureThatThrowsNullMsgExceptionMidStream.class ).get( 0 ); + + // Expect + exception.expect( ProcedureException.class ); + exception.expectMessage( "Failed to invoke procedure `org.neo4j.kernel.impl.proc.throwsInStream`: " + + "Caused by: java.lang.IndexOutOfBoundsException" ); + + // Expect that we get a suppressed exception from Stream.onClose (which also verifies that we actually call + // onClose on the first exception) + exception.expect( new BaseMatcher() + { + @Override + public void describeTo( Description description ) + { + description.appendText( "a suppressed exception with cause ExceptionDuringClose" ); + } + + @Override + public boolean matches( Object item ) + { + Exception e = (Exception) item; + for ( Throwable suppressed : e.getSuppressed() ) + { + if ( suppressed instanceof ResourceCloseFailureException ) + { + if ( suppressed.getCause() instanceof ExceptionDuringClose ) + { + return true; + } + } + } + return false; + } + } ); + + // When + RawIterator stream = + proc.apply( new BasicContext(), new Object[0], resourceTracker ); + if ( stream.hasNext() ) + { + stream.next(); + } } @Test @@ -283,7 +337,7 @@ public void shouldSupportProcedureDeprecation() throws Throwable for ( CallableProcedure proc : procs ) { String name = proc.signature().name().name(); - proc.apply( new BasicContext(), new Object[0] ); + proc.apply( new BasicContext(), new Object[0], resourceTracker ); switch ( name ) { case "newProc": @@ -441,9 +495,12 @@ public static class ProcedureThatThrowsNullMsgExceptionMidStream @Procedure public Stream throwsInStream( ) { - return Stream.generate( () -> { + return Stream.generate( () -> { throw new IndexOutOfBoundsException(); - }); + }).onClose( () -> + { + throw new ExceptionDuringClose(); + } ); } } @@ -523,4 +580,8 @@ private List compile( Class clazz ) throws KernelException { return procedureCompiler.compileProcedure( clazz ); } + + private static class ExceptionDuringClose extends RuntimeException + { + } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureWithArgumentsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureWithArgumentsTest.java index 0610446b384e3..744e0a6b6fd9e 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureWithArgumentsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ReflectiveProcedureWithArgumentsTest.java @@ -31,6 +31,8 @@ import java.util.stream.Stream; import org.neo4j.collection.RawIterator; +import org.neo4j.kernel.api.ResourceTracker; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.BasicContext; @@ -50,6 +52,8 @@ public class ReflectiveProcedureWithArgumentsTest @Rule public ExpectedException exception = ExpectedException.none(); + private final ResourceTracker resourceTracker = new StubResourceManager(); + @Test public void shouldCompileSimpleProcedure() throws Throwable { @@ -73,7 +77,7 @@ public void shouldRunSimpleProcedure() throws Throwable CallableProcedure procedure = compile( ClassWithProcedureWithSimpleArgs.class ).get( 0 ); // When - RawIterator out = procedure.apply( new BasicContext(), new Object[]{"Pontus", 35L} ); + RawIterator out = procedure.apply( new BasicContext(), new Object[]{"Pontus", 35L}, resourceTracker ); // Then List collect = asList( out ); @@ -89,7 +93,7 @@ public void shouldRunGenericProcedure() throws Throwable // When RawIterator out = procedure.apply( new BasicContext(), new Object[]{ Arrays.asList( "Roland", "Eddie", "Susan", "Jake" ), - Arrays.asList( 1000L, 23L, 29L, 12L )} ); + Arrays.asList( 1000L, 23L, 29L, 12L )}, resourceTracker ); // Then List collect = asList( out ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ResourceInjectionTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ResourceInjectionTest.java index 85e183d5a29d4..03b8f1e253e3b 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ResourceInjectionTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/proc/ResourceInjectionTest.java @@ -27,6 +27,8 @@ import java.util.stream.Stream; import org.neo4j.helpers.collection.Iterators; +import org.neo4j.kernel.api.ResourceTracker; +import org.neo4j.kernel.api.StubResourceManager; import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.proc.BasicContext; @@ -45,6 +47,8 @@ public class ResourceInjectionTest @Rule public ExpectedException exception = ExpectedException.none(); + private final ResourceTracker resourceTracker = new StubResourceManager(); + @Test public void shouldCompileAndRunProcedure() throws Throwable { @@ -52,7 +56,7 @@ public void shouldCompileAndRunProcedure() throws Throwable CallableProcedure proc = compile( ProcedureWithInjectedAPI.class ).get( 0 ); // Then - List out = Iterators.asList( proc.apply( new BasicContext(), new Object[0] ) ); + List out = Iterators.asList( proc.apply( new BasicContext(), new Object[0], resourceTracker ) ); // Then assertThat( out.get( 0 ), equalTo( (new Object[]{"Bonnie"}) ) );