Skip to content

Commit

Permalink
Make sure to fail and remove transaction when a query fails
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Sep 28, 2016
1 parent 5050009 commit b36da8c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 51 deletions.
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.bolt.v1.runtime.spi.BoltResult; import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.bolt.v1.runtime.spi.BookmarkResult; import org.neo4j.bolt.v1.runtime.spi.BookmarkResult;
import org.neo4j.cypher.InvalidSemanticsException; import org.neo4j.cypher.InvalidSemanticsException;
import org.neo4j.function.ThrowingAction;
import org.neo4j.function.ThrowingConsumer; import org.neo4j.function.ThrowingConsumer;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.KernelException;
Expand Down Expand Up @@ -171,16 +172,19 @@ else if ( statement.equalsIgnoreCase( ROLLBACK ) )
} }
else if ( spi.isPeriodicCommit( statement ) ) else if ( spi.isPeriodicCommit( statement ) )
{ {
ctx.currentTransaction = null; BoltResultHandle resultHandle = executeQuery( ctx, spi, statement, params, () -> {} );
ctx.currentResultHandle = executeQuery( ctx, spi, statement, params ); ctx.currentResultHandle = resultHandle;
ctx.currentResult = ctx.currentResultHandle.start(); ctx.currentResult = resultHandle.start();
ctx.currentTransaction = null; // Periodic commit will change the current transaction, so
// we can't trust this to point to the actual current transaction;
return AUTO_COMMIT; return AUTO_COMMIT;
} }
else else
{ {
ctx.currentTransaction = spi.beginTransaction( ctx.authSubject ); ctx.currentTransaction = spi.beginTransaction( ctx.authSubject );
ctx.currentResultHandle = execute( ctx, spi, statement, params ); BoltResultHandle resultHandle = execute( ctx, spi, statement, params );
ctx.currentResult = ctx.currentResultHandle.start(); ctx.currentResultHandle = resultHandle;
ctx.currentResult = resultHandle.start();
return AUTO_COMMIT; return AUTO_COMMIT;
} }
} }
Expand All @@ -190,29 +194,21 @@ else if ( spi.isPeriodicCommit( statement ) )
* transaction to null. * transaction to null.
*/ */
private BoltResultHandle execute( MutableTransactionState ctx, SPI spi, private BoltResultHandle execute( MutableTransactionState ctx, SPI spi,
String statement, Map<String,Object> params ) String statement, Map<String,Object> params )
throws TransactionFailureException, QueryExecutionKernelException throws TransactionFailureException, QueryExecutionKernelException
{ {
try return executeQuery( ctx, spi, statement, params, () ->
{ {
return executeQuery( ctx, spi, statement, params ); try // On fail
}
catch ( Throwable e )
{
if (ctx.currentTransaction != null)
{ {
try ctx.currentTransaction.failure();
{ ctx.currentTransaction.close();
ctx.currentTransaction.failure();
ctx.currentTransaction.close();
}
finally
{
ctx.currentTransaction = null;
}
} }
throw e; finally
} {
ctx.currentTransaction = null;
}
} );
} }


@Override @Override
Expand Down Expand Up @@ -276,21 +272,17 @@ else if( spi.isPeriodicCommit( statement ) )
} }


private BoltResultHandle execute( MutableTransactionState ctx, SPI spi, private BoltResultHandle execute( MutableTransactionState ctx, SPI spi,
String statement, Map<String,Object> params ) String statement, Map<String,Object> params )
throws QueryExecutionKernelException throws QueryExecutionKernelException
{ {
try return executeQuery( ctx, spi, statement, params,
{ () ->
return executeQuery( ctx, spi, statement, params ); {
} if ( ctx.currentTransaction != null )
catch ( Throwable e ) {
{ ctx.currentTransaction.failure();
if (ctx.currentTransaction != null) }
{ } );
ctx.currentTransaction.failure();
}
throw e;
}
} }


@Override @Override
Expand Down Expand Up @@ -332,15 +324,20 @@ void terminateQueryAndRollbackTransaction( MutableTransactionState ctx ) throws
} }


private static BoltResultHandle executeQuery( MutableTransactionState ctx, SPI spi, String statement, private static BoltResultHandle executeQuery( MutableTransactionState ctx, SPI spi, String statement,
Map<String,Object> params ) Map<String,Object> params, ThrowingAction<KernelException> onFail )
throws QueryExecutionKernelException throws QueryExecutionKernelException
{ {
return spi.executeQuery( ctx.querySource, ctx.authSubject, statement, params ); return spi.executeQuery( ctx.querySource, ctx.authSubject, statement, params, onFail );
} }


/**
* This interface makes it possible to abort queries even before they have returned a Result object.
* In some cases, creating the Result object will take as long as running the query takes. This way, we can
* terminate the underlying transaction while the Result object is created.
*/
interface BoltResultHandle interface BoltResultHandle
{ {
BoltResult start() throws QueryExecutionKernelException; BoltResult start() throws KernelException;
void terminate(); void terminate();
} }


Expand Down Expand Up @@ -392,8 +389,9 @@ interface SPI
boolean isPeriodicCommit( String query ); boolean isPeriodicCommit( String query );


BoltResultHandle executeQuery( String querySource, BoltResultHandle executeQuery( String querySource,
AuthSubject authSubject, AuthSubject authSubject,
String statement, String statement,
Map<String,Object> params ) throws QueryExecutionKernelException; Map<String,Object> params,
ThrowingAction<KernelException> onFail ) throws QueryExecutionKernelException;
} }
} }
Expand Up @@ -26,6 +26,7 @@
import org.neo4j.bolt.v1.runtime.TransactionStateMachine.BoltResultHandle; import org.neo4j.bolt.v1.runtime.TransactionStateMachine.BoltResultHandle;
import org.neo4j.bolt.v1.runtime.cypher.CypherAdapterStream; import org.neo4j.bolt.v1.runtime.cypher.CypherAdapterStream;
import org.neo4j.bolt.v1.runtime.spi.BoltResult; import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.function.ThrowingAction;
import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Result;
import org.neo4j.kernel.GraphDatabaseQueryService; import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
Expand Down Expand Up @@ -113,19 +114,19 @@ public boolean isPeriodicCommit( String query )


@Override @Override
public BoltResultHandle executeQuery( String querySource, public BoltResultHandle executeQuery( String querySource,
AuthSubject authSubject, AuthSubject authSubject,
String statement, String statement,
Map<String,Object> params ) throws QueryExecutionKernelException Map<String,Object> params, ThrowingAction<KernelException> onFail ) throws QueryExecutionKernelException
{ {
InternalTransaction transaction = queryService.beginTransaction( implicit, authSubject ); InternalTransaction internalTransaction = queryService.beginTransaction( implicit, authSubject );
QuerySource sourceDetails = new QuerySource( "bolt-session", querySource ); QuerySource sourceDetails = new QuerySource( "bolt-session", querySource );
TransactionalContext transactionalContext = TransactionalContext transactionalContext =
contextFactory.newContext( sourceDetails, transaction, statement, params ); contextFactory.newContext( sourceDetails, internalTransaction, statement, params );


return new BoltResultHandle() return new BoltResultHandle()
{ {
@Override @Override
public BoltResult start() throws QueryExecutionKernelException public BoltResult start() throws KernelException
{ {
try try
{ {
Expand All @@ -134,9 +135,14 @@ public BoltResult start() throws QueryExecutionKernelException
} }
catch ( KernelException e ) catch ( KernelException e )
{ {
onFail.apply();
throw new QueryExecutionKernelException( e ); throw new QueryExecutionKernelException( e );
} }

catch ( Throwable e )
{
onFail.apply();
throw e;
}
} }


@Override @Override
Expand All @@ -145,6 +151,5 @@ public void terminate()
transactionalContext.terminate(); transactionalContext.terminate();
} }
}; };

}
}
} }

0 comments on commit b36da8c

Please sign in to comment.