From 5050009919cb7bf7b778650e5ef89b5e14f1479c Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 27 Sep 2016 15:10:01 +0200 Subject: [PATCH] Make it possible to terminate all running queries through Bolt Before this change, PERIODIC COMMIT queries could not be aborted, which was a pity, since they are specially prone to long run-times. --- community/bolt/pom.xml | 6 + .../runtime/LifecycleManagedBoltFactory.java | 6 +- .../v1/runtime/TransactionStateMachine.java | 74 ++++----- .../runtime/TransactionStateMachineSPI.java | 72 ++++++--- .../runtime/cypher/CypherAdapterStream.java | 2 +- .../runtime/cypher/CypherStatementRunner.java | 69 --------- .../bolt/v1/runtime/spi/BookmarkResult.java | 1 - .../bolt/v1/runtime/spi/StatementRunner.java | 37 ----- .../cypher/CypherAdapterStreamTest.java | 9 +- .../v1/runtime/integration/TransactionIT.java | 144 +++++++++++++++++- .../impl/query/Neo4jTransactionalContext.java | 24 ++- .../impl/query/TransactionalContext.java | 11 ++ .../impl/query/FakeTransactionalContext.java | 6 + 13 files changed, 289 insertions(+), 172 deletions(-) delete mode 100644 community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherStatementRunner.java delete mode 100644 community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/StatementRunner.java diff --git a/community/bolt/pom.xml b/community/bolt/pom.xml index 8759e57f2efc0..7443bb7c06863 100644 --- a/community/bolt/pom.xml +++ b/community/bolt/pom.xml @@ -81,6 +81,12 @@ mockito-core test + + org.eclipse.jetty + jetty-server + test + + org.neo4j diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/LifecycleManagedBoltFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/LifecycleManagedBoltFactory.java index a130d0341c880..33750211cc854 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/LifecycleManagedBoltFactory.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/LifecycleManagedBoltFactory.java @@ -22,7 +22,6 @@ import java.time.Clock; import org.neo4j.bolt.security.auth.Authentication; -import org.neo4j.bolt.v1.runtime.cypher.CypherStatementRunner; import org.neo4j.graphdb.DependencyResolver; import org.neo4j.kernel.GraphDatabaseQueryService; import org.neo4j.kernel.api.bolt.BoltConnectionTracker; @@ -92,9 +91,10 @@ public void shutdown() throws Throwable @Override public BoltStateMachine newMachine( String connectionDescriptor, Runnable onClose, Clock clock ) { - final CypherStatementRunner statementRunner = new CypherStatementRunner( queryExecutionEngine, queryService ); TransactionStateMachine.SPI transactionSPI = new TransactionStateMachineSPI( gds, txBridge, - queryExecutionEngine, statementRunner, transactionIdStore ); + queryExecutionEngine, + transactionIdStore, + queryService, clock ); BoltStateMachine.SPI boltSPI = new BoltStateMachineSPI( connectionDescriptor, usageData, logging, authentication, connectionTracker, transactionSPI ); return new BoltStateMachine( boltSPI, onClose, Clock.systemUTC() ); diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java index 241004fcaa604..100c68c0389f8 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java @@ -25,14 +25,12 @@ import org.neo4j.bolt.security.auth.AuthenticationResult; import org.neo4j.bolt.v1.runtime.bookmarking.Bookmark; -import org.neo4j.bolt.v1.runtime.cypher.CypherAdapterStream; import org.neo4j.bolt.v1.runtime.cypher.StatementMetadata; import org.neo4j.bolt.v1.runtime.cypher.StatementProcessor; import org.neo4j.bolt.v1.runtime.spi.BoltResult; import org.neo4j.bolt.v1.runtime.spi.BookmarkResult; import org.neo4j.cypher.InvalidSemanticsException; import org.neo4j.function.ThrowingConsumer; -import org.neo4j.graphdb.Result; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.Status; @@ -102,8 +100,7 @@ public void streamResult( ThrowingConsumer resultConsumer @Override public void reset() throws TransactionFailureException { - state.rollbackTransaction( ctx ); - state.closeResult( ctx ); + state.terminateQueryAndRollbackTransaction( ctx ); state = State.AUTO_COMMIT; } @@ -174,20 +171,16 @@ else if ( statement.equalsIgnoreCase( ROLLBACK ) ) } else if ( spi.isPeriodicCommit( statement ) ) { - Result result = executeQuery( ctx, spi, statement, params ); - - ctx.currentTransaction = spi.beginTransaction( ctx.authSubject ); - - ctx.currentResult = new CypherAdapterStream( result, ctx.clock ); + ctx.currentTransaction = null; + ctx.currentResultHandle = executeQuery( ctx, spi, statement, params ); + ctx.currentResult = ctx.currentResultHandle.start(); return AUTO_COMMIT; } else { ctx.currentTransaction = spi.beginTransaction( ctx.authSubject ); - - Result result = execute( ctx, spi, statement, params ); - - ctx.currentResult = new CypherAdapterStream( result, ctx.clock ); + ctx.currentResultHandle = execute( ctx, spi, statement, params ); + ctx.currentResult = ctx.currentResultHandle.start(); return AUTO_COMMIT; } } @@ -196,8 +189,8 @@ else if ( spi.isPeriodicCommit( statement ) ) * In AUTO_COMMIT we must make sure to fail, close and set the current * transaction to null. */ - private Result execute( MutableTransactionState ctx, SPI spi, - String statement, Map params ) + private BoltResultHandle execute( MutableTransactionState ctx, SPI spi, + String statement, Map params ) throws TransactionFailureException, QueryExecutionKernelException { try @@ -276,15 +269,15 @@ else if( spi.isPeriodicCommit( statement ) ) } else { - Result result = execute( ctx, spi, statement, params ); - - ctx.currentResult = new CypherAdapterStream( result, ctx.clock ); + ctx.currentResultHandle = execute( ctx, spi, statement, params ); + ctx.currentResult = ctx.currentResultHandle.start(); return EXPLICIT_TRANSACTION; } } - private Result execute( MutableTransactionState ctx, SPI spi, - String statement, Map params ) throws QueryExecutionKernelException + private BoltResultHandle execute( MutableTransactionState ctx, SPI spi, + String statement, Map params ) + throws QueryExecutionKernelException { try { @@ -317,36 +310,40 @@ abstract State run( MutableTransactionState ctx, abstract void streamResult( MutableTransactionState ctx, ThrowingConsumer resultConsumer ) throws Exception; - void rollbackTransaction( MutableTransactionState ctx ) throws TransactionFailureException + void terminateQueryAndRollbackTransaction( MutableTransactionState ctx ) throws TransactionFailureException { - if ( ctx.currentTransaction != null ) + if ( ctx.currentResultHandle != null ) { - if ( ctx.currentTransaction.isOpen() ) - { - ctx.currentTransaction.failure(); - ctx.currentTransaction.close(); - ctx.currentTransaction = null; - } + ctx.currentResultHandle.terminate(); + ctx.currentResultHandle = null; } - } - - void closeResult( MutableTransactionState ctx ) - { if ( ctx.currentResult != null ) { ctx.currentResult.close(); ctx.currentResult = null; } + if ( ctx.currentTransaction != null && ctx.currentTransaction.isOpen() ) + { + ctx.currentTransaction.failure(); + ctx.currentTransaction.close(); + ctx.currentTransaction = null; + } } - } - private static Result executeQuery( MutableTransactionState ctx, SPI spi, String statement, - Map params ) throws QueryExecutionKernelException + private static BoltResultHandle executeQuery( MutableTransactionState ctx, SPI spi, String statement, + Map params ) + throws QueryExecutionKernelException { return spi.executeQuery( ctx.querySource, ctx.authSubject, statement, params ); } + interface BoltResultHandle + { + BoltResult start() throws QueryExecutionKernelException; + void terminate(); + } + static class MutableTransactionState { /** The current session auth state to be used for starting transactions */ @@ -371,6 +368,7 @@ public String[] fieldNames() }; String querySource; + BoltResultHandle currentResultHandle; private MutableTransactionState( AuthenticationResult authenticationResult, Clock clock ) { @@ -393,7 +391,9 @@ interface SPI boolean isPeriodicCommit( String query ); - Result executeQuery( String querySource, AuthSubject authSubject, String statement, Map params ) - throws QueryExecutionKernelException; + BoltResultHandle executeQuery( String querySource, + AuthSubject authSubject, + String statement, + Map params ) throws QueryExecutionKernelException; } } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java index c9eabbb9919e8..d43270f7807d3 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java @@ -19,41 +19,59 @@ */ package org.neo4j.bolt.v1.runtime; +import java.time.Clock; import java.time.Duration; import java.util.Map; -import org.neo4j.bolt.v1.runtime.spi.StatementRunner; +import org.neo4j.bolt.v1.runtime.TransactionStateMachine.BoltResultHandle; +import org.neo4j.bolt.v1.runtime.cypher.CypherAdapterStream; +import org.neo4j.bolt.v1.runtime.spi.BoltResult; import org.neo4j.graphdb.Result; +import org.neo4j.kernel.GraphDatabaseQueryService; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.security.AuthSubject; import org.neo4j.kernel.api.txtracking.TransactionIdTracker; import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; +import org.neo4j.kernel.impl.coreapi.InternalTransaction; +import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker; +import org.neo4j.kernel.impl.query.Neo4jTransactionalContextFactory; import org.neo4j.kernel.impl.query.QueryExecutionEngine; import org.neo4j.kernel.impl.query.QueryExecutionKernelException; +import org.neo4j.kernel.impl.query.QuerySource; +import org.neo4j.kernel.impl.query.TransactionalContext; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.internal.GraphDatabaseAPI; +import static org.neo4j.kernel.api.KernelTransaction.Type.implicit; + class TransactionStateMachineSPI implements TransactionStateMachine.SPI { private final GraphDatabaseAPI db; private final ThreadToStatementContextBridge txBridge; private final QueryExecutionEngine queryExecutionEngine; - private final StatementRunner statementRunner; private final TransactionIdTracker transactionIdTracker; + private static final PropertyContainerLocker locker = new PropertyContainerLocker(); + private final Neo4jTransactionalContextFactory contextFactory; + private final GraphDatabaseQueryService queryService; + private final Clock clock; TransactionStateMachineSPI( GraphDatabaseAPI db, ThreadToStatementContextBridge txBridge, QueryExecutionEngine queryExecutionEngine, - StatementRunner statementRunner, - TransactionIdStore transactionIdStoreSupplier ) + TransactionIdStore transactionIdStoreSupplier, + GraphDatabaseQueryService queryService, + Clock clock ) { this.db = db; this.txBridge = txBridge; this.queryExecutionEngine = queryExecutionEngine; - this.statementRunner = statementRunner; this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier ); + this.contextFactory = new Neo4jTransactionalContextFactory( queryService, locker ); + this.queryService = queryService; + + this.clock = clock; } @Override @@ -94,19 +112,39 @@ public boolean isPeriodicCommit( String query ) } @Override - public Result executeQuery( String querySource, - AuthSubject authSubject, - String statement, - Map params ) throws QueryExecutionKernelException + public BoltResultHandle executeQuery( String querySource, + AuthSubject authSubject, + String statement, + Map params ) throws QueryExecutionKernelException { - try - { - return statementRunner.run( querySource, authSubject, statement, params ); - } - catch ( KernelException e ) + InternalTransaction transaction = queryService.beginTransaction( implicit, authSubject ); + QuerySource sourceDetails = new QuerySource( "bolt-session", querySource ); + TransactionalContext transactionalContext = + contextFactory.newContext( sourceDetails, transaction, statement, params ); + + return new BoltResultHandle() { - throw new QueryExecutionKernelException( e ); - } - } + @Override + public BoltResult start() throws QueryExecutionKernelException + { + try + { + Result run = queryExecutionEngine.executeQuery( statement, params, transactionalContext ); + return new CypherAdapterStream( run, clock ); + } + catch ( KernelException e ) + { + throw new QueryExecutionKernelException( e ); + } + + } + @Override + public void terminate() + { + transactionalContext.terminate(); + } + }; + + } } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStream.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStream.java index dae7ef331d65e..baa9076d240c9 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStream.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStream.java @@ -26,8 +26,8 @@ import java.util.Map; import org.neo4j.bolt.v1.messaging.BoltIOException; -import org.neo4j.bolt.v1.runtime.spi.Record; import org.neo4j.bolt.v1.runtime.spi.BoltResult; +import org.neo4j.bolt.v1.runtime.spi.Record; import org.neo4j.graphdb.ExecutionPlanDescription; import org.neo4j.graphdb.InputPosition; import org.neo4j.graphdb.Notification; diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherStatementRunner.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherStatementRunner.java deleted file mode 100644 index da409d65f2675..0000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherStatementRunner.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package org.neo4j.bolt.v1.runtime.cypher; - -import java.util.Map; - -import org.neo4j.bolt.v1.runtime.spi.StatementRunner; -import org.neo4j.graphdb.Result; -import org.neo4j.kernel.GraphDatabaseQueryService; -import org.neo4j.kernel.api.exceptions.KernelException; -import org.neo4j.kernel.api.security.AuthSubject; -import org.neo4j.kernel.impl.coreapi.InternalTransaction; -import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker; -import org.neo4j.kernel.impl.query.Neo4jTransactionalContextFactory; -import org.neo4j.kernel.impl.query.QueryExecutionEngine; -import org.neo4j.kernel.impl.query.QuerySource; -import org.neo4j.kernel.impl.query.TransactionalContext; -import org.neo4j.kernel.impl.query.TransactionalContextFactory; - -import static org.neo4j.kernel.api.KernelTransaction.Type.implicit; - -public class CypherStatementRunner implements StatementRunner -{ - private static final PropertyContainerLocker locker = new PropertyContainerLocker(); - - private final QueryExecutionEngine queryExecutionEngine; - private final TransactionalContextFactory contextFactory; - private final GraphDatabaseQueryService queryService; - - public CypherStatementRunner( QueryExecutionEngine queryExecutionEngine, GraphDatabaseQueryService queryService ) - { - this.queryExecutionEngine = queryExecutionEngine; - this.contextFactory = new Neo4jTransactionalContextFactory( queryService, locker ); - this.queryService = queryService; - } - - @Override - public Result run( - final String querySource, - final AuthSubject authSubject, - final String queryText, - final Map queryParameters - ) throws KernelException - { - InternalTransaction transaction = queryService.beginTransaction( implicit, authSubject ); - TransactionalContext transactionalContext = - contextFactory.newContext( new QuerySource( "bolt-session", querySource ), transaction, queryText, - queryParameters); - return queryExecutionEngine.executeQuery( queryText, queryParameters, transactionalContext ); - } -} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/BookmarkResult.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/BookmarkResult.java index 84c0ae82d094e..6a03db9fad523 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/BookmarkResult.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/BookmarkResult.java @@ -45,6 +45,5 @@ public void accept( Visitor visitor ) throws Exception @Override public void close() { - } } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/StatementRunner.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/StatementRunner.java deleted file mode 100644 index 32b2531e71775..0000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/StatementRunner.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package org.neo4j.bolt.v1.runtime.spi; - -import org.neo4j.graphdb.Result; -import org.neo4j.kernel.api.exceptions.KernelException; -import org.neo4j.kernel.api.security.AuthSubject; - -import java.util.Map; - -/** - * A runtime handler can handle a textual input language, yielding results. Query engines are not expected to be - * thread safe, each worker thread will have one query engine instance. - */ -public interface StatementRunner -{ - Result run( String querySource, AuthSubject authSubject, String statement, Map params ) - throws KernelException; -} diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStreamTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStreamTest.java index b662bd3a52d0e..6138b7a5d8134 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStreamTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStreamTest.java @@ -24,10 +24,10 @@ import org.neo4j.bolt.v1.runtime.spi.BoltResult; import org.neo4j.graphdb.ExecutionPlanDescription; import org.neo4j.graphdb.InputPosition; -import org.neo4j.graphdb.Notification; import org.neo4j.graphdb.QueryStatistics; import org.neo4j.graphdb.Result; import org.neo4j.graphdb.impl.notification.NotificationCode; +import org.neo4j.kernel.impl.query.TransactionalContext; import java.time.Clock; import java.util.Arrays; @@ -77,6 +77,8 @@ public void shouldIncludeBasicMetadata() throws Throwable Clock clock = mock( Clock.class ); when( clock.millis() ).thenReturn( 0L, 1337L ); + + TransactionalContext tc = mock( TransactionalContext.class ); CypherAdapterStream stream = new CypherAdapterStream( result, clock ); // When @@ -114,6 +116,7 @@ public void shouldIncludePlanIfPresent() throws Throwable plan("Join", map( "arg1", 1 ), singletonList( "id1" ), plan("Scan", map( "arg2", 1 ), singletonList("id2")) ) ); + TransactionalContext tc = mock( TransactionalContext.class ); CypherAdapterStream stream = new CypherAdapterStream( result, Clock.systemUTC() ); // When @@ -137,6 +140,7 @@ public void shouldIncludeProfileIfPresent() throws Throwable plan( "Join", map( "arg1", 1 ), 2, 1, singletonList( "id1" ), plan( "Scan", map( "arg2", 1 ), 2, 1, singletonList( "id2" ) ) ) ); + TransactionalContext tc = mock( TransactionalContext.class ); CypherAdapterStream stream = new CypherAdapterStream( result, Clock.systemUTC() ); // When @@ -158,10 +162,11 @@ public void shouldIncludeNotificationsIfPresent() throws Throwable when( result.getQueryStatistics() ).thenReturn( queryStatistics ); when( result.getQueryExecutionType() ).thenReturn( query( READ_WRITE ) ); - when( result.getNotifications() ).thenReturn( Arrays.asList( + when( result.getNotifications() ).thenReturn( Arrays.asList( NotificationCode.INDEX_HINT_UNFULFILLABLE.notification( InputPosition.empty ), NotificationCode.PLANNER_UNSUPPORTED.notification( new InputPosition( 4, 5, 6 ) ) ) ); + TransactionalContext tc = mock( TransactionalContext.class ); CypherAdapterStream stream = new CypherAdapterStream( result, Clock.systemUTC() ); // When diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/TransactionIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/TransactionIT.java index e73151dfba5fd..024b9f8a66c53 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/TransactionIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/TransactionIT.java @@ -20,8 +20,20 @@ package org.neo4j.bolt.v1.runtime.integration; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.Rule; import org.junit.Test; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.regex.Pattern; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + import org.neo4j.bolt.testing.BoltResponseRecorder; import org.neo4j.bolt.v1.runtime.BoltConnectionFatality; import org.neo4j.bolt.v1.runtime.BoltStateMachine; @@ -30,12 +42,14 @@ import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.test.Barrier; +import org.neo4j.test.DoubleLatch; -import java.util.regex.Pattern; - +import static java.lang.String.format; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; import static org.neo4j.bolt.testing.BoltMatchers.failedWithStatus; import static org.neo4j.bolt.testing.BoltMatchers.succeeded; import static org.neo4j.bolt.testing.BoltMatchers.succeededWithMetadata; @@ -43,6 +57,7 @@ import static org.neo4j.bolt.testing.BoltMatchers.wasIgnored; import static org.neo4j.bolt.testing.NullResponseHandler.nullResponseHandler; + public class TransactionIT { private static final String USER_AGENT = "TransactionIT/0.0"; @@ -226,4 +241,129 @@ public void run() thread.join(); } + @Test + public void shouldTerminateQueriesEvenIfUsingPeriodicCommit() throws Exception + { + // Spawns a throttled HTTP server, runs a PERIODIC COMMIT that fetches data from this server, + // and checks that the query able to be terminated + + // We start with 3, because that is how many actors we have - + // 1. the http server + // 2. the running query + // 3. the one terminating 2 + final DoubleLatch latch = new DoubleLatch( 3, true ); + + // This is used to block the http server between the first and second batch + final Barrier.Control barrier = new Barrier.Control(); + + // Serve CSV via local web server, let Jetty find a random port for us + Server server = createHttpServer( latch, barrier, 20, 30 ); + server.start(); + int localPort = getLocalPort( server ); + + final BoltStateMachine[] machine = {null}; + + Thread thread = new Thread() + { + @Override + public void run() + { + try ( BoltStateMachine stateMachine = env.newMachine( "" ) ) + { + machine[0] = stateMachine; + stateMachine.init( USER_AGENT, emptyMap(), null ); + String query = format( "USING PERIODIC COMMIT 10 LOAD CSV FROM 'http://localhost:%d' AS line " + + "CREATE (n:A {id: line[0], square: line[1]}) " + + "WITH count(*) as number " + + "CREATE (n:ShouldNotExist)", + localPort ); + try + { + latch.start(); + stateMachine.run( query, emptyMap(), nullResponseHandler() ); + stateMachine.pullAll( nullResponseHandler() ); + } + finally + { + latch.finish(); + } + } + catch ( BoltConnectionFatality connectionFatality ) + { + throw new RuntimeException( connectionFatality ); + } + } + }; + thread.setName( "query runner" ); + thread.start(); + + // We block this thread here, waiting for the http server to spin up and the running query to get started + latch.startAndWaitForAllToStart(); + Thread.sleep( 1000 ); + + // This is the call that RESETs the Bolt connection and will terminate the running query + machine[0].reset( nullResponseHandler() ); + + barrier.release(); + + // We block again here, waiting for the running query to have been terminated, and for the server to have + // wrapped up and finished streaming http results + latch.finishAndWaitForAllToFinish(); + + // And now we check that the last node did not get created + try ( Transaction ignored = env.graph().beginTx() ) + { + assertFalse( "Query was not terminated in time - nodes were created!", + env.graph().findNodes( Label.label( "ShouldNotExist" ) ).hasNext() ); + } + } + + // TODO: This code is duplicated from BuiltInProceduresInteractionTestBase. + // Should probably be extracted to a common place + private Server createHttpServer( + DoubleLatch latch, Barrier.Control innerBarrier, + int firstBatchSize, int otherBatchSize ) + { + Server server = new Server( 0 ); + server.setHandler( new AbstractHandler() + { + @Override + public void handle( + String target, + Request baseRequest, + HttpServletRequest request, + HttpServletResponse response + ) throws IOException, ServletException + { + response.setContentType( "text/plain; charset=utf-8" ); + response.setStatus( HttpServletResponse.SC_OK ); + PrintWriter out = response.getWriter(); + + writeBatch( out, firstBatchSize ); + out.flush(); + latch.start(); + innerBarrier.reached(); + + latch.finish(); + writeBatch( out, otherBatchSize ); + baseRequest.setHandled(true); + } + + private void writeBatch( PrintWriter out, int batchSize ) + { + for ( int i = 0; i < batchSize; i++ ) + { + out.write( format( "%d %d\n", i, i*i ) ); + i++; + } + } + } ); + return server; + } + + private int getLocalPort( Server server ) + { + return ((ServerConnector) (server.getConnectors()[0])).getLocalPort(); + } + } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java index 4d6f759f91da4..f00ff50d5d383 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java @@ -22,6 +22,7 @@ import java.util.function.Supplier; import org.neo4j.graphdb.Lock; +import org.neo4j.graphdb.NotInTransactionException; import org.neo4j.graphdb.PropertyContainer; import org.neo4j.kernel.GraphDatabaseQueryService; import org.neo4j.kernel.api.ExecutingQuery; @@ -47,12 +48,11 @@ public class Neo4jTransactionalContext implements TransactionalContext private final Supplier statementSupplier; private final DbmsOperations.Factory dbmsOperationsFactory; private final Guard guard; + private final ExecutingQuery executingQuery; + private final PropertyContainerLocker locker; private InternalTransaction transaction; private Statement statement; - private ExecutingQuery executingQuery; - private PropertyContainerLocker locker; - private boolean isOpen = true; public Neo4jTransactionalContext( @@ -133,6 +133,24 @@ public void close( boolean success ) } } + @Override // TODO: Make the state of this class a state machine that is a single value and maybe CAS state + // transitions + public void terminate() + { + if ( isOpen ) + { + try + { + transaction.terminate(); + close( false ); + } + catch ( NotInTransactionException e ) + { + // Ok then. Nothing to do + } + } + } + @Override public void commitAndRestartTx() { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java index 160d7bc565bfc..674a0669629c1 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java @@ -40,8 +40,19 @@ public interface TransactionalContext boolean isTopLevelTx(); + /** + * This should be called once the query is finished, either successfully or not. + * Should be called from the same thread the query was executing in. + * @param success signals if the underlying transaction should be committed or rolled back. + */ void close( boolean success ); + /** + * This is used to terminate a currently running query. Can be called from any thread. Will roll back the current + * transaction if it is still open. + */ + void terminate(); + void commitAndRestartTx(); void cleanForReuse(); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/query/FakeTransactionalContext.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/query/FakeTransactionalContext.java index 5548c2bce40e6..2ad7dcb1fb886 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/query/FakeTransactionalContext.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/query/FakeTransactionalContext.java @@ -68,6 +68,12 @@ public void close( boolean success ) throw new UnsupportedOperationException( "fake test class" ); } + @Override + public void terminate() + { + throw new UnsupportedOperationException( "fake test class" ); + } + @Override public void commitAndRestartTx() {