Skip to content

Commit

Permalink
Cleanup session tracker naming and other small things.
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Feb 29, 2016
1 parent 7ffd0f0 commit 5104dad
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 24 deletions.
Expand Up @@ -45,12 +45,12 @@

public class ReplicatedTransactionStateMachine<MEMBER> implements StateMachine
{
private final GlobalSessionTrackerState<MEMBER> sessionTracker;
private final GlobalSessionTrackerState<MEMBER> sessionTrackerState;
private final GlobalSession myGlobalSession;
private final LockTokenManager lockTokenManager;
private final TransactionCommitProcess commitProcess;
private final CommittingTransactions transactionFutures;
private final StateStorage<GlobalSessionTrackerState<MEMBER>> storage;
private final StateStorage<GlobalSessionTrackerState<MEMBER>> sessionTrackerStorage;
private final Log log;

private long lastCommittedIndex = -1;
Expand All @@ -59,15 +59,15 @@ public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess
GlobalSession myGlobalSession,
LockTokenManager lockTokenManager,
CommittingTransactions transactionFutures,
StateStorage<GlobalSessionTrackerState<MEMBER>> storage,
StateStorage<GlobalSessionTrackerState<MEMBER>> sessionTrackerStorage,
LogProvider logProvider )
{
this.commitProcess = commitProcess;
this.myGlobalSession = myGlobalSession;
this.lockTokenManager = lockTokenManager;
this.transactionFutures = transactionFutures;
this.storage = storage;
this.sessionTracker = storage.getInitialState();
this.sessionTrackerStorage = sessionTrackerStorage;
this.sessionTrackerState = sessionTrackerStorage.getInitialState();
this.log = logProvider.getLog( getClass() );
}

Expand All @@ -83,7 +83,7 @@ public synchronized void applyCommand( ReplicatedContent content, long logIndex
@Override
public void flush() throws IOException
{
storage.persistStoreData( sessionTracker );
sessionTrackerStorage.persistStoreData( sessionTrackerState );
}

private void handleTransaction( ReplicatedTransaction<MEMBER> replicatedTx, long logIndex )
Expand All @@ -107,11 +107,11 @@ private void handleTransaction( ReplicatedTransaction<MEMBER> replicatedTx, long

try
{
/*
* At this point, we need to check if the tx exists in the log. If it does, it is ok to skip it. However, we
* may still need to persist the session state (as we may crashed in between), which happens outside this
* if check.
*/
/*
* At this point, we need to check if the tx exists in the log. If it does, it is ok to skip it. However, we
* may still need to persist the session state (as we may crashed in between), which happens outside this
* if check.
*/
if ( logIndex <= lastCommittedIndex )
{
log.info( "Ignoring transaction at log index %d since already committed up to %d", logIndex,
Expand Down Expand Up @@ -173,18 +173,19 @@ private void handleTransaction( ReplicatedTransaction<MEMBER> replicatedTx, long
" restarted once the underlying cause has been addressed.", e );
}
}
/*
* Finally, we need to check, in an idempotent fashion, if the session state needs to be persisted.
*/
if ( sessionTracker.logIndex() < logIndex )

/*
* Finally, we need to check, in an idempotent fashion, if the session state needs to be persisted.
*/
if ( sessionTrackerState.logIndex() < logIndex )
{
sessionTracker.update( replicatedTx.globalSession(), replicatedTx.localOperationId(), logIndex );
sessionTrackerState.update( replicatedTx.globalSession(), replicatedTx.localOperationId(), logIndex );
sessionUpdated = true;
}
else
{
log.info( format( "Rejecting log index %d since the session tracker is already at log index %d",
logIndex, sessionTracker.logIndex() ) );
logIndex, sessionTrackerState.logIndex() ) );
}
}
finally
Expand All @@ -199,7 +200,7 @@ private void handleTransaction( ReplicatedTransaction<MEMBER> replicatedTx, long

private boolean operationValid( ReplicatedTransaction<MEMBER> replicatedTx )
{
return sessionTracker.validateOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() );
return sessionTrackerState.validateOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() );
}

public void setLastCommittedIndex( long lastCommittedIndex )
Expand Down
Expand Up @@ -76,7 +76,6 @@ public synchronized void notifyCommitted()
{
this.commitIndex = commitIndex;
executor.execute( () -> {

try
{
applyUpTo( commitIndex );
Expand Down
Expand Up @@ -278,10 +278,10 @@ fileSystem, new File( clusterStateDirectory, "lock-token-state" ), "lock-token",
new ReplicatedLockTokenStateMachine<>( lockTokenState );
stateMachines.add( replicatedLockTokenStateMachine );

StateStorage<GlobalSessionTrackerState<CoreMember>> onDiskGlobalSessionTrackerState;
StateStorage<GlobalSessionTrackerState<CoreMember>> sessionTrackerStorage;
try
{
onDiskGlobalSessionTrackerState = life.add( new DurableStateStorage<>(
sessionTrackerStorage = life.add( new DurableStateStorage<>(
fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session-tracker",
new GlobalSessionTrackerState.Marshal<>( new CoreMemberMarshal() ),
config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ),
Expand All @@ -295,7 +295,7 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session

commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool,
replicatedLockTokenStateMachine,
dependencies, logging, platformModule.monitors, onDiskGlobalSessionTrackerState, stateMachines );
dependencies, logging, platformModule.monitors, sessionTrackerStorage, stateMachines );

final StateStorage<IdAllocationState> idAllocationState;
try
Expand Down Expand Up @@ -452,7 +452,7 @@ public static CommitProcessFactory createCommitProcessFactory(
final Replicator replicator, final LocalSessionPool localSessionPool,
final LockTokenManager currentReplicatedLockState, final Dependencies dependencies,
final LogService logging, Monitors monitors,
StateStorage<GlobalSessionTrackerState<CoreMember>> globalSessionTrackerState,
StateStorage<GlobalSessionTrackerState<CoreMember>> sessionTrackerStorage,
StateMachines stateMachines )
{
return ( appender, applier, config ) -> {
Expand All @@ -464,7 +464,7 @@ public static CommitProcessFactory createCommitProcessFactory(
ReplicatedTransactionStateMachine<CoreMember> replicatedTxStateMachine = new
ReplicatedTransactionStateMachine<>(
localCommit, localSessionPool.getGlobalSession(), currentReplicatedLockState,
committingTransactions, globalSessionTrackerState, logging.getInternalLogProvider() );
committingTransactions, sessionTrackerStorage, logging.getInternalLogProvider() );

dependencies.satisfyDependencies( replicatedTxStateMachine );

Expand Down

0 comments on commit 5104dad

Please sign in to comment.