Skip to content

Commit

Permalink
Introduces barrier tx for leader epoch demarcation
Browse files Browse the repository at this point in the history
  • Loading branch information
digitalstain authored and apcj committed Dec 8, 2015
1 parent 4ee19a5 commit 0f8e938
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.coreedge.raft.replication.tx; package org.neo4j.coreedge.raft.replication.tx;


import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -39,9 +40,12 @@
import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.impl.util.Dependencies;


import static org.neo4j.coreedge.raft.replication.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader;

public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedContentListener public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedContentListener
{ {
private final GlobalSessionTracker sessionTracker; private final GlobalSessionTracker sessionTracker;
Expand All @@ -51,7 +55,7 @@ public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedC
private final Map<LocalOperationId, FutureTxId> outstanding = new ConcurrentHashMap<>(); private final Map<LocalOperationId, FutureTxId> outstanding = new ConcurrentHashMap<>();
private long lastCommittedIndex = -1; private long lastCommittedIndex = -1;
private long lastCommittedTxId; // Maintains the last committed tx id, used to set the next field private long lastCommittedTxId; // Maintains the last committed tx id, used to set the next field
private long lastTxIdForPreviousLeaderReign; // Maintains the last txid committed under the previous service assignment private long reignStartTxId; // Maintains the last txid committed under the previous service assignment


public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess, public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess,
GlobalSession myGlobalSession, Dependencies dependencies ) GlobalSession myGlobalSession, Dependencies dependencies )
Expand All @@ -78,10 +82,26 @@ public synchronized void onReplicated( ReplicatedContent content, long logIndex
} }
if ( content instanceof NewLeaderBarrier ) if ( content instanceof NewLeaderBarrier )
{ {
lastTxIdForPreviousLeaderReign = lastCommittedTxId; try
{
reignStartTxId = appendBarrierTx( logIndex );
}
catch ( TransactionFailureException e )
{
throw new RuntimeException( e );
}
} }
} }


private long appendBarrierTx( long logIndex ) throws TransactionFailureException
{
PhysicalTransactionRepresentation dummyTx = new PhysicalTransactionRepresentation( Collections.emptyList() );
// TODO we need to set the "-1"'s below to useful values
dummyTx.setHeader( encodeLogIndexAsTxHeader( logIndex ), -1, -1, -1, -1, -1, -1 );

return commitProcess.commit( new TransactionToApply( dummyTx ), CommitEvent.NULL, TransactionApplicationMode.EXTERNAL );
}

private void handleTransaction( ReplicatedTransaction replicatedTx, long logIndex ) private void handleTransaction( ReplicatedTransaction replicatedTx, long logIndex )
{ {
if ( !sessionTracker.validateAndTrackOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() ) if ( !sessionTracker.validateAndTrackOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() )
Expand All @@ -92,7 +112,7 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long logInde


try try
{ {
byte[] extraHeader = LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader( logIndex ); byte[] extraHeader = encodeLogIndexAsTxHeader( logIndex );
TransactionRepresentation tx = ReplicatedTransactionFactory.extractTransactionRepresentation( TransactionRepresentation tx = ReplicatedTransactionFactory.extractTransactionRepresentation(
replicatedTx, extraHeader ); replicatedTx, extraHeader );


Expand All @@ -101,7 +121,7 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long logInde
Optional.ofNullable( outstanding.remove( replicatedTx.localOperationId() ) ) : Optional.ofNullable( outstanding.remove( replicatedTx.localOperationId() ) ) :
Optional.<CompletableFuture<Long>>empty(); Optional.<CompletableFuture<Long>>empty();


if ( tx.getLatestCommittedTxWhenStarted() < lastTxIdForPreviousLeaderReign ) if ( tx.getLatestCommittedTxWhenStarted() < reignStartTxId )
{ {
future.ifPresent( txFuture -> txFuture.completeExceptionally( new TransientTransactionFailureException( future.ifPresent( txFuture -> txFuture.completeExceptionally( new TransientTransactionFailureException(
"Attempt to commit transaction that was started on a different leader term. " + "Attempt to commit transaction that was started on a different leader term. " +
Expand Down
Expand Up @@ -125,27 +125,29 @@ public void shouldRejectTransactionCommittedUnderOldLeader() throws Exception
when( localCommitProcess.commit( any( TransactionToApply.class ), when( localCommitProcess.commit( any( TransactionToApply.class ),
any( CommitEvent.class ), any( TransactionApplicationMode.class ) ) ) any( CommitEvent.class ), any( TransactionApplicationMode.class ) ) )
.thenAnswer( invocation -> { .thenAnswer( invocation -> {
committedTxRepresentation.set( invocation.getArgumentAt( 0, committedTxRepresentation.set(
TransactionToApply.class ).transactionRepresentation() ); invocation.getArgumentAt( 0, TransactionToApply.class ).transactionRepresentation() );
return 4L; return 4L;
} ); } )
ReplicatedTransactionStateMachine listener = new ReplicatedTransactionStateMachine( localCommitProcess, .thenAnswer( invocation -> 5L ); // This is for the barrier tx
globalSession, null );
ReplicatedTransactionStateMachine listener = new ReplicatedTransactionStateMachine(
localCommitProcess, globalSession, null );


// when // when
listener.onReplicated( txBefore, 0 ); // Just to get the Id listener.onReplicated( txBefore, 0 ); // Just to get the Id
listener.onReplicated( new NewLeaderBarrier(), 0 ); listener.onReplicated( new NewLeaderBarrier(), 0 );
listener.onReplicated( txAfter, 0 ); listener.onReplicated( txAfter, 0 );


// then // then
verify( localCommitProcess ).commit( any( TransactionToApply.class ), any( CommitEvent.class ), verify( localCommitProcess, times( 2 ) ).commit( any( TransactionToApply.class ), any( CommitEvent.class ),
eq( TransactionApplicationMode.EXTERNAL ) ); eq( TransactionApplicationMode.EXTERNAL ) );
assertEquals( timeBefore, committedTxRepresentation.get().getTimeStarted() ); assertEquals( timeBefore, committedTxRepresentation.get().getTimeStarted() );
verifyNoMoreInteractions( localCommitProcess ); verifyNoMoreInteractions( localCommitProcess );
} }


@Test @Test
public void shouldFailFutureForTransactionCommittedUnderOldLeader() throws Exception public void shouldFailFutureForTransactionCommittedUnderWrongLockManagerWithDifferentLastTxIdsWhenStarted() throws Exception
{ {
// given // given
LocalOperationId localOperationIdBefore = new LocalOperationId( 0, 0 ); LocalOperationId localOperationIdBefore = new LocalOperationId( 0, 0 );
Expand Down

0 comments on commit 0f8e938

Please sign in to comment.