Skip to content

Commit

Permalink
Added new state machine states for v3 state machine
Browse files Browse the repository at this point in the history
Added decoder to read messages
Adding tests to verify v3 statemachine
  • Loading branch information
Zhen Li authored and zhenlineo committed Jul 20, 2018
1 parent b7c2916 commit 5a262c5
Show file tree
Hide file tree
Showing 57 changed files with 3,270 additions and 823 deletions.
Expand Up @@ -40,7 +40,7 @@ public interface BoltMessageLogger

void serverError( String eventName, Status status );

void logUserAgent( String userAgent );
void logInit( String userAgent );

void logRun();

Expand Down
Expand Up @@ -102,7 +102,7 @@ public void serverError( String eventName, Status status )
}

@Override
public void logUserAgent( String userAgent )
public void logInit( String userAgent )
{
clientEvent( "INIT", () -> userAgent);
}
Expand Down
Expand Up @@ -73,7 +73,7 @@ public void serverError( String eventName, Status status )
}

@Override
public void logUserAgent( String userAgent )
public void logInit( String userAgent )
{
}

Expand Down
Expand Up @@ -75,7 +75,7 @@ else if ( protocolVersion == BoltProtocolV3.VERSION )
}
else
{
return null;
throw new IllegalArgumentException( "Failed to create a state machine for protocol version " + protocolVersion );
}
}

Expand Down
Expand Up @@ -31,9 +31,11 @@ public interface StatementProcessor

StatementMetadata run( String statement, MapValue params ) throws KernelException;

StatementMetadata run( String statement, MapValue params, Bookmark bookmark ) throws KernelException;

void streamResult( ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception;

void commitTransaction() throws KernelException;
Bookmark commitTransaction() throws KernelException;

void rollbackTransaction() throws KernelException;

Expand Down Expand Up @@ -63,14 +65,20 @@ public StatementMetadata run( String statement, MapValue params ) throws KernelE
throw new UnsupportedOperationException( "Unable to run statements" );
}

@Override
public StatementMetadata run( String statement, MapValue params, Bookmark bookmark ) throws KernelException
{
throw new UnsupportedOperationException( "Unable to run statements" );
}

@Override
public void streamResult( ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception
{
throw new UnsupportedOperationException( "Unable to stream results" );
}

@Override
public void commitTransaction() throws KernelException
public Bookmark commitTransaction() throws KernelException
{
throw new UnsupportedOperationException( "Unable to commit a transaction" );
}
Expand Down
Expand Up @@ -59,7 +59,7 @@ public RequestMessage decode( Neo4jPack.Unpacker unpacker ) throws IOException
{
String userAgent = unpacker.unpackString();
Map<String,Object> authToken = readAuthToken( unpacker );
messageLogger.logUserAgent( userAgent );
messageLogger.logInit( userAgent );
return new InitMessage( userAgent, authToken );
}

Expand Down
Expand Up @@ -37,6 +37,7 @@

public class BoltStateMachineV1SPI implements BoltStateMachineSPI
{
public static final String BOLT_SERVER_VERSION_PREFIX = "Neo4j/";
private final BoltConnectionDescriptor connectionDescriptor;
private final UsageData usageData;
private final ErrorReporter errorReporter;
Expand All @@ -54,7 +55,7 @@ public BoltStateMachineV1SPI( BoltConnectionDescriptor connectionDescriptor, Usa
this.connectionTracker = connectionTracker;
this.authentication = authentication;
this.transactionSpi = transactionStateMachineSPI;
this.version = "Neo4j/" + Version.getNeo4jVersion();
this.version = BOLT_SERVER_VERSION_PREFIX + Version.getNeo4jVersion();
}

@Override
Expand Down
Expand Up @@ -75,7 +75,7 @@ public void beginTransaction( Bookmark bookmark ) throws KernelException
{
ensureNoPendingTerminationNotice();

state = state.beginTransaction( bookmark, ctx, spi );
state = state.beginTransaction( ctx, spi, bookmark );
}
finally
{
Expand All @@ -85,13 +85,19 @@ public void beginTransaction( Bookmark bookmark ) throws KernelException

@Override
public StatementMetadata run( String statement, MapValue params ) throws KernelException
{
return run( statement, params, null );
}

@Override
public StatementMetadata run( String statement, MapValue params, Bookmark bookmark ) throws KernelException
{
before();
try
{
ensureNoPendingTerminationNotice();

state = state.run( ctx, spi, statement, params );
state = state.run( ctx, spi, statement, params, bookmark );

return ctx.currentStatementMetadata;
}
Expand All @@ -118,14 +124,15 @@ public void streamResult( ThrowingConsumer<BoltResult, Exception> resultConsumer
}

@Override
public void commitTransaction() throws KernelException
public Bookmark commitTransaction() throws KernelException
{
before();
try
{
ensureNoPendingTerminationNotice();

state = state.commitTransaction( ctx, spi );
return newestBookmark( spi );
}
catch ( TransactionFailureException ex )
{
Expand Down Expand Up @@ -239,10 +246,16 @@ enum State
AUTO_COMMIT
{
@Override
State beginTransaction( Bookmark bookmark, MutableTransactionState ctx, TransactionStateMachineSPI spi ) throws KernelException
State beginTransaction( MutableTransactionState ctx, TransactionStateMachineSPI spi, Bookmark bookmark ) throws KernelException
{
waitForBookmark( ctx, spi, bookmark );
ctx.currentTransaction = spi.beginTransaction( ctx.loginContext );
return EXPLICIT_TRANSACTION;
}

private void waitForBookmark( MutableTransactionState ctx, TransactionStateMachineSPI spi, Bookmark bookmark )
throws TransactionFailureException
{
if ( bookmark != null )
{
spi.awaitUpToDate( bookmark.txId() );
Expand All @@ -252,12 +265,19 @@ State beginTransaction( Bookmark bookmark, MutableTransactionState ctx, Transact
{
ctx.currentResult = BoltResult.EMPTY;
}

return EXPLICIT_TRANSACTION;
}

@Override
State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params ) throws KernelException
State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params, Bookmark bookmark )
throws KernelException
{
statement = parseStatement( ctx, statement );
waitForBookmark( ctx, spi, bookmark );
execute( ctx, spi, statement, params, spi.isPeriodicCommit( statement ) );
return AUTO_COMMIT;
}

private String parseStatement( MutableTransactionState ctx, String statement )
{
if ( statement.isEmpty() )
{
Expand All @@ -267,8 +287,7 @@ State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String s
{
ctx.lastStatement = statement;
}
execute( ctx, spi, statement, params, spi.isPeriodicCommit( statement ) );
return AUTO_COMMIT;
return statement;
}

void execute( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params, boolean isPeriodicCommit )
Expand Down Expand Up @@ -336,13 +355,14 @@ State rollbackTransaction( MutableTransactionState ctx, TransactionStateMachineS
EXPLICIT_TRANSACTION
{
@Override
State beginTransaction( Bookmark bookmark, MutableTransactionState ctx, TransactionStateMachineSPI spi ) throws KernelException
State beginTransaction( MutableTransactionState ctx, TransactionStateMachineSPI spi, Bookmark bookmark ) throws KernelException
{
throw new QueryExecutionKernelException( new InvalidSemanticsException( "Nested transactions are not supported." ) );
}

@Override
State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params ) throws KernelException
State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params, Bookmark bookmark )
throws KernelException
{
if ( statement.isEmpty() )
{
Expand Down Expand Up @@ -378,8 +398,7 @@ void streamResult( MutableTransactionState ctx,
State commitTransaction( MutableTransactionState ctx, TransactionStateMachineSPI spi ) throws KernelException
{
closeTransaction( ctx, true );
long txId = spi.newestEncounteredTxId();
Bookmark bookmark = new Bookmark( txId );
Bookmark bookmark = newestBookmark( spi );
ctx.currentResult = new BookmarkResult( bookmark );
return AUTO_COMMIT;
}
Expand All @@ -393,9 +412,10 @@ State rollbackTransaction( MutableTransactionState ctx, TransactionStateMachineS
}
};

abstract State beginTransaction( Bookmark bookmark, MutableTransactionState ctx, TransactionStateMachineSPI spi ) throws KernelException;
abstract State beginTransaction( MutableTransactionState ctx, TransactionStateMachineSPI spi, Bookmark bookmark ) throws KernelException;

abstract State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params ) throws KernelException;
abstract State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params, Bookmark bookmark )
throws KernelException;

abstract void streamResult( MutableTransactionState ctx, ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception;

Expand Down Expand Up @@ -490,6 +510,12 @@ void startExecution( MutableTransactionState ctx, BoltResultHandle resultHandle

}

private static Bookmark newestBookmark( TransactionStateMachineSPI spi )
{
long txId = spi.newestEncounteredTxId();
return new Bookmark( txId );
}

static class MutableTransactionState
{
/** The current session security context to be used for starting transactions */
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.util.Objects;

import org.neo4j.bolt.runtime.BoltResponseHandler;
import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.values.AnyValue;
Expand All @@ -30,6 +31,7 @@
import org.neo4j.values.virtual.MapValue;

import static java.lang.String.format;
import static org.neo4j.values.storable.Values.stringValue;

public class Bookmark
{
Expand Down Expand Up @@ -154,6 +156,11 @@ private static long txIdFrom( AnyValue bookmark ) throws BookmarkFormatException
}
}

public void attachTo( BoltResponseHandler state )
{
state.onMetadata( BOOKMARK_KEY, stringValue( toString() ) );
}

static class BookmarkFormatException extends KernelException
{
BookmarkFormatException( String bookmarkString, NumberFormatException e )
Expand Down
Expand Up @@ -24,12 +24,13 @@
import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.runtime.BoltStateMachineSPI;
import org.neo4j.bolt.v1.runtime.BoltStateMachineV1;
import org.neo4j.bolt.v1.runtime.ConnectedState;
import org.neo4j.bolt.v1.runtime.FailedState;
import org.neo4j.bolt.v1.runtime.InterruptedState;
import org.neo4j.bolt.v1.runtime.ReadyState;
import org.neo4j.bolt.v1.runtime.StreamingState;
import org.neo4j.bolt.v3.runtime.FailedState;
import org.neo4j.bolt.v3.runtime.DefunctState;
import org.neo4j.bolt.v3.runtime.ExtraMetaDataConnectedState;
import org.neo4j.bolt.v3.runtime.ReadyState;
import org.neo4j.bolt.v3.runtime.StreamingState;
import org.neo4j.bolt.v3.runtime.TransactionReadyState;

public class BoltStateMachineV3 extends BoltStateMachineV1
{
Expand All @@ -41,15 +42,19 @@ public BoltStateMachineV3( BoltStateMachineSPI boltSPI, BoltChannel boltChannel,
@Override
protected States buildStates()
{
ConnectedState connected = new ExtraMetaDataConnectedState();
ExtraMetaDataConnectedState connected = new ExtraMetaDataConnectedState();
ReadyState ready = new ReadyState();
StreamingState streaming = new StreamingState();
FailedState failed = new FailedState();
InterruptedState interrupted = new InterruptedState();
DefunctState defunct = new DefunctState();
TransactionReadyState txReady = new TransactionReadyState();
StreamingState txStreaming = new StreamingState();

connected.setReadyState( ready );
connected.setFailedState( failed );
connected.setFailedState( defunct );

ready.setTransactionReadyState( txReady );
ready.setStreamingState( streaming );
ready.setInterruptedState( interrupted );
ready.setFailedState( failed );
Expand All @@ -58,11 +63,19 @@ protected States buildStates()
streaming.setInterruptedState( interrupted );
streaming.setFailedState( failed );

failed.setReadyState( ready );
txReady.setReadyState( ready );
txReady.setTransactionStreamingState( txStreaming );
txReady.setInterruptedState( interrupted );
txReady.setFailedState( failed );

txStreaming.setReadyState( txReady );
txStreaming.setInterruptedState( interrupted );
txStreaming.setFailedState( failed );

failed.setInterruptedState( interrupted );

interrupted.setReadyState( ready );
interrupted.setFailedState( failed );
interrupted.setFailedState( defunct );

return new States( connected, failed );
}
Expand Down
Expand Up @@ -30,11 +30,14 @@
import org.neo4j.bolt.runtime.BoltResponseHandler;
import org.neo4j.bolt.v1.messaging.MessageProcessingHandler;
import org.neo4j.bolt.v1.messaging.ResultHandler;
import org.neo4j.bolt.v1.messaging.decoder.AckFailureMessageDecoder;
import org.neo4j.bolt.v1.messaging.decoder.DiscardAllMessageDecoder;
import org.neo4j.bolt.v1.messaging.decoder.PullAllMessageDecoder;
import org.neo4j.bolt.v1.messaging.decoder.ResetMessageDecoder;
import org.neo4j.bolt.v1.messaging.decoder.RunMessageDecoder;
import org.neo4j.bolt.v3.messaging.decoder.BeginMessageDecoder;
import org.neo4j.bolt.v3.messaging.decoder.CommitMessageDecoder;
import org.neo4j.bolt.v3.messaging.decoder.HelloMessageDecoder;
import org.neo4j.bolt.v3.messaging.decoder.RollbackMessageDecoder;
import org.neo4j.bolt.v3.messaging.decoder.RunMessageDecoder;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

Expand All @@ -44,31 +47,31 @@ public BoltRequestMessageReaderV3( BoltConnection connection, BoltResponseMessag
BoltMessageLogger messageLogger, LogService logService )
{
super( connection,
newSimpleResponseHandler( connection, responseMessageWriter, logService ),
newSimpleResponseHandler( responseMessageWriter, connection, logService ),
buildDecoders( connection, responseMessageWriter, messageLogger, logService ),
messageLogger );
}

private static List<RequestMessageDecoder> buildDecoders( BoltConnection connection, BoltResponseMessageWriter responseMessageWriter,
BoltMessageLogger messageLogger, LogService logService )
{
BoltResponseHandler helloHandler = newSimpleResponseHandler( connection, responseMessageWriter, logService );
BoltResponseHandler runHandler = newSimpleResponseHandler( connection, responseMessageWriter, logService );
BoltResponseHandler resultHandler = new ResultHandler( responseMessageWriter, connection, internalLog( logService ) );
BoltResponseHandler defaultHandler = newSimpleResponseHandler( connection, responseMessageWriter, logService );
BoltResponseHandler defaultHandler = newSimpleResponseHandler( responseMessageWriter, connection, logService );

return Arrays.asList(
new HelloMessageDecoder( helloHandler, messageLogger ),
new AckFailureMessageDecoder( defaultHandler, messageLogger ),
new ResetMessageDecoder( connection, defaultHandler, messageLogger ),
new RunMessageDecoder( runHandler, messageLogger ),
new HelloMessageDecoder( defaultHandler ),
new RunMessageDecoder( defaultHandler ),
new DiscardAllMessageDecoder( resultHandler, messageLogger ),
new PullAllMessageDecoder( resultHandler, messageLogger )
new PullAllMessageDecoder( resultHandler, messageLogger ),
new BeginMessageDecoder( defaultHandler ),
new CommitMessageDecoder( resultHandler ),
new RollbackMessageDecoder( resultHandler ),
new ResetMessageDecoder( connection, defaultHandler, messageLogger )
);
}

private static BoltResponseHandler newSimpleResponseHandler( BoltConnection connection,
BoltResponseMessageWriter responseMessageWriter, LogService logService )
private static BoltResponseHandler newSimpleResponseHandler( BoltResponseMessageWriter responseMessageWriter, BoltConnection connection,
LogService logService )
{
return new MessageProcessingHandler( responseMessageWriter, connection, internalLog( logService ) );
}
Expand Down

0 comments on commit 5a262c5

Please sign in to comment.