Skip to content

Commit

Permalink
Introuduce SPI for SSM to make testing it easier
Browse files Browse the repository at this point in the history
  • Loading branch information
jakewins committed Mar 9, 2016
1 parent b85dad3 commit 4385b09
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 42 deletions.
Expand Up @@ -28,10 +28,11 @@
import org.neo4j.bolt.v1.runtime.StatementMetadata;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.concurrent.DecayingFlags;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
Expand All @@ -40,9 +41,11 @@
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.query.Neo4jTransactionalContext;
import org.neo4j.kernel.impl.query.QuerySession;
import org.neo4j.logging.Log;
import org.neo4j.udc.UsageData;
import org.neo4j.udc.UsageDataKeys;

import static org.neo4j.kernel.api.AccessMode.FULL;
import static org.neo4j.kernel.api.KernelTransaction.Type.explicit;
import static org.neo4j.kernel.api.KernelTransaction.Type.implicit;

/**
* State-machine based implementation of {@link Session}. With this approach,
Expand All @@ -67,8 +70,8 @@ public State init( SessionStateMachine ctx, String clientName, Map<String,Object
{
try
{
ctx.authentication.authenticate( authToken );
ctx.usageData.get( UsageDataKeys.clientNames ).add( clientName );
ctx.spi.authenticate( authToken );
ctx.spi.udcRegisterClient( clientName );
return IDLE;
}
catch ( AuthenticationException e )
Expand All @@ -95,8 +98,7 @@ protected State onNoImplementation( SessionStateMachine ctx, String command )
public State beginTransaction( SessionStateMachine ctx )
{
assert ctx.currentTransaction == null;
ctx.db.beginTransaction( KernelTransaction.Type.explicit, AccessMode.FULL );
ctx.currentTransaction = ctx.txBridge.getKernelTransactionBoundToThisThread( false );
ctx.currentTransaction = ctx.spi.beginTransaction( explicit, FULL );
return IN_TRANSACTION;
}

Expand All @@ -105,8 +107,7 @@ public State runStatement( SessionStateMachine ctx, String statement, Map<String
{
try
{
ctx.featureUsage.flag( UsageDataKeys.Features.bolt );
ctx.currentResult = ctx.statementRunner.run( ctx, statement, params );
ctx.currentResult = ctx.spi.run( ctx, statement, params );
ctx.result( ctx.currentStatementMetadata );
//if the call to run failed we must remain in state ERROR
if ( ctx.state == ERROR )
Expand All @@ -128,8 +129,7 @@ public State runStatement( SessionStateMachine ctx, String statement, Map<String
public State beginImplicitTransaction( SessionStateMachine ctx )
{
assert ctx.currentTransaction == null;
ctx.db.beginTransaction( KernelTransaction.Type.implicit, AccessMode.FULL );
ctx.currentTransaction = ctx.txBridge.getKernelTransactionBoundToThisThread( false );
ctx.currentTransaction = ctx.spi.beginTransaction( implicit, FULL );
return IN_TRANSACTION;
}

Expand Down Expand Up @@ -235,7 +235,7 @@ public State discardAll( SessionStateMachine ctx )
{
return IDLE;
}
else if ( ctx.currentTransaction.transactionType() == KernelTransaction.Type.implicit )
else if ( ctx.currentTransaction.transactionType() == implicit )
{
return IN_TRANSACTION.commitTransaction( ctx );
}
Expand Down Expand Up @@ -395,14 +395,14 @@ State error( SessionStateMachine ctx, Throwable err )

State error( SessionStateMachine ctx, Neo4jError err )
{
ctx.errorReporter.report( err );
ctx.spi.reportError( err );
State outcome = ERROR;
if ( ctx.hasTransaction() )
{
// Is this error bad enough that we should roll back, or did the failure occur in an implicit
// transaction?
if ( err.status().code().classification().rollbackTransaction() ||
ctx.currentTransaction.transactionType() == KernelTransaction.Type.implicit )
if( err.status().code().classification().rollbackTransaction() ||
ctx.currentTransaction.transactionType() == implicit )
{
try
{
Expand All @@ -411,8 +411,8 @@ State error( SessionStateMachine ctx, Neo4jError err )
}
catch ( Throwable t )
{
ctx.log.error( "While handling '" + err.status() + "', a second failure occurred when " +
"rolling back transaction: " + t.getMessage(), t );
ctx.spi.reportError( "While handling '" + err.status() + "', a second failure occurred when " +
"rolling back transaction: " + t.getMessage(), t );
}
finally
{
Expand All @@ -434,14 +434,7 @@ State error( SessionStateMachine ctx, Neo4jError err )
}
}

private final UsageData usageData;
private final DecayingFlags featureUsage;
private final GraphDatabaseFacade db;
private final StatementRunner statementRunner;
private final ErrorReporter errorReporter;
private final Log log;
private final String id;
private final Authentication authentication;
private final String id = UUID.randomUUID().toString();

/** A re-usable statement metadata instance that always represents the currently running statement */
private final StatementMetadata currentStatementMetadata = new StatementMetadata()
Expand All @@ -468,22 +461,46 @@ public String[] fieldNames()
/** Callback attachment */
private Object currentAttachment;

private ThreadToStatementContextBridge txBridge;
/** These are the "external" actions the state machine can take */
private final SPI spi;

/**
* This SPI encapsulates the "external" actions the state machine can take.
* It exists for three reasons:
* 1) It makes it very clear what side-effects the SSM can have
* 2) It decouples the SSM from the actual components performing these operations
* 3) It makes it *much* easier to test the SSM without having to re-implement
* the whole database as mocks.
*
* If you are adding new functionality to the SSM where the new function needs
* to reach out to some component outside the SSM, please add it here. And when
* you do, please consider the law of demeter - if you are simply adding
* "getQueryEngine" to the SPI, you're doing it wrong, then we might as well
* have the full components as fields.
*/
interface SPI
{
void reportError( Neo4jError err );
void reportError( String message, Throwable cause );
KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode );
void bindTransactionToCurrentThread( KernelTransaction tx );
void unbindTransactionFromCurrentThread();
RecordStream run( SessionStateMachine ctx, String statement, Map<String, Object> params )
throws KernelException;
void authenticate( Map<String, Object> authToken ) throws AuthenticationException;
void udcRegisterClient( String clientName );
Statement currentStatement();
}

// Note: We shouldn't depend on GDB like this, I think. Better to define an SPI that we can shape into a spec
// for exactly the kind of underlying support the state machine needs.
public SessionStateMachine( UsageData usageData, GraphDatabaseFacade db, ThreadToStatementContextBridge txBridge,
StatementRunner engine, LogService logging, Authentication authentication )
{
this.usageData = usageData;
this.featureUsage = usageData.get(UsageDataKeys.features);
this.db = db;
this.txBridge = txBridge;
this.statementRunner = engine;
this.errorReporter = new ErrorReporter( logging, this.usageData );
this.log = logging.getInternalLog( getClass() );
this.id = UUID.randomUUID().toString();
this.authentication = authentication;
this( new StandardStateMachineSPI( usageData, db, engine, logging, authentication, txBridge ));
}

public SessionStateMachine( SPI spi )
{
this.spi = spi;
}

@Override
Expand Down Expand Up @@ -597,11 +614,10 @@ public QuerySession createSession( GraphDatabaseQueryService service, PropertyCo
InternalTransaction transaction =
service.beginTransaction( currentTransaction.transactionType(), currentTransaction.mode() );
Neo4jTransactionalContext transactionalContext =
new Neo4jTransactionalContext( service, transaction, txBridge.get(), locker );
new Neo4jTransactionalContext( service, transaction, spi.currentStatement(), locker );

return new QuerySession( transactionalContext )
{

@Override
public String toString()
{
Expand Down Expand Up @@ -635,7 +651,7 @@ private void before( Object attachment, Callback cb )

if ( hasTransaction() )
{
txBridge.bindTransactionToCurrentThread( currentTransaction );
spi.bindTransactionToCurrentThread( currentTransaction );
}
assert this.currentCallback == null;
assert this.currentAttachment == null;
Expand Down Expand Up @@ -665,7 +681,7 @@ private void after()
{
if ( hasTransaction() )
{
txBridge.unbindTransactionFromCurrentThread();
spi.unbindTransactionFromCurrentThread();
}
}
}
Expand All @@ -675,7 +691,7 @@ private void error( Neo4jError err )
{
if ( err.status().code().classification() == Status.Classification.DatabaseError )
{
log.error( "A database error occurred while servicing a user request: " + err );
spi.reportError( err );
}

if ( currentCallback != null )
Expand Down
@@ -0,0 +1,102 @@
package org.neo4j.bolt.v1.runtime.internal;

import java.util.Map;

import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.security.auth.AuthenticationException;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.concurrent.DecayingFlags;
import org.neo4j.kernel.api.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;
import org.neo4j.udc.UsageData;
import org.neo4j.udc.UsageDataKeys;

class StandardStateMachineSPI implements SessionStateMachine.SPI
{
private final UsageData usageData;
private final GraphDatabaseFacade db;
private final StatementRunner statementRunner;
private final ErrorReporter errorReporter;
private final Log log;
private final Authentication authentication;
private final ThreadToStatementContextBridge txBridge;
private final DecayingFlags featureUsage;

StandardStateMachineSPI( UsageData usageData, GraphDatabaseFacade db, StatementRunner statementRunner,
LogService logging, Authentication authentication, ThreadToStatementContextBridge txBridge )
{
this.usageData = usageData;
this.db = db;
this.statementRunner = statementRunner;
this.txBridge = txBridge;
this.featureUsage = usageData.get( UsageDataKeys.features );
this.errorReporter = new ErrorReporter( logging, this.usageData );
this.log = logging.getInternalLog( SessionStateMachine.class );
this.authentication = authentication;
}

@Override
public void reportError( Neo4jError err )
{
errorReporter.report( err );
}

@Override
public void reportError( String message, Throwable cause )
{
log.error( message, cause );
}

@Override
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode )
{
db.beginTransaction( type, mode );
return txBridge.getKernelTransactionBoundToThisThread( false );
}

@Override
public void bindTransactionToCurrentThread( KernelTransaction tx )
{
txBridge.bindTransactionToCurrentThread( tx );
}

@Override
public void unbindTransactionFromCurrentThread()
{
txBridge.unbindTransactionFromCurrentThread();
}

@Override
public RecordStream run( SessionStateMachine ctx, String statement, Map<String,Object> params )
throws KernelException
{

featureUsage.flag( UsageDataKeys.Features.bolt );
return statementRunner.run( ctx, statement, params );
}

@Override
public void authenticate( Map<String,Object> authToken ) throws AuthenticationException
{
authentication.authenticate( authToken );
}

@Override
public void udcRegisterClient( String clientName )
{
usageData.get( UsageDataKeys.clientNames ).add( clientName );
}

@Override
public Statement currentStatement()
{
return txBridge.get();
}
}
Expand Up @@ -136,4 +136,37 @@ public void describeTo( Description description )
}
};
}

public static Matcher<RecordingCallback> recorded( Matcher<? super RecordingCallback.Call> ... messages )
{
return new TypeSafeMatcher<RecordingCallback>()
{
@Override
protected boolean matchesSafely( RecordingCallback recordingCallback )
{
for ( Matcher<? super RecordingCallback.Call> message : messages )
{
try
{
if(!message.matches( recordingCallback.next() ))
{
return false;
}
}
catch ( InterruptedException e )
{
throw new RuntimeException( e );
}
}

return true;
}

@Override
public void describeTo( Description description )
{
description.appendList( "[", "\n", "]", asList(messages) );
}
};
}
}

0 comments on commit 4385b09

Please sign in to comment.