Skip to content

Commit

Permalink
Fixes issue with failure handling and recovery in ReplicatedTransacti…
Browse files Browse the repository at this point in the history
…onSM

There was an issue with RTSM in combination with SessionTrackingState persistence,
 where a crash in between updating the session tracker and committing the tx in the
 logical log could lead to failure to actually commit the tx on recovery. This issue
 is now replicated in tests and fixed. The fix involves separating out the
 verification and update functionalities of GlobalSessionTrackerState, in order
 to invert the order of updating session state and committing in the logical log.
This commit also takes the opportunity to make sure that GlobalSessionTrackerState
 persistence is idempotent with respect to raft log entry replays.
  • Loading branch information
digitalstain committed Jan 22, 2016
1 parent c8cccc7 commit a9d245c
Show file tree
Hide file tree
Showing 10 changed files with 518 additions and 254 deletions.
Expand Up @@ -36,6 +36,9 @@ public interface GlobalSessionTrackerState<MEMBER>
/** /**
* Tracks the operation and returns true iff this operation should be allowed. * Tracks the operation and returns true iff this operation should be allowed.
*/ */
boolean validateAndTrackOperationAtLogIndex( GlobalSession<MEMBER> globalSession, LocalOperationId localOperationId, boolean validateOperation( GlobalSession<MEMBER> globalSession, LocalOperationId localOperationId );
long logIndex );
void update( GlobalSession<MEMBER> globalSession, LocalOperationId localOperationId, long logIndex );

long logIndex();
} }
Expand Up @@ -56,15 +56,44 @@ public InMemoryGlobalSessionTrackerState( InMemoryGlobalSessionTrackerState<MEMB
} }


@Override @Override
public boolean validateAndTrackOperationAtLogIndex( GlobalSession<MEMBER> globalSession, public boolean validateOperation( GlobalSession<MEMBER> globalSession,
LocalOperationId localOperationId, long logIndex ) LocalOperationId localOperationId )
{
boolean result;

LocalSessionTracker existingSessionTracker = sessionTrackers.get( globalSession.owner() );
if ( isNewSession( globalSession, existingSessionTracker ) )
{
result = isFirstOperation( localOperationId );
}
else
{
result = existingSessionTracker.isValidOperation( localOperationId );
}

return result;
}

@Override
public void update( GlobalSession<MEMBER> globalSession, LocalOperationId localOperationId, long logIndex )
{ {
this.logIndex = logIndex;
LocalSessionTracker localSessionTracker = validateGlobalSessionAndGetLocalSessionTracker( globalSession ); LocalSessionTracker localSessionTracker = validateGlobalSessionAndGetLocalSessionTracker( globalSession );
return localSessionTracker.validateAndTrackOperation( localOperationId ); localSessionTracker.validateAndTrackOperation( localOperationId );
this.logIndex = logIndex;
} }


long logIndex() private boolean isNewSession( GlobalSession<MEMBER> globalSession, LocalSessionTracker existingSessionTracker )
{
return existingSessionTracker == null || !existingSessionTracker.globalSessionId.equals( globalSession.sessionId() );
}

private boolean isFirstOperation( LocalOperationId id )
{
return id.sequenceNumber() == 0;
}

@Override
public long logIndex()
{ {
return logIndex; return logIndex;
} }
Expand All @@ -73,12 +102,8 @@ private LocalSessionTracker validateGlobalSessionAndGetLocalSessionTracker( Glob
{ {
LocalSessionTracker localSessionTracker = sessionTrackers.get( globalSession.owner() ); LocalSessionTracker localSessionTracker = sessionTrackers.get( globalSession.owner() );


if ( localSessionTracker == null ) if ( localSessionTracker == null ||
{ !localSessionTracker.globalSessionId.equals( globalSession.sessionId() ) )
localSessionTracker = new LocalSessionTracker( globalSession.sessionId() );
sessionTrackers.put( globalSession.owner(), localSessionTracker );
}
else if ( !localSessionTracker.globalSessionId.equals( globalSession.sessionId() ) )
{ {
localSessionTracker = new LocalSessionTracker( globalSession.sessionId() ); localSessionTracker = new LocalSessionTracker( globalSession.sessionId() );
sessionTrackers.put( globalSession.owner(), localSessionTracker ); sessionTrackers.put( globalSession.owner(), localSessionTracker );
Expand Down
Expand Up @@ -65,27 +65,35 @@ public OnDiskGlobalSessionTrackerState( FileSystemAbstraction fileSystemAbstract
} }


@Override @Override
public boolean validateAndTrackOperationAtLogIndex( GlobalSession<MEMBER> globalSession, LocalOperationId public boolean validateOperation( GlobalSession<MEMBER> globalSession, LocalOperationId
localOperationId, long logIndex ) localOperationId )
{
return inMemoryGlobalSessionTrackerState.validateOperation( globalSession, localOperationId );
}

@Override
public void update( GlobalSession<MEMBER> globalSession, LocalOperationId localOperationId, long logIndex )
{ {
InMemoryGlobalSessionTrackerState<MEMBER> temp = InMemoryGlobalSessionTrackerState<MEMBER> temp =
new InMemoryGlobalSessionTrackerState<>( inMemoryGlobalSessionTrackerState ); new InMemoryGlobalSessionTrackerState<>( inMemoryGlobalSessionTrackerState );


boolean stateUpdated = temp.validateAndTrackOperationAtLogIndex( globalSession, localOperationId, logIndex ); temp.update( globalSession, localOperationId, logIndex );


if ( stateUpdated ) try
{
statePersister.persistStoreData( temp );
inMemoryGlobalSessionTrackerState = temp;
}
catch ( IOException e )
{ {
try throw new RuntimeException( e );
{
statePersister.persistStoreData( temp );
inMemoryGlobalSessionTrackerState = temp;
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
} }
return stateUpdated; }

@Override
public long logIndex()
{
return inMemoryGlobalSessionTrackerState.logIndex();
} }


@Override @Override
Expand Down
Expand Up @@ -43,8 +43,8 @@


public class ReplicatedTransactionFactory public class ReplicatedTransactionFactory
{ {
public static ReplicatedTransaction createImmutableReplicatedTransaction( public static <T> ReplicatedTransaction<T> createImmutableReplicatedTransaction(
TransactionRepresentation tx, GlobalSession globalSession, LocalOperationId localOperationId ) throws IOException TransactionRepresentation tx, GlobalSession<T> globalSession, LocalOperationId localOperationId ) throws IOException
{ {
ByteBuf transactionBuffer = Unpooled.buffer(); ByteBuf transactionBuffer = Unpooled.buffer();


Expand All @@ -58,7 +58,7 @@ public static ReplicatedTransaction createImmutableReplicatedTransaction(
byte[] txBytes = Arrays.copyOf( transactionBuffer.array(), transactionBuffer.writerIndex() ); byte[] txBytes = Arrays.copyOf( transactionBuffer.array(), transactionBuffer.writerIndex() );
transactionBuffer.release(); transactionBuffer.release();


return new ReplicatedTransaction( txBytes, globalSession, localOperationId ); return new ReplicatedTransaction<>( txBytes, globalSession, localOperationId );
} }


public static TransactionRepresentation extractTransactionRepresentation( ReplicatedTransaction replicatedTransaction, byte[] extraHeader ) throws IOException public static TransactionRepresentation extractTransactionRepresentation( ReplicatedTransaction replicatedTransaction, byte[] extraHeader ) throws IOException
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.coreedge.raft.replication.Replicator; import org.neo4j.coreedge.raft.replication.Replicator;
import org.neo4j.coreedge.raft.replication.session.GlobalSession; import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.locks.LockTokenManager; import org.neo4j.coreedge.server.core.locks.LockTokenManager;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionCommitProcess;
Expand All @@ -39,9 +38,9 @@
import static org.neo4j.coreedge.raft.replication.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader; import static org.neo4j.coreedge.raft.replication.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader;
import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionInvalid; import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionInvalid;


public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedContentListener public class ReplicatedTransactionStateMachine<MEMBER> implements Replicator.ReplicatedContentListener
{ {
private final GlobalSessionTrackerState sessionTracker; private final GlobalSessionTrackerState<MEMBER> sessionTracker;
private final GlobalSession myGlobalSession; private final GlobalSession myGlobalSession;
private final LockTokenManager lockTokenManager; private final LockTokenManager lockTokenManager;
private final TransactionCommitProcess commitProcess; private final TransactionCommitProcess commitProcess;
Expand All @@ -52,7 +51,7 @@ public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess
GlobalSession myGlobalSession, GlobalSession myGlobalSession,
LockTokenManager lockTokenManager, LockTokenManager lockTokenManager,
CommittingTransactions transactionFutures, CommittingTransactions transactionFutures,
GlobalSessionTrackerState globalSessionTrackerState ) GlobalSessionTrackerState<MEMBER> globalSessionTrackerState )
{ {
this.commitProcess = commitProcess; this.commitProcess = commitProcess;
this.myGlobalSession = myGlobalSession; this.myGlobalSession = myGlobalSession;
Expand All @@ -66,66 +65,95 @@ public synchronized void onReplicated( ReplicatedContent content, long logIndex
{ {
if ( content instanceof ReplicatedTransaction ) if ( content instanceof ReplicatedTransaction )
{ {
handleTransaction( (ReplicatedTransaction<CoreMember>) content, logIndex ); handleTransaction( (ReplicatedTransaction<MEMBER>) content, logIndex );
} }
} }


private void handleTransaction( ReplicatedTransaction<CoreMember> replicatedTx, long logIndex ) private void handleTransaction( ReplicatedTransaction<MEMBER> replicatedTx, long logIndex )
{ {
if ( !sessionTracker.validateAndTrackOperationAtLogIndex( replicatedTx.globalSession(), /*
replicatedTx.localOperationId(), logIndex ) || logIndex <= lastCommittedIndex ) * This check quickly verifies that the session is invalid. Since we update the session state *after* appending
* the tx to the log, we are certain here that on replay, if the session tracker says that the session is invalid,
* then the transaction either should never be committed or has already been appended in the log.
*/
if ( !operationValid( replicatedTx ) )
{ {
return; return;
} }


TransactionRepresentation tx; /*
try * At this point, we need to check if the tx exists in the log. If it does, it is ok to skip it. However, we
* may still need to persist the session state (as we may crashed in between), which happens outside this
* if check.
*/
if ( !txAlreadyCommitted( logIndex ) )
{ {
byte[] extraHeader = encodeLogIndexAsTxHeader( logIndex ); TransactionRepresentation tx;
tx = ReplicatedTransactionFactory.extractTransactionRepresentation( try
replicatedTx, extraHeader ); {
} byte[] extraHeader = encodeLogIndexAsTxHeader( logIndex );
catch ( IOException e ) tx = ReplicatedTransactionFactory.extractTransactionRepresentation(
{ replicatedTx, extraHeader );
throw new IllegalStateException( "Failed to locally commit a transaction that has already been " + }
"committed to the RAFT log. This server cannot process later transactions and needs to be " + catch ( IOException e )
"restarted once the underlying cause has been addressed.", e ); {
} throw new IllegalStateException( "Failed to locally commit a transaction that has already been " +
"committed to the RAFT log. This server cannot process later transactions and needs to be " +
"restarted once the underlying cause has been addressed.", e );
}


// A missing future means the transaction does not belong to this instance // A missing future means the transaction does not belong to this instance
Optional<CommittingTransaction> future = replicatedTx.globalSession().equals( myGlobalSession ) ? Optional<CommittingTransaction> future = replicatedTx.globalSession().equals( myGlobalSession ) ?
Optional.ofNullable( transactionFutures.retrieve( replicatedTx.localOperationId() ) ) : Optional.ofNullable( transactionFutures.retrieve( replicatedTx.localOperationId() ) ) :
Optional.<CommittingTransaction>empty(); Optional.<CommittingTransaction>empty();


int currentTokenId = lockTokenManager.currentToken().id(); int currentTokenId = lockTokenManager.currentToken().id();
int txLockSessionId = tx.getLockSessionId(); int txLockSessionId = tx.getLockSessionId();


if ( currentTokenId != txLockSessionId && txLockSessionId != Locks.Client.NO_LOCK_SESSION_ID ) if ( currentTokenId != txLockSessionId && txLockSessionId != Locks.Client.NO_LOCK_SESSION_ID )
{ {
future.ifPresent( txFuture -> txFuture.notifyCommitFailed( new TransactionFailureException( future.ifPresent( txFuture -> txFuture.notifyCommitFailed( new TransactionFailureException(
LockSessionInvalid, LockSessionInvalid,
"The lock session in the cluster has changed: " + "The lock session in the cluster has changed: " +
"[current lock session id:%d, tx lock session id:%d]", "[current lock session id:%d, tx lock session id:%d]",
currentTokenId, txLockSessionId ) ) ); currentTokenId, txLockSessionId ) ) );
return; return;
} }


try try
{ {
long txId = commitProcess.commit( new TransactionToApply( tx ), CommitEvent.NULL, long txId = commitProcess.commit( new TransactionToApply( tx ), CommitEvent.NULL,
TransactionApplicationMode.EXTERNAL ); TransactionApplicationMode.EXTERNAL );


future.ifPresent( txFuture -> txFuture.notifySuccessfullyCommitted( txId ) ); future.ifPresent( txFuture -> txFuture.notifySuccessfullyCommitted( txId ) );
}
catch ( TransactionFailureException e )
{
future.ifPresent( txFuture -> txFuture.notifyCommitFailed( e ) );
throw new IllegalStateException( "Failed to locally commit a transaction that has already been " +
"committed to the RAFT log. This server cannot process later transactions and needs to be " +
"restarted once the underlying cause has been addressed.", e );
}
} }
catch ( TransactionFailureException e ) /*
* Finally, we need to check, in an idempotent fashion, if the session state needs to be persisted.
*/
if ( sessionTracker.logIndex() < logIndex )
{ {
future.ifPresent( txFuture -> txFuture.notifyCommitFailed( e ) ); sessionTracker.update( replicatedTx.globalSession(), replicatedTx.localOperationId(), logIndex );
throw new IllegalStateException( "Failed to locally commit a transaction that has already been " +
"committed to the RAFT log. This server cannot process later transactions and needs to be " +
"restarted once the underlying cause has been addressed.", e );
} }
} }


private boolean operationValid( ReplicatedTransaction<MEMBER> replicatedTx )
{
return sessionTracker.validateOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() );
}

private boolean txAlreadyCommitted( long logIndex )
{
return logIndex <= lastCommittedIndex;
}

public void setLastCommittedIndex( long lastCommittedIndex ) public void setLastCommittedIndex( long lastCommittedIndex )
{ {
this.lastCommittedIndex = lastCommittedIndex; this.lastCommittedIndex = lastCommittedIndex;
Expand Down
Expand Up @@ -403,15 +403,15 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator
final Dependencies dependencies, final Dependencies dependencies,
final LogService logging, final LogService logging,
Monitors monitors, Monitors monitors,
GlobalSessionTrackerState globalSessionTrackerState ) GlobalSessionTrackerState<CoreMember> globalSessionTrackerState )
{ {
return ( appender, applier, config ) -> { return ( appender, applier, config ) -> {
TransactionRepresentationCommitProcess localCommit = TransactionRepresentationCommitProcess localCommit =
new TransactionRepresentationCommitProcess( appender, applier ); new TransactionRepresentationCommitProcess( appender, applier );
dependencies.satisfyDependencies( localCommit ); dependencies.satisfyDependencies( localCommit );


CommittingTransactions committingTransactions = new CommittingTransactionsRegistry(); CommittingTransactions committingTransactions = new CommittingTransactionsRegistry();
ReplicatedTransactionStateMachine replicatedTxStateMachine = new ReplicatedTransactionStateMachine( ReplicatedTransactionStateMachine<CoreMember> replicatedTxStateMachine = new ReplicatedTransactionStateMachine<>(
localCommit, localSessionPool.getGlobalSession(), currentReplicatedLockState, localCommit, localSessionPool.getGlobalSession(), currentReplicatedLockState,
committingTransactions, globalSessionTrackerState ); committingTransactions, globalSessionTrackerState );


Expand Down

0 comments on commit a9d245c

Please sign in to comment.