diff --git a/community/bolt/pom.xml b/community/bolt/pom.xml
index 8759e57f2efc0..7443bb7c06863 100644
--- a/community/bolt/pom.xml
+++ b/community/bolt/pom.xml
@@ -81,6 +81,12 @@
mockito-core
test
+
+ org.eclipse.jetty
+ jetty-server
+ test
+
+
org.neo4j
diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/LifecycleManagedBoltFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/LifecycleManagedBoltFactory.java
index a130d0341c880..33750211cc854 100644
--- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/LifecycleManagedBoltFactory.java
+++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/LifecycleManagedBoltFactory.java
@@ -22,7 +22,6 @@
import java.time.Clock;
import org.neo4j.bolt.security.auth.Authentication;
-import org.neo4j.bolt.v1.runtime.cypher.CypherStatementRunner;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
@@ -92,9 +91,10 @@ public void shutdown() throws Throwable
@Override
public BoltStateMachine newMachine( String connectionDescriptor, Runnable onClose, Clock clock )
{
- final CypherStatementRunner statementRunner = new CypherStatementRunner( queryExecutionEngine, queryService );
TransactionStateMachine.SPI transactionSPI = new TransactionStateMachineSPI( gds, txBridge,
- queryExecutionEngine, statementRunner, transactionIdStore );
+ queryExecutionEngine,
+ transactionIdStore,
+ queryService, clock );
BoltStateMachine.SPI boltSPI = new BoltStateMachineSPI( connectionDescriptor, usageData,
logging, authentication, connectionTracker, transactionSPI );
return new BoltStateMachine( boltSPI, onClose, Clock.systemUTC() );
diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java
index 241004fcaa604..100c68c0389f8 100644
--- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java
+++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java
@@ -25,14 +25,12 @@
import org.neo4j.bolt.security.auth.AuthenticationResult;
import org.neo4j.bolt.v1.runtime.bookmarking.Bookmark;
-import org.neo4j.bolt.v1.runtime.cypher.CypherAdapterStream;
import org.neo4j.bolt.v1.runtime.cypher.StatementMetadata;
import org.neo4j.bolt.v1.runtime.cypher.StatementProcessor;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.bolt.v1.runtime.spi.BookmarkResult;
import org.neo4j.cypher.InvalidSemanticsException;
import org.neo4j.function.ThrowingConsumer;
-import org.neo4j.graphdb.Result;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
@@ -102,8 +100,7 @@ public void streamResult( ThrowingConsumer resultConsumer
@Override
public void reset() throws TransactionFailureException
{
- state.rollbackTransaction( ctx );
- state.closeResult( ctx );
+ state.terminateQueryAndRollbackTransaction( ctx );
state = State.AUTO_COMMIT;
}
@@ -174,20 +171,16 @@ else if ( statement.equalsIgnoreCase( ROLLBACK ) )
}
else if ( spi.isPeriodicCommit( statement ) )
{
- Result result = executeQuery( ctx, spi, statement, params );
-
- ctx.currentTransaction = spi.beginTransaction( ctx.authSubject );
-
- ctx.currentResult = new CypherAdapterStream( result, ctx.clock );
+ ctx.currentTransaction = null;
+ ctx.currentResultHandle = executeQuery( ctx, spi, statement, params );
+ ctx.currentResult = ctx.currentResultHandle.start();
return AUTO_COMMIT;
}
else
{
ctx.currentTransaction = spi.beginTransaction( ctx.authSubject );
-
- Result result = execute( ctx, spi, statement, params );
-
- ctx.currentResult = new CypherAdapterStream( result, ctx.clock );
+ ctx.currentResultHandle = execute( ctx, spi, statement, params );
+ ctx.currentResult = ctx.currentResultHandle.start();
return AUTO_COMMIT;
}
}
@@ -196,8 +189,8 @@ else if ( spi.isPeriodicCommit( statement ) )
* In AUTO_COMMIT we must make sure to fail, close and set the current
* transaction to null.
*/
- private Result execute( MutableTransactionState ctx, SPI spi,
- String statement, Map params )
+ private BoltResultHandle execute( MutableTransactionState ctx, SPI spi,
+ String statement, Map params )
throws TransactionFailureException, QueryExecutionKernelException
{
try
@@ -276,15 +269,15 @@ else if( spi.isPeriodicCommit( statement ) )
}
else
{
- Result result = execute( ctx, spi, statement, params );
-
- ctx.currentResult = new CypherAdapterStream( result, ctx.clock );
+ ctx.currentResultHandle = execute( ctx, spi, statement, params );
+ ctx.currentResult = ctx.currentResultHandle.start();
return EXPLICIT_TRANSACTION;
}
}
- private Result execute( MutableTransactionState ctx, SPI spi,
- String statement, Map params ) throws QueryExecutionKernelException
+ private BoltResultHandle execute( MutableTransactionState ctx, SPI spi,
+ String statement, Map params )
+ throws QueryExecutionKernelException
{
try
{
@@ -317,36 +310,40 @@ abstract State run( MutableTransactionState ctx,
abstract void streamResult( MutableTransactionState ctx,
ThrowingConsumer resultConsumer ) throws Exception;
- void rollbackTransaction( MutableTransactionState ctx ) throws TransactionFailureException
+ void terminateQueryAndRollbackTransaction( MutableTransactionState ctx ) throws TransactionFailureException
{
- if ( ctx.currentTransaction != null )
+ if ( ctx.currentResultHandle != null )
{
- if ( ctx.currentTransaction.isOpen() )
- {
- ctx.currentTransaction.failure();
- ctx.currentTransaction.close();
- ctx.currentTransaction = null;
- }
+ ctx.currentResultHandle.terminate();
+ ctx.currentResultHandle = null;
}
- }
-
- void closeResult( MutableTransactionState ctx )
- {
if ( ctx.currentResult != null )
{
ctx.currentResult.close();
ctx.currentResult = null;
}
+ if ( ctx.currentTransaction != null && ctx.currentTransaction.isOpen() )
+ {
+ ctx.currentTransaction.failure();
+ ctx.currentTransaction.close();
+ ctx.currentTransaction = null;
+ }
}
-
}
- private static Result executeQuery( MutableTransactionState ctx, SPI spi, String statement,
- Map params ) throws QueryExecutionKernelException
+ private static BoltResultHandle executeQuery( MutableTransactionState ctx, SPI spi, String statement,
+ Map params )
+ throws QueryExecutionKernelException
{
return spi.executeQuery( ctx.querySource, ctx.authSubject, statement, params );
}
+ interface BoltResultHandle
+ {
+ BoltResult start() throws QueryExecutionKernelException;
+ void terminate();
+ }
+
static class MutableTransactionState
{
/** The current session auth state to be used for starting transactions */
@@ -371,6 +368,7 @@ public String[] fieldNames()
};
String querySource;
+ BoltResultHandle currentResultHandle;
private MutableTransactionState( AuthenticationResult authenticationResult, Clock clock )
{
@@ -393,7 +391,9 @@ interface SPI
boolean isPeriodicCommit( String query );
- Result executeQuery( String querySource, AuthSubject authSubject, String statement, Map params )
- throws QueryExecutionKernelException;
+ BoltResultHandle executeQuery( String querySource,
+ AuthSubject authSubject,
+ String statement,
+ Map params ) throws QueryExecutionKernelException;
}
}
diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java
index c9eabbb9919e8..d43270f7807d3 100644
--- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java
+++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachineSPI.java
@@ -19,41 +19,59 @@
*/
package org.neo4j.bolt.v1.runtime;
+import java.time.Clock;
import java.time.Duration;
import java.util.Map;
-import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
+import org.neo4j.bolt.v1.runtime.TransactionStateMachine.BoltResultHandle;
+import org.neo4j.bolt.v1.runtime.cypher.CypherAdapterStream;
+import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.graphdb.Result;
+import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AuthSubject;
import org.neo4j.kernel.api.txtracking.TransactionIdTracker;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
+import org.neo4j.kernel.impl.coreapi.InternalTransaction;
+import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
+import org.neo4j.kernel.impl.query.Neo4jTransactionalContextFactory;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;
+import org.neo4j.kernel.impl.query.QuerySource;
+import org.neo4j.kernel.impl.query.TransactionalContext;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
+import static org.neo4j.kernel.api.KernelTransaction.Type.implicit;
+
class TransactionStateMachineSPI implements TransactionStateMachine.SPI
{
private final GraphDatabaseAPI db;
private final ThreadToStatementContextBridge txBridge;
private final QueryExecutionEngine queryExecutionEngine;
- private final StatementRunner statementRunner;
private final TransactionIdTracker transactionIdTracker;
+ private static final PropertyContainerLocker locker = new PropertyContainerLocker();
+ private final Neo4jTransactionalContextFactory contextFactory;
+ private final GraphDatabaseQueryService queryService;
+ private final Clock clock;
TransactionStateMachineSPI( GraphDatabaseAPI db,
ThreadToStatementContextBridge txBridge,
QueryExecutionEngine queryExecutionEngine,
- StatementRunner statementRunner,
- TransactionIdStore transactionIdStoreSupplier )
+ TransactionIdStore transactionIdStoreSupplier,
+ GraphDatabaseQueryService queryService,
+ Clock clock )
{
this.db = db;
this.txBridge = txBridge;
this.queryExecutionEngine = queryExecutionEngine;
- this.statementRunner = statementRunner;
this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier );
+ this.contextFactory = new Neo4jTransactionalContextFactory( queryService, locker );
+ this.queryService = queryService;
+
+ this.clock = clock;
}
@Override
@@ -94,19 +112,39 @@ public boolean isPeriodicCommit( String query )
}
@Override
- public Result executeQuery( String querySource,
- AuthSubject authSubject,
- String statement,
- Map params ) throws QueryExecutionKernelException
+ public BoltResultHandle executeQuery( String querySource,
+ AuthSubject authSubject,
+ String statement,
+ Map params ) throws QueryExecutionKernelException
{
- try
- {
- return statementRunner.run( querySource, authSubject, statement, params );
- }
- catch ( KernelException e )
+ InternalTransaction transaction = queryService.beginTransaction( implicit, authSubject );
+ QuerySource sourceDetails = new QuerySource( "bolt-session", querySource );
+ TransactionalContext transactionalContext =
+ contextFactory.newContext( sourceDetails, transaction, statement, params );
+
+ return new BoltResultHandle()
{
- throw new QueryExecutionKernelException( e );
- }
- }
+ @Override
+ public BoltResult start() throws QueryExecutionKernelException
+ {
+ try
+ {
+ Result run = queryExecutionEngine.executeQuery( statement, params, transactionalContext );
+ return new CypherAdapterStream( run, clock );
+ }
+ catch ( KernelException e )
+ {
+ throw new QueryExecutionKernelException( e );
+ }
+
+ }
+ @Override
+ public void terminate()
+ {
+ transactionalContext.terminate();
+ }
+ };
+
+ }
}
diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStream.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStream.java
index dae7ef331d65e..baa9076d240c9 100644
--- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStream.java
+++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStream.java
@@ -26,8 +26,8 @@
import java.util.Map;
import org.neo4j.bolt.v1.messaging.BoltIOException;
-import org.neo4j.bolt.v1.runtime.spi.Record;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
+import org.neo4j.bolt.v1.runtime.spi.Record;
import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.InputPosition;
import org.neo4j.graphdb.Notification;
diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherStatementRunner.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherStatementRunner.java
deleted file mode 100644
index da409d65f2675..0000000000000
--- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/cypher/CypherStatementRunner.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2002-2016 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package org.neo4j.bolt.v1.runtime.cypher;
-
-import java.util.Map;
-
-import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
-import org.neo4j.graphdb.Result;
-import org.neo4j.kernel.GraphDatabaseQueryService;
-import org.neo4j.kernel.api.exceptions.KernelException;
-import org.neo4j.kernel.api.security.AuthSubject;
-import org.neo4j.kernel.impl.coreapi.InternalTransaction;
-import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
-import org.neo4j.kernel.impl.query.Neo4jTransactionalContextFactory;
-import org.neo4j.kernel.impl.query.QueryExecutionEngine;
-import org.neo4j.kernel.impl.query.QuerySource;
-import org.neo4j.kernel.impl.query.TransactionalContext;
-import org.neo4j.kernel.impl.query.TransactionalContextFactory;
-
-import static org.neo4j.kernel.api.KernelTransaction.Type.implicit;
-
-public class CypherStatementRunner implements StatementRunner
-{
- private static final PropertyContainerLocker locker = new PropertyContainerLocker();
-
- private final QueryExecutionEngine queryExecutionEngine;
- private final TransactionalContextFactory contextFactory;
- private final GraphDatabaseQueryService queryService;
-
- public CypherStatementRunner( QueryExecutionEngine queryExecutionEngine, GraphDatabaseQueryService queryService )
- {
- this.queryExecutionEngine = queryExecutionEngine;
- this.contextFactory = new Neo4jTransactionalContextFactory( queryService, locker );
- this.queryService = queryService;
- }
-
- @Override
- public Result run(
- final String querySource,
- final AuthSubject authSubject,
- final String queryText,
- final Map queryParameters
- ) throws KernelException
- {
- InternalTransaction transaction = queryService.beginTransaction( implicit, authSubject );
- TransactionalContext transactionalContext =
- contextFactory.newContext( new QuerySource( "bolt-session", querySource ), transaction, queryText,
- queryParameters);
- return queryExecutionEngine.executeQuery( queryText, queryParameters, transactionalContext );
- }
-}
diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/BookmarkResult.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/BookmarkResult.java
index 84c0ae82d094e..6a03db9fad523 100644
--- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/BookmarkResult.java
+++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/BookmarkResult.java
@@ -45,6 +45,5 @@ public void accept( Visitor visitor ) throws Exception
@Override
public void close()
{
-
}
}
diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/StatementRunner.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/StatementRunner.java
deleted file mode 100644
index 32b2531e71775..0000000000000
--- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/spi/StatementRunner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2002-2016 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package org.neo4j.bolt.v1.runtime.spi;
-
-import org.neo4j.graphdb.Result;
-import org.neo4j.kernel.api.exceptions.KernelException;
-import org.neo4j.kernel.api.security.AuthSubject;
-
-import java.util.Map;
-
-/**
- * A runtime handler can handle a textual input language, yielding results. Query engines are not expected to be
- * thread safe, each worker thread will have one query engine instance.
- */
-public interface StatementRunner
-{
- Result run( String querySource, AuthSubject authSubject, String statement, Map params )
- throws KernelException;
-}
diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStreamTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStreamTest.java
index b662bd3a52d0e..6138b7a5d8134 100644
--- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStreamTest.java
+++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/cypher/CypherAdapterStreamTest.java
@@ -24,10 +24,10 @@
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.InputPosition;
-import org.neo4j.graphdb.Notification;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.impl.notification.NotificationCode;
+import org.neo4j.kernel.impl.query.TransactionalContext;
import java.time.Clock;
import java.util.Arrays;
@@ -77,6 +77,8 @@ public void shouldIncludeBasicMetadata() throws Throwable
Clock clock = mock( Clock.class );
when( clock.millis() ).thenReturn( 0L, 1337L );
+
+ TransactionalContext tc = mock( TransactionalContext.class );
CypherAdapterStream stream = new CypherAdapterStream( result, clock );
// When
@@ -114,6 +116,7 @@ public void shouldIncludePlanIfPresent() throws Throwable
plan("Join", map( "arg1", 1 ), singletonList( "id1" ),
plan("Scan", map( "arg2", 1 ), singletonList("id2")) ) );
+ TransactionalContext tc = mock( TransactionalContext.class );
CypherAdapterStream stream = new CypherAdapterStream( result, Clock.systemUTC() );
// When
@@ -137,6 +140,7 @@ public void shouldIncludeProfileIfPresent() throws Throwable
plan( "Join", map( "arg1", 1 ), 2, 1, singletonList( "id1" ),
plan( "Scan", map( "arg2", 1 ), 2, 1, singletonList( "id2" ) ) ) );
+ TransactionalContext tc = mock( TransactionalContext.class );
CypherAdapterStream stream = new CypherAdapterStream( result, Clock.systemUTC() );
// When
@@ -158,10 +162,11 @@ public void shouldIncludeNotificationsIfPresent() throws Throwable
when( result.getQueryStatistics() ).thenReturn( queryStatistics );
when( result.getQueryExecutionType() ).thenReturn( query( READ_WRITE ) );
- when( result.getNotifications() ).thenReturn( Arrays.asList(
+ when( result.getNotifications() ).thenReturn( Arrays.asList(
NotificationCode.INDEX_HINT_UNFULFILLABLE.notification( InputPosition.empty ),
NotificationCode.PLANNER_UNSUPPORTED.notification( new InputPosition( 4, 5, 6 ) )
) );
+ TransactionalContext tc = mock( TransactionalContext.class );
CypherAdapterStream stream = new CypherAdapterStream( result, Clock.systemUTC() );
// When
diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/TransactionIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/TransactionIT.java
index e73151dfba5fd..024b9f8a66c53 100644
--- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/TransactionIT.java
+++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/TransactionIT.java
@@ -20,8 +20,20 @@
package org.neo4j.bolt.v1.runtime.integration;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Rule;
import org.junit.Test;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.regex.Pattern;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
import org.neo4j.bolt.testing.BoltResponseRecorder;
import org.neo4j.bolt.v1.runtime.BoltConnectionFatality;
import org.neo4j.bolt.v1.runtime.BoltStateMachine;
@@ -30,12 +42,14 @@
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.api.exceptions.Status;
+import org.neo4j.test.Barrier;
+import org.neo4j.test.DoubleLatch;
-import java.util.regex.Pattern;
-
+import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
import static org.neo4j.bolt.testing.BoltMatchers.failedWithStatus;
import static org.neo4j.bolt.testing.BoltMatchers.succeeded;
import static org.neo4j.bolt.testing.BoltMatchers.succeededWithMetadata;
@@ -43,6 +57,7 @@
import static org.neo4j.bolt.testing.BoltMatchers.wasIgnored;
import static org.neo4j.bolt.testing.NullResponseHandler.nullResponseHandler;
+
public class TransactionIT
{
private static final String USER_AGENT = "TransactionIT/0.0";
@@ -226,4 +241,129 @@ public void run()
thread.join();
}
+ @Test
+ public void shouldTerminateQueriesEvenIfUsingPeriodicCommit() throws Exception
+ {
+ // Spawns a throttled HTTP server, runs a PERIODIC COMMIT that fetches data from this server,
+ // and checks that the query able to be terminated
+
+ // We start with 3, because that is how many actors we have -
+ // 1. the http server
+ // 2. the running query
+ // 3. the one terminating 2
+ final DoubleLatch latch = new DoubleLatch( 3, true );
+
+ // This is used to block the http server between the first and second batch
+ final Barrier.Control barrier = new Barrier.Control();
+
+ // Serve CSV via local web server, let Jetty find a random port for us
+ Server server = createHttpServer( latch, barrier, 20, 30 );
+ server.start();
+ int localPort = getLocalPort( server );
+
+ final BoltStateMachine[] machine = {null};
+
+ Thread thread = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try ( BoltStateMachine stateMachine = env.newMachine( "" ) )
+ {
+ machine[0] = stateMachine;
+ stateMachine.init( USER_AGENT, emptyMap(), null );
+ String query = format( "USING PERIODIC COMMIT 10 LOAD CSV FROM 'http://localhost:%d' AS line " +
+ "CREATE (n:A {id: line[0], square: line[1]}) " +
+ "WITH count(*) as number " +
+ "CREATE (n:ShouldNotExist)",
+ localPort );
+ try
+ {
+ latch.start();
+ stateMachine.run( query, emptyMap(), nullResponseHandler() );
+ stateMachine.pullAll( nullResponseHandler() );
+ }
+ finally
+ {
+ latch.finish();
+ }
+ }
+ catch ( BoltConnectionFatality connectionFatality )
+ {
+ throw new RuntimeException( connectionFatality );
+ }
+ }
+ };
+ thread.setName( "query runner" );
+ thread.start();
+
+ // We block this thread here, waiting for the http server to spin up and the running query to get started
+ latch.startAndWaitForAllToStart();
+ Thread.sleep( 1000 );
+
+ // This is the call that RESETs the Bolt connection and will terminate the running query
+ machine[0].reset( nullResponseHandler() );
+
+ barrier.release();
+
+ // We block again here, waiting for the running query to have been terminated, and for the server to have
+ // wrapped up and finished streaming http results
+ latch.finishAndWaitForAllToFinish();
+
+ // And now we check that the last node did not get created
+ try ( Transaction ignored = env.graph().beginTx() )
+ {
+ assertFalse( "Query was not terminated in time - nodes were created!",
+ env.graph().findNodes( Label.label( "ShouldNotExist" ) ).hasNext() );
+ }
+ }
+
+ // TODO: This code is duplicated from BuiltInProceduresInteractionTestBase.
+ // Should probably be extracted to a common place
+ private Server createHttpServer(
+ DoubleLatch latch, Barrier.Control innerBarrier,
+ int firstBatchSize, int otherBatchSize )
+ {
+ Server server = new Server( 0 );
+ server.setHandler( new AbstractHandler()
+ {
+ @Override
+ public void handle(
+ String target,
+ Request baseRequest,
+ HttpServletRequest request,
+ HttpServletResponse response
+ ) throws IOException, ServletException
+ {
+ response.setContentType( "text/plain; charset=utf-8" );
+ response.setStatus( HttpServletResponse.SC_OK );
+ PrintWriter out = response.getWriter();
+
+ writeBatch( out, firstBatchSize );
+ out.flush();
+ latch.start();
+ innerBarrier.reached();
+
+ latch.finish();
+ writeBatch( out, otherBatchSize );
+ baseRequest.setHandled(true);
+ }
+
+ private void writeBatch( PrintWriter out, int batchSize )
+ {
+ for ( int i = 0; i < batchSize; i++ )
+ {
+ out.write( format( "%d %d\n", i, i*i ) );
+ i++;
+ }
+ }
+ } );
+ return server;
+ }
+
+ private int getLocalPort( Server server )
+ {
+ return ((ServerConnector) (server.getConnectors()[0])).getLocalPort();
+ }
+
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java
index 4d6f759f91da4..f00ff50d5d383 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/Neo4jTransactionalContext.java
@@ -22,6 +22,7 @@
import java.util.function.Supplier;
import org.neo4j.graphdb.Lock;
+import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.PropertyContainer;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.ExecutingQuery;
@@ -47,12 +48,11 @@ public class Neo4jTransactionalContext implements TransactionalContext
private final Supplier statementSupplier;
private final DbmsOperations.Factory dbmsOperationsFactory;
private final Guard guard;
+ private final ExecutingQuery executingQuery;
+ private final PropertyContainerLocker locker;
private InternalTransaction transaction;
private Statement statement;
- private ExecutingQuery executingQuery;
- private PropertyContainerLocker locker;
-
private boolean isOpen = true;
public Neo4jTransactionalContext(
@@ -133,6 +133,24 @@ public void close( boolean success )
}
}
+ @Override // TODO: Make the state of this class a state machine that is a single value and maybe CAS state
+ // transitions
+ public void terminate()
+ {
+ if ( isOpen )
+ {
+ try
+ {
+ transaction.terminate();
+ close( false );
+ }
+ catch ( NotInTransactionException e )
+ {
+ // Ok then. Nothing to do
+ }
+ }
+ }
+
@Override
public void commitAndRestartTx()
{
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java
index 160d7bc565bfc..674a0669629c1 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/query/TransactionalContext.java
@@ -40,8 +40,19 @@ public interface TransactionalContext
boolean isTopLevelTx();
+ /**
+ * This should be called once the query is finished, either successfully or not.
+ * Should be called from the same thread the query was executing in.
+ * @param success signals if the underlying transaction should be committed or rolled back.
+ */
void close( boolean success );
+ /**
+ * This is used to terminate a currently running query. Can be called from any thread. Will roll back the current
+ * transaction if it is still open.
+ */
+ void terminate();
+
void commitAndRestartTx();
void cleanForReuse();
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/query/FakeTransactionalContext.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/query/FakeTransactionalContext.java
index 5548c2bce40e6..2ad7dcb1fb886 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/query/FakeTransactionalContext.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/query/FakeTransactionalContext.java
@@ -68,6 +68,12 @@ public void close( boolean success )
throw new UnsupportedOperationException( "fake test class" );
}
+ @Override
+ public void terminate()
+ {
+ throw new UnsupportedOperationException( "fake test class" );
+ }
+
@Override
public void commitAndRestartTx()
{