Skip to content

Commit

Permalink
Fix after review
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhen Li authored and zhenlineo committed Jul 20, 2018
1 parent 3e5308b commit 90a1685
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 28 deletions.
Expand Up @@ -248,27 +248,18 @@ enum State
@Override @Override
State beginTransaction( MutableTransactionState ctx, TransactionStateMachineSPI spi, Bookmark bookmark ) throws KernelException State beginTransaction( MutableTransactionState ctx, TransactionStateMachineSPI spi, Bookmark bookmark ) throws KernelException
{ {
waitForBookmark( ctx, spi, bookmark ); waitForBookmark( spi, bookmark );
ctx.currentResult = BoltResult.EMPTY; ctx.currentResult = BoltResult.EMPTY;
ctx.currentTransaction = spi.beginTransaction( ctx.loginContext ); ctx.currentTransaction = spi.beginTransaction( ctx.loginContext );
return EXPLICIT_TRANSACTION; return EXPLICIT_TRANSACTION;
} }


private void waitForBookmark( MutableTransactionState ctx, TransactionStateMachineSPI spi, Bookmark bookmark )
throws TransactionFailureException
{
if ( bookmark != null )
{
spi.awaitUpToDate( bookmark.txId() );
}
}

@Override @Override
State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params, Bookmark bookmark ) State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params, Bookmark bookmark )
throws KernelException throws KernelException
{ {
statement = parseStatement( ctx, statement ); statement = parseStatement( ctx, statement );
waitForBookmark( ctx, spi, bookmark ); waitForBookmark( spi, bookmark );
execute( ctx, spi, statement, params, spi.isPeriodicCommit( statement ) ); execute( ctx, spi, statement, params, spi.isPeriodicCommit( statement ) );
return AUTO_COMMIT; return AUTO_COMMIT;
} }
Expand Down Expand Up @@ -506,6 +497,15 @@ void startExecution( MutableTransactionState ctx, BoltResultHandle resultHandle


} }


private static void waitForBookmark( TransactionStateMachineSPI spi, Bookmark bookmark )
throws TransactionFailureException
{
if ( bookmark != null )
{
spi.awaitUpToDate( bookmark.txId() );
}
}

private static Bookmark newestBookmark( TransactionStateMachineSPI spi ) private static Bookmark newestBookmark( TransactionStateMachineSPI spi )
{ {
long txId = spi.newestEncounteredTxId(); long txId = spi.newestEncounteredTxId();
Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.neo4j.bolt.messaging.BoltIOException; import org.neo4j.bolt.messaging.BoltIOException;
import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.Status;


public enum StatementMode //TODO is this already somewhere? public enum StatementMode
{ {
READ( "R" ), READ( "R" ),
WRITE( "W" ); WRITE( "W" );
Expand All @@ -41,11 +41,11 @@ public String signature()


public static StatementMode parseMode( String str ) throws BoltIOException public static StatementMode parseMode( String str ) throws BoltIOException
{ {
if ( str.equalsIgnoreCase( READ.signature() ) ) if ( READ.signature().equalsIgnoreCase( str ) )
{ {
return READ; return READ;
} }
else if ( str.equalsIgnoreCase( WRITE.signature() ) ) else if ( WRITE.signature().equalsIgnoreCase( str ) )
{ {
return WRITE; return WRITE;
} }
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.values.AnyValue; import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.LongValue; import org.neo4j.values.storable.LongValue;
import org.neo4j.values.storable.TextValue; import org.neo4j.values.storable.TextValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.MapValue; import org.neo4j.values.virtual.MapValue;
import org.neo4j.values.virtual.VirtualValues; import org.neo4j.values.virtual.VirtualValues;


Expand Down Expand Up @@ -72,24 +73,38 @@ static Bookmark parseBookmark( MapValue meta ) throws BoltIOException
} }
} }


static Duration parseTransactionTimeout( MapValue meta ) static Duration parseTransactionTimeout( MapValue meta ) throws BoltIOException
{ {
AnyValue anyValue = meta.get( TX_TIMEOUT_KEY ); AnyValue anyValue = meta.get( TX_TIMEOUT_KEY );
if ( anyValue instanceof LongValue ) if ( anyValue == Values.NO_VALUE )
{
return null;
}
else if ( anyValue instanceof LongValue )
{ {
return Duration.ofMillis( ((LongValue) anyValue).longValue() ); return Duration.ofMillis( ((LongValue) anyValue).longValue() );
} }
return null; else
{
throw new BoltIOException( Status.Request.InvalidFormat, "Expecting transaction timeout value to be a Long value, but got: " + anyValue );
}
} }


static StatementMode parseStatementMode( MapValue meta ) throws BoltIOException static StatementMode parseStatementMode( MapValue meta ) throws BoltIOException
{ {
AnyValue anyValue = meta.get( MODE_KEY ); AnyValue anyValue = meta.get( MODE_KEY );
if ( anyValue instanceof TextValue ) if ( anyValue == Values.NO_VALUE )
{
return null;
}
else if ( anyValue instanceof TextValue )
{ {
return StatementMode.parseMode( ((TextValue) anyValue).stringValue() ); return StatementMode.parseMode( ((TextValue) anyValue).stringValue() );
} }
return null; else
{
throw new BoltIOException( Status.Request.InvalidFormat, "Expecting transaction statement mode value to be a String value, but got: " + anyValue );
}
} }


public Bookmark bookmark() public Bookmark bookmark()
Expand Down
Expand Up @@ -40,7 +40,7 @@


public class TransactionReadyState extends FailSafeBoltStateMachineState public class TransactionReadyState extends FailSafeBoltStateMachineState
{ {
private BoltStateMachineState streaming; private BoltStateMachineState streamingState;
private BoltStateMachineState interruptedState; private BoltStateMachineState interruptedState;
private BoltStateMachineState readyState; private BoltStateMachineState readyState;


Expand Down Expand Up @@ -75,7 +75,7 @@ public String name()


public void setTransactionStreamingState( BoltStateMachineState streamingState ) public void setTransactionStreamingState( BoltStateMachineState streamingState )
{ {
this.streaming = streamingState; this.streamingState = streamingState;
} }


public void setInterruptedState( BoltStateMachineState interruptedState ) public void setInterruptedState( BoltStateMachineState interruptedState )
Expand All @@ -97,7 +97,7 @@ private BoltStateMachineState processRunMessage( RunMessage message, StateMachin


context.connectionState().onMetadata( FIELDS_KEY, stringArray( statementMetadata.fieldNames() ) ); context.connectionState().onMetadata( FIELDS_KEY, stringArray( statementMetadata.fieldNames() ) );
context.connectionState().onMetadata( FIRST_RECORD_AVAILABLE_KEY, Values.longValue( end - start ) ); context.connectionState().onMetadata( FIRST_RECORD_AVAILABLE_KEY, Values.longValue( end - start ) );
return streaming; return streamingState;
} }


private BoltStateMachineState processCommitMessage( StateMachineContext context ) throws Exception private BoltStateMachineState processCommitMessage( StateMachineContext context ) throws Exception
Expand All @@ -117,7 +117,7 @@ private BoltStateMachineState processRollbackMessage( StateMachineContext contex


private void assertInitialized() private void assertInitialized()
{ {
checkState( streaming != null, "Streaming state not set" ); checkState( streamingState != null, "Streaming state not set" );
checkState( interruptedState != null, "Interrupted state not set" ); checkState( interruptedState != null, "Interrupted state not set" );
checkState( readyState != null, "Ready state not set" ); checkState( readyState != null, "Ready state not set" );
checkState( failedState != null, "Failed state not set" ); checkState( failedState != null, "Failed state not set" );
Expand Down
Expand Up @@ -58,7 +58,7 @@ class BoltStateMachineFactoryImplTest


@ParameterizedTest( name = "V{0}" ) @ParameterizedTest( name = "V{0}" )
@ValueSource( longs = {BoltProtocolV1.VERSION, BoltProtocolV2.VERSION} ) @ValueSource( longs = {BoltProtocolV1.VERSION, BoltProtocolV2.VERSION} )
void shouldCreateBoltStateMachines( long protocolVersion ) throws Throwable void shouldCreateBoltStateMachinesV1( long protocolVersion ) throws Throwable
{ {
BoltStateMachineFactoryImpl factory = newBoltFactory(); BoltStateMachineFactoryImpl factory = newBoltFactory();


Expand Down
Expand Up @@ -38,8 +38,11 @@
import org.neo4j.bolt.v1.messaging.request.ResetMessage; import org.neo4j.bolt.v1.messaging.request.ResetMessage;
import org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket; import org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket;
import org.neo4j.bolt.v1.transport.integration.TransportTestUtil; import org.neo4j.bolt.v1.transport.integration.TransportTestUtil;
import org.neo4j.bolt.v1.transport.socket.client.SecureSocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.SecureWebSocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.SocketConnection; import org.neo4j.bolt.v1.transport.socket.client.SocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.TransportConnection; import org.neo4j.bolt.v1.transport.socket.client.TransportConnection;
import org.neo4j.bolt.v1.transport.socket.client.WebSocketConnection;
import org.neo4j.bolt.v3.messaging.request.BeginMessage; import org.neo4j.bolt.v3.messaging.request.BeginMessage;
import org.neo4j.bolt.v3.messaging.request.HelloMessage; import org.neo4j.bolt.v3.messaging.request.HelloMessage;
import org.neo4j.bolt.v3.messaging.request.RunMessage; import org.neo4j.bolt.v3.messaging.request.RunMessage;
Expand Down Expand Up @@ -90,8 +93,7 @@ public class BoltV3TransportIT
@Parameters( name = "{0}" ) @Parameters( name = "{0}" )
public static List<Class<? extends TransportConnection>> transports() public static List<Class<? extends TransportConnection>> transports()
{ {
return asList( SocketConnection.class ); return asList( SocketConnection.class, WebSocketConnection.class, SecureSocketConnection.class, SecureWebSocketConnection.class );
//, WebSocketConnection.class, SecureSocketConnection.class, SecureWebSocketConnection.class );
} }


@Before @Before
Expand Down Expand Up @@ -123,7 +125,7 @@ public void shouldNegotiateProtocolV3() throws Exception
} }


@Test @Test
public void shouldNegotiateProtocolV2WhenClientSupportsBothV1V2AndV3() throws Exception public void shouldNegotiateProtocolV3WhenClientSupportsBothV1V2AndV3() throws Exception
{ {
connection.connect( address ) connection.connect( address )
.send( util.acceptedVersions( 3, 2, 1, 0 ) ) .send( util.acceptedVersions( 3, 2, 1, 0 ) )
Expand Down
Expand Up @@ -54,7 +54,7 @@
class ConnectedStateIT extends BoltStateMachineStateTestBase class ConnectedStateIT extends BoltStateMachineStateTestBase
{ {
@Test @Test
void shouldExecuteStatement() throws Throwable void shouldHandleHelloMessage() throws Throwable
{ {
// Given // Given
BoltStateMachineV3 machine = newStateMachine(); BoltStateMachineV3 machine = newStateMachine();
Expand Down

0 comments on commit 90a1685

Please sign in to comment.