Skip to content

Commit

Permalink
Implemented a few readability tweaks on the Transaction ID Tracker im…
Browse files Browse the repository at this point in the history
…plementation.
  • Loading branch information
jimwebber authored and davidegrohmann committed Jul 14, 2016
1 parent 0782bff commit e2fbbde
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 54 deletions.
Expand Up @@ -27,7 +27,6 @@
import org.neo4j.bolt.v1.runtime.StatementMetadata;
import org.neo4j.bolt.v1.runtime.internal.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.logging.Log;

/** Bridges the gap between incoming deserialized messages, the user environment and back. */
Expand Down Expand Up @@ -60,8 +59,8 @@ public TransportBridge( Log log, Session session, MessageHandler<IOException> ou
public void handleInitMessage( String clientName, Map<String,Object> authToken ) throws RuntimeException
{
// TODO: make the client transmit the version for now it is hardcoded to -1 to ensure current behaviour
long baseDBVersion = -1;
session.init( clientName, authToken, baseDBVersion, null, initCallback );
long currentHighestTransactionId = -1;
session.init( clientName, authToken, currentHighestTransactionId, null, initCallback );
}

@Override
Expand Down
Expand Up @@ -83,11 +83,11 @@ public String connectionDescriptor()
}

@Override
public <A> void init( String clientName, Map<String,Object> authToken, long baseDBVersion,
public <A> void init( String clientName, Map<String,Object> authToken, long currentHighestTransactionId,
A attachment, Callback<Boolean,A> callback )
{
monitor.messageReceived();
delegate.init( clientName, authToken, baseDBVersion, attachment, withMonitor( callback ) );
delegate.init( clientName, authToken, currentHighestTransactionId, attachment, withMonitor( callback ) );
}

@Override
Expand Down
Expand Up @@ -131,7 +131,7 @@ public static <V, A> Callback<V,A> noop()
/**
* Initialize the session.
*/
<A> void init( String clientName, Map<String,Object> authToken, long baseDBVersion, A attachment, Callback<Boolean,A> callback );
<A> void init( String clientName, Map<String,Object> authToken, long currentHighestTransactionId, A attachment, Callback<Boolean,A> callback );

/**
* Run a statement, yielding a result stream which can be retrieved through pulling it in a subsequent call.
Expand Down Expand Up @@ -172,7 +172,7 @@ public static <V, A> Callback<V,A> noop()
* Reset the session to an IDLE state. This clears any outstanding failure condition, disposes
* of any outstanding result records and rolls back the current transaction (if any).
*
* This differs from {@link #reset(Object, Callback)} in that it is more "radical" - it does not
* This differs from {@link #ackFailure(Object, Callback)} in that it is more "radical" - it does not
* matter what the state of the session is, as long as it is open, reset will move it back to IDLE.
*
* This is designed to cater to two use cases:
Expand Down
Expand Up @@ -27,13 +27,13 @@
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.api.bolt.HaltableUserSession;

public class ErrorReportingSession extends HaltableUserSession.Adapter implements Session
class ErrorReportingSession extends HaltableUserSession.Adapter implements Session
{
private final String connectionDescriptor;
private final Neo4jError error;
private final String id;

public ErrorReportingSession( String connectionDescriptor, Neo4jError error )
ErrorReportingSession( String connectionDescriptor, Neo4jError error )
{
this.error = error;
this.id = UUID.randomUUID().toString();
Expand Down Expand Up @@ -62,7 +62,7 @@ private <V, A> void reportError( A attachment, Callback<V,A> callback )
}

@Override
public <A> void init( String clientName, Map<String,Object> authToken, long baseDBVersion,
public <A> void init( String clientName, Map<String,Object> authToken, long currentHighestTransactionId,
A attachment, Callback<Boolean,A> callback )
{
reportError( attachment, callback );
Expand Down
Expand Up @@ -41,7 +41,6 @@
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.security.AuthSubject;
import org.neo4j.kernel.api.security.AuthToken;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
Expand All @@ -56,13 +55,14 @@
import static org.neo4j.kernel.api.KernelTransaction.Type.explicit;
import static org.neo4j.kernel.api.KernelTransaction.Type.implicit;
import static org.neo4j.kernel.api.exceptions.Status.Security.CredentialsExpired;
import static org.neo4j.kernel.api.security.AuthToken.PRINCIPAL;

/**
* State-machine based implementation of {@link Session}. With this approach,
* the discrete states a session can be in are explicit. Each state describes which actions from the context
* interface are legal given that particular state, and how those actions behave given the current state.
*/
public class SessionStateMachine implements Session, SessionState
class SessionStateMachine implements Session, SessionState
{
/**
* The session state machine, this is the heart of how a session operates. This enumerates the various discrete
Expand All @@ -76,17 +76,18 @@ enum State
UNINITIALIZED
{
@Override
public State init( SessionStateMachine ctx, String clientName, Map<String,Object> authToken, long baseDBVersion )
public State init( SessionStateMachine ctx, String clientName, Map<String,Object> authToken,
long currentHighestTransactionId )
{
try
{
AuthenticationResult authResult = ctx.spi.authenticate( authToken );
ctx.versionTracking = ctx.spi.versionTracking( baseDBVersion );
ctx.transactionIdTracker = ctx.spi.versionTracking( currentHighestTransactionId );
ctx.authSubject = authResult.getAuthSubject();
ctx.credentialsExpired = authResult.credentialsExpired();
ctx.result( authResult.credentialsExpired() );
ctx.spi.udcRegisterClient( clientName );
ctx.setQuerySourceFromClientNameAndPrincipal( clientName, authToken.get( AuthToken.PRINCIPAL ) );
ctx.setQuerySourceFromClientNameAndPrincipal( clientName, authToken.get( PRINCIPAL ) );
ctx.spi.sessionActivated( ctx );
return IDLE;
}
Expand Down Expand Up @@ -168,7 +169,7 @@ private State doBeginTransaction( SessionStateMachine ctx, KernelTransaction.Typ
// way, we need a different way to kill statements running in implicit
// transactions, because we do that by calling #terminate() on this tx.
ctx.currentTransaction =
ctx.spi.beginTransaction( type, ctx.authSubject, ctx.versionTracking );
ctx.spi.beginTransaction( type, ctx.authSubject, ctx.transactionIdTracker );
return IN_TRANSACTION;
}
catch ( TransactionFailureException e )
Expand Down Expand Up @@ -454,7 +455,8 @@ protected State onNoImplementation( SessionStateMachine ctx, String command )

// Operations that a session can perform. Individual states override these if they want to support them.

public State init( SessionStateMachine ctx, String clientName, Map<String,Object> authToken, long baseDBVersion )
public State init( SessionStateMachine ctx, String clientName, Map<String,Object> authToken,
long currentHighestTransactionId )
{
return onNoImplementation( ctx, "initializing the session" );
}
Expand Down Expand Up @@ -689,7 +691,7 @@ public String[] fieldNames()
/** These are the "external" actions the state machine can take */
private final SPI spi;

private SPI.VersionTracking versionTracking;
private SPI.TransactionIdTracker transactionIdTracker;

/**
* This SPI encapsulates the "external" actions the state machine can take.
Expand All @@ -710,7 +712,8 @@ interface SPI
String connectionDescriptor();
void reportError( Neo4jError err );
void reportError( String message, Throwable cause );
KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, VersionTracking versionTracking )
KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode,
TransactionIdTracker transactionIdTracker )
throws TransactionFailureException;
void bindTransactionToCurrentThread( KernelTransaction tx );
void unbindTransactionFromCurrentThread();
Expand All @@ -721,17 +724,17 @@ RecordStream run( SessionStateMachine ctx, String statement, Map<String, Object>
Statement currentStatement();
void sessionActivated( Session session );
void sessionHalted( Session session );
VersionTracking versionTracking( long startingVersion );
TransactionIdTracker versionTracking( long startingVersion );

interface VersionTracking
interface TransactionIdTracker
{
void assertUpToDate() throws TransactionFailureException;

void updateVersion( long version );
}
}

public SessionStateMachine( String connectionDescriptor, UsageData usageData, GraphDatabaseAPI db,
SessionStateMachine( String connectionDescriptor, UsageData usageData, GraphDatabaseAPI db,
ThreadToStatementContextBridge txBridge, StatementRunner engine, LogService logging,

Authentication authentication, Supplier<TransactionIdStore> transactionIdStore, SessionTracker sessionTracker )
Expand All @@ -740,7 +743,7 @@ public SessionStateMachine( String connectionDescriptor, UsageData usageData, Gr
txBridge, transactionIdStore, sessionTracker ) );
}

public SessionStateMachine( SPI spi )
SessionStateMachine( SPI spi )
{
this.spi = spi;
this.authSubject = AuthSubject.ANONYMOUS;
Expand Down Expand Up @@ -769,13 +772,13 @@ private void setQuerySourceFromClientNameAndPrincipal( String clientName, Object
}

@Override
public <A> void init( String clientName, Map<String,Object> authToken, long baseDBVersion,
public <A> void init( String clientName, Map<String,Object> authToken, long currentHighestTransactionId,
A attachment, Callback<Boolean,A> callback )
{
before( attachment, callback );
try
{
state = state.init( this, clientName, authToken, baseDBVersion );
state = state.init( this, clientName, authToken, currentHighestTransactionId );
}
finally { after(); }
}
Expand Down Expand Up @@ -1061,7 +1064,7 @@ private class BoltQuerySession extends QuerySession
{
private final String querySource;

public BoltQuerySession( Neo4jTransactionalContext transactionalContext, String querySource )
BoltQuerySession( Neo4jTransactionalContext transactionalContext, String querySource )
{
super( transactionalContext );
this.querySource = querySource;
Expand Down
Expand Up @@ -95,13 +95,13 @@ public void reportError( String message, Throwable cause )
}

@Override
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, VersionTracking versionTracking )
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, TransactionIdTracker transactionIdTracker )
throws TransactionFailureException
{
versionTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();
db.beginTransaction( type, mode );
KernelTransaction kernelTransaction = txBridge.getKernelTransactionBoundToThisThread( false );
kernelTransaction.registerCloseListener( versionTracking::updateVersion );
kernelTransaction.registerCloseListener( transactionIdTracker::updateVersion );
return kernelTransaction;
}

Expand Down Expand Up @@ -155,7 +155,7 @@ public void sessionHalted( Session session )
sessionTracker.sessionHalted( session );
}

public VersionTracking versionTracking( long startingVersion )
public TransactionIdTracker versionTracking( long startingVersion )
{
return new TransactionIdTracking( transactionIdStore, startingVersion, 30, TimeUnit.SECONDS );
}
Expand Down
Expand Up @@ -28,18 +28,19 @@

import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;

class TransactionIdTracking implements SessionStateMachine.SPI.VersionTracking
class TransactionIdTracking implements SessionStateMachine.SPI.TransactionIdTracker
{
private final Supplier<TransactionIdStore> transactionIdStore;
private final int timeout;
private final TimeUnit timeoutUnit;

private long txId;

TransactionIdTracking( Supplier<TransactionIdStore> transactionIdStore, long version, int timeout, TimeUnit unit )
TransactionIdTracking( Supplier<TransactionIdStore> transactionIdStore, long transactionId, int timeout,
TimeUnit unit )
{
this.transactionIdStore = transactionIdStore;
this.txId = version;
this.txId = transactionId;
this.timeout = timeout;
this.timeoutUnit = unit;
}
Expand Down
Expand Up @@ -31,13 +31,13 @@
/**
* A session implementation that delegates work to a worker thread.
*/
public class SessionWorkerFacade implements Session
class SessionWorkerFacade implements Session
{
private final String key;
private final String connectionDescriptor;
private final SessionWorker worker;

public SessionWorkerFacade( String key, String connectionDescriptor, SessionWorker worker )
SessionWorkerFacade( String key, String connectionDescriptor, SessionWorker worker )
{
this.key = key;
this.connectionDescriptor = connectionDescriptor;
Expand All @@ -57,10 +57,10 @@ public String connectionDescriptor()
}

@Override
public <A> void init( final String clientName, Map<String,Object> authToken, long baseDBVersion, A attachment,
Callback<Boolean,A> callback )
public <A> void init( final String clientName, Map<String,Object> authToken, long currentHighestTransactionId, A attachment,
Callback<Boolean,A> callback )
{
queue( session -> session.init( clientName, authToken, baseDBVersion, attachment, callback ) );
queue( session -> session.init( clientName, authToken, currentHighestTransactionId, attachment, callback ) );
}

@Override
Expand Down
Expand Up @@ -19,13 +19,13 @@
*/
package org.neo4j.bolt.v1.runtime;

import org.junit.Test;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Map;

import org.junit.Test;

import org.neo4j.bolt.v1.runtime.MonitoredSessions.MonitoredSession;
import org.neo4j.bolt.v1.runtime.internal.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
Expand All @@ -39,6 +39,7 @@
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import static org.neo4j.bolt.v1.runtime.Session.Callback.noOp;

public class MonitoredSessionsTest
Expand Down Expand Up @@ -103,9 +104,9 @@ public void shouldNotWrapWithMonitoredSessionIfNobodyIsListening() throws Throwa

private static class CountingSessionMonitor implements MonitoredSessions.SessionMonitor
{
public long messagesRecieved = 0;
public long queueTime = 0;
public long processingTime = 0;
long messagesRecieved = 0;
long queueTime = 0;
long processingTime = 0;

@Override
public void messageReceived()
Expand All @@ -128,7 +129,7 @@ public void processingDone( long processingTime )

private static class ControlledCompletionSession extends HaltableUserSession.Adapter implements Session
{
public Callback callback;
Callback callback;

@Override
public String key()
Expand All @@ -142,7 +143,7 @@ public String connectionDescriptor()
}

@Override
public <A> void init( String clientName, Map<String,Object> authToken, long baseDBVersion,
public <A> void init( String clientName, Map<String,Object> authToken, long currentHighestTransactionId,
A attachment, Callback<Boolean,A> callback )
{
this.callback = callback;
Expand Down Expand Up @@ -223,7 +224,7 @@ public Instant instant()
return Instant.ofEpochMilli( millis );
}

public void forward( long delta )
void forward( long delta )
{
this.millis += delta;
}
Expand Down
Expand Up @@ -177,7 +177,7 @@ public void reportError( String message, Throwable cause )
}

@Override
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, VersionTracking versionTracking )
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, TransactionIdTracker transactionIdTracker )
{
liveTransactions.incrementAndGet();
return new CloseTrackingKernelTransaction();
Expand Down Expand Up @@ -232,9 +232,9 @@ public void sessionHalted( Session session )
}

@Override
public VersionTracking versionTracking( long startingVersion )
public TransactionIdTracker versionTracking( long startingVersion )
{
return new VersionTracking()
return new TransactionIdTracker()
{
@Override
public void assertUpToDate() throws TransactionFailureException
Expand Down
Expand Up @@ -93,7 +93,7 @@ public void shouldStopRunningTxOnHalt() throws Throwable
// Then
assertThat( machine.state(), CoreMatchers.equalTo( SessionStateMachine.State.STOPPED ) );
verify( spi ).beginTransaction( any( KernelTransaction.Type.class ), any( AccessMode.class ),
any( SessionStateMachine.SPI.VersionTracking.class ) );
any( SessionStateMachine.SPI.TransactionIdTracker.class ) );
verify( ktx ).close();
}

Expand Down
Expand Up @@ -97,7 +97,7 @@ public void testSyntaxError() throws Throwable
private SessionStateMachine newIdleMachine()
{
SessionStateMachine machine = new SessionStateMachine( "<idle>", new UsageData( scheduler ), db, txBridge,
runner, NullLogService.getInstance(), Authentication.NONE, () -> transactionIdStore, sessionTracker );
runner, NullLogService.getInstance(), Authentication.NONE, () -> transactionIdStore, sessionTracker );
machine.init( "FunClient", map(), -1, null, Session.Callback.noOp() );
return machine;
}
Expand Down

0 comments on commit e2fbbde

Please sign in to comment.