Skip to content

Commit

Permalink
Make it possible to terminate all running queries through Bolt
Browse files Browse the repository at this point in the history
Before this change, PERIODIC COMMIT queries could not be
aborted, which was a pity, since they are specially prone to
long run-times.
  • Loading branch information
systay committed Sep 27, 2016
1 parent 5a6e4ba commit 5050009
Show file tree
Hide file tree
Showing 13 changed files with 289 additions and 172 deletions.
6 changes: 6 additions & 0 deletions community/bolt/pom.xml
Expand Up @@ -81,6 +81,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.neo4j</groupId>
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.time.Clock;

import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.v1.runtime.cypher.CypherStatementRunner;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
Expand Down Expand Up @@ -92,9 +91,10 @@ public void shutdown() throws Throwable
@Override
public BoltStateMachine newMachine( String connectionDescriptor, Runnable onClose, Clock clock )
{
final CypherStatementRunner statementRunner = new CypherStatementRunner( queryExecutionEngine, queryService );
TransactionStateMachine.SPI transactionSPI = new TransactionStateMachineSPI( gds, txBridge,
queryExecutionEngine, statementRunner, transactionIdStore );
queryExecutionEngine,
transactionIdStore,
queryService, clock );
BoltStateMachine.SPI boltSPI = new BoltStateMachineSPI( connectionDescriptor, usageData,
logging, authentication, connectionTracker, transactionSPI );
return new BoltStateMachine( boltSPI, onClose, Clock.systemUTC() );
Expand Down
Expand Up @@ -25,14 +25,12 @@

import org.neo4j.bolt.security.auth.AuthenticationResult;
import org.neo4j.bolt.v1.runtime.bookmarking.Bookmark;
import org.neo4j.bolt.v1.runtime.cypher.CypherAdapterStream;
import org.neo4j.bolt.v1.runtime.cypher.StatementMetadata;
import org.neo4j.bolt.v1.runtime.cypher.StatementProcessor;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.bolt.v1.runtime.spi.BookmarkResult;
import org.neo4j.cypher.InvalidSemanticsException;
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.graphdb.Result;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
Expand Down Expand Up @@ -102,8 +100,7 @@ public void streamResult( ThrowingConsumer<BoltResult, Exception> resultConsumer
@Override
public void reset() throws TransactionFailureException
{
state.rollbackTransaction( ctx );
state.closeResult( ctx );
state.terminateQueryAndRollbackTransaction( ctx );
state = State.AUTO_COMMIT;
}

Expand Down Expand Up @@ -174,20 +171,16 @@ else if ( statement.equalsIgnoreCase( ROLLBACK ) )
}
else if ( spi.isPeriodicCommit( statement ) )
{
Result result = executeQuery( ctx, spi, statement, params );

ctx.currentTransaction = spi.beginTransaction( ctx.authSubject );

ctx.currentResult = new CypherAdapterStream( result, ctx.clock );
ctx.currentTransaction = null;
ctx.currentResultHandle = executeQuery( ctx, spi, statement, params );
ctx.currentResult = ctx.currentResultHandle.start();
return AUTO_COMMIT;
}
else
{
ctx.currentTransaction = spi.beginTransaction( ctx.authSubject );

Result result = execute( ctx, spi, statement, params );

ctx.currentResult = new CypherAdapterStream( result, ctx.clock );
ctx.currentResultHandle = execute( ctx, spi, statement, params );
ctx.currentResult = ctx.currentResultHandle.start();
return AUTO_COMMIT;
}
}
Expand All @@ -196,8 +189,8 @@ else if ( spi.isPeriodicCommit( statement ) )
* In AUTO_COMMIT we must make sure to fail, close and set the current
* transaction to null.
*/
private Result execute( MutableTransactionState ctx, SPI spi,
String statement, Map<String,Object> params )
private BoltResultHandle execute( MutableTransactionState ctx, SPI spi,
String statement, Map<String,Object> params )
throws TransactionFailureException, QueryExecutionKernelException
{
try
Expand Down Expand Up @@ -276,15 +269,15 @@ else if( spi.isPeriodicCommit( statement ) )
}
else
{
Result result = execute( ctx, spi, statement, params );

ctx.currentResult = new CypherAdapterStream( result, ctx.clock );
ctx.currentResultHandle = execute( ctx, spi, statement, params );
ctx.currentResult = ctx.currentResultHandle.start();
return EXPLICIT_TRANSACTION;
}
}

private Result execute( MutableTransactionState ctx, SPI spi,
String statement, Map<String,Object> params ) throws QueryExecutionKernelException
private BoltResultHandle execute( MutableTransactionState ctx, SPI spi,
String statement, Map<String,Object> params )
throws QueryExecutionKernelException
{
try
{
Expand Down Expand Up @@ -317,36 +310,40 @@ abstract State run( MutableTransactionState ctx,
abstract void streamResult( MutableTransactionState ctx,
ThrowingConsumer<BoltResult, Exception> resultConsumer ) throws Exception;

void rollbackTransaction( MutableTransactionState ctx ) throws TransactionFailureException
void terminateQueryAndRollbackTransaction( MutableTransactionState ctx ) throws TransactionFailureException
{
if ( ctx.currentTransaction != null )
if ( ctx.currentResultHandle != null )
{
if ( ctx.currentTransaction.isOpen() )
{
ctx.currentTransaction.failure();
ctx.currentTransaction.close();
ctx.currentTransaction = null;
}
ctx.currentResultHandle.terminate();
ctx.currentResultHandle = null;
}
}

void closeResult( MutableTransactionState ctx )
{
if ( ctx.currentResult != null )
{
ctx.currentResult.close();
ctx.currentResult = null;
}
if ( ctx.currentTransaction != null && ctx.currentTransaction.isOpen() )
{
ctx.currentTransaction.failure();
ctx.currentTransaction.close();
ctx.currentTransaction = null;
}
}

}

private static Result executeQuery( MutableTransactionState ctx, SPI spi, String statement,
Map<String, Object> params ) throws QueryExecutionKernelException
private static BoltResultHandle executeQuery( MutableTransactionState ctx, SPI spi, String statement,
Map<String,Object> params )
throws QueryExecutionKernelException
{
return spi.executeQuery( ctx.querySource, ctx.authSubject, statement, params );
}

interface BoltResultHandle
{
BoltResult start() throws QueryExecutionKernelException;
void terminate();
}

static class MutableTransactionState
{
/** The current session auth state to be used for starting transactions */
Expand All @@ -371,6 +368,7 @@ public String[] fieldNames()
};

String querySource;
BoltResultHandle currentResultHandle;

private MutableTransactionState( AuthenticationResult authenticationResult, Clock clock )
{
Expand All @@ -393,7 +391,9 @@ interface SPI

boolean isPeriodicCommit( String query );

Result executeQuery( String querySource, AuthSubject authSubject, String statement, Map<String, Object> params )
throws QueryExecutionKernelException;
BoltResultHandle executeQuery( String querySource,
AuthSubject authSubject,
String statement,
Map<String,Object> params ) throws QueryExecutionKernelException;
}
}
Expand Up @@ -19,41 +19,59 @@
*/
package org.neo4j.bolt.v1.runtime;

import java.time.Clock;
import java.time.Duration;
import java.util.Map;

import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.bolt.v1.runtime.TransactionStateMachine.BoltResultHandle;
import org.neo4j.bolt.v1.runtime.cypher.CypherAdapterStream;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.graphdb.Result;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AuthSubject;
import org.neo4j.kernel.api.txtracking.TransactionIdTracker;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
import org.neo4j.kernel.impl.query.Neo4jTransactionalContextFactory;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;
import org.neo4j.kernel.impl.query.QuerySource;
import org.neo4j.kernel.impl.query.TransactionalContext;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI;

import static org.neo4j.kernel.api.KernelTransaction.Type.implicit;

class TransactionStateMachineSPI implements TransactionStateMachine.SPI
{
private final GraphDatabaseAPI db;
private final ThreadToStatementContextBridge txBridge;
private final QueryExecutionEngine queryExecutionEngine;
private final StatementRunner statementRunner;
private final TransactionIdTracker transactionIdTracker;
private static final PropertyContainerLocker locker = new PropertyContainerLocker();
private final Neo4jTransactionalContextFactory contextFactory;
private final GraphDatabaseQueryService queryService;
private final Clock clock;

TransactionStateMachineSPI( GraphDatabaseAPI db,
ThreadToStatementContextBridge txBridge,
QueryExecutionEngine queryExecutionEngine,
StatementRunner statementRunner,
TransactionIdStore transactionIdStoreSupplier )
TransactionIdStore transactionIdStoreSupplier,
GraphDatabaseQueryService queryService,
Clock clock )
{
this.db = db;
this.txBridge = txBridge;
this.queryExecutionEngine = queryExecutionEngine;
this.statementRunner = statementRunner;
this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier );
this.contextFactory = new Neo4jTransactionalContextFactory( queryService, locker );
this.queryService = queryService;

this.clock = clock;
}

@Override
Expand Down Expand Up @@ -94,19 +112,39 @@ public boolean isPeriodicCommit( String query )
}

@Override
public Result executeQuery( String querySource,
AuthSubject authSubject,
String statement,
Map<String, Object> params ) throws QueryExecutionKernelException
public BoltResultHandle executeQuery( String querySource,
AuthSubject authSubject,
String statement,
Map<String,Object> params ) throws QueryExecutionKernelException
{
try
{
return statementRunner.run( querySource, authSubject, statement, params );
}
catch ( KernelException e )
InternalTransaction transaction = queryService.beginTransaction( implicit, authSubject );
QuerySource sourceDetails = new QuerySource( "bolt-session", querySource );
TransactionalContext transactionalContext =
contextFactory.newContext( sourceDetails, transaction, statement, params );

return new BoltResultHandle()
{
throw new QueryExecutionKernelException( e );
}
}
@Override
public BoltResult start() throws QueryExecutionKernelException
{
try
{
Result run = queryExecutionEngine.executeQuery( statement, params, transactionalContext );
return new CypherAdapterStream( run, clock );
}
catch ( KernelException e )
{
throw new QueryExecutionKernelException( e );
}

}

@Override
public void terminate()
{
transactionalContext.terminate();
}
};

}
}
Expand Up @@ -26,8 +26,8 @@
import java.util.Map;

import org.neo4j.bolt.v1.messaging.BoltIOException;
import org.neo4j.bolt.v1.runtime.spi.Record;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.bolt.v1.runtime.spi.Record;
import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.InputPosition;
import org.neo4j.graphdb.Notification;
Expand Down

This file was deleted.

Expand Up @@ -45,6 +45,5 @@ public void accept( Visitor visitor ) throws Exception
@Override
public void close()
{

}
}

0 comments on commit 5050009

Please sign in to comment.