From 0f8e938dc20207647dd0fbfe1c3cf675aa663b9a Mon Sep 17 00:00:00 2001 From: Chris Gioran Date: Tue, 8 Dec 2015 13:52:50 +0200 Subject: [PATCH] Introduces barrier tx for leader epoch demarcation --- .../tx/ReplicatedTransactionStateMachine.java | 28 ++++++++++++++++--- ...ReplicatedTransactionStateMachineTest.java | 16 ++++++----- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java index f038350e5f89c..2c71c83b3060b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java @@ -20,6 +20,7 @@ package org.neo4j.coreedge.raft.replication.tx; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -39,9 +40,12 @@ import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.util.Dependencies; +import static org.neo4j.coreedge.raft.replication.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader; + public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedContentListener { private final GlobalSessionTracker sessionTracker; @@ -51,7 +55,7 @@ public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedC private final Map outstanding = new ConcurrentHashMap<>(); private long lastCommittedIndex = -1; private long lastCommittedTxId; // Maintains the last committed tx id, used to set the next field - private long lastTxIdForPreviousLeaderReign; // Maintains the last txid committed under the previous service assignment + private long reignStartTxId; // Maintains the last txid committed under the previous service assignment public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess, GlobalSession myGlobalSession, Dependencies dependencies ) @@ -78,10 +82,26 @@ public synchronized void onReplicated( ReplicatedContent content, long logIndex } if ( content instanceof NewLeaderBarrier ) { - lastTxIdForPreviousLeaderReign = lastCommittedTxId; + try + { + reignStartTxId = appendBarrierTx( logIndex ); + } + catch ( TransactionFailureException e ) + { + throw new RuntimeException( e ); + } } } + private long appendBarrierTx( long logIndex ) throws TransactionFailureException + { + PhysicalTransactionRepresentation dummyTx = new PhysicalTransactionRepresentation( Collections.emptyList() ); + // TODO we need to set the "-1"'s below to useful values + dummyTx.setHeader( encodeLogIndexAsTxHeader( logIndex ), -1, -1, -1, -1, -1, -1 ); + + return commitProcess.commit( new TransactionToApply( dummyTx ), CommitEvent.NULL, TransactionApplicationMode.EXTERNAL ); + } + private void handleTransaction( ReplicatedTransaction replicatedTx, long logIndex ) { if ( !sessionTracker.validateAndTrackOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() ) @@ -92,7 +112,7 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long logInde try { - byte[] extraHeader = LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader( logIndex ); + byte[] extraHeader = encodeLogIndexAsTxHeader( logIndex ); TransactionRepresentation tx = ReplicatedTransactionFactory.extractTransactionRepresentation( replicatedTx, extraHeader ); @@ -101,7 +121,7 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long logInde Optional.ofNullable( outstanding.remove( replicatedTx.localOperationId() ) ) : Optional.>empty(); - if ( tx.getLatestCommittedTxWhenStarted() < lastTxIdForPreviousLeaderReign ) + if ( tx.getLatestCommittedTxWhenStarted() < reignStartTxId ) { future.ifPresent( txFuture -> txFuture.completeExceptionally( new TransientTransactionFailureException( "Attempt to commit transaction that was started on a different leader term. " + diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachineTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachineTest.java index d06b85c1f87c5..c48de01d8a2af 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachineTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachineTest.java @@ -125,12 +125,14 @@ public void shouldRejectTransactionCommittedUnderOldLeader() throws Exception when( localCommitProcess.commit( any( TransactionToApply.class ), any( CommitEvent.class ), any( TransactionApplicationMode.class ) ) ) .thenAnswer( invocation -> { - committedTxRepresentation.set( invocation.getArgumentAt( 0, - TransactionToApply.class ).transactionRepresentation() ); + committedTxRepresentation.set( + invocation.getArgumentAt( 0, TransactionToApply.class ).transactionRepresentation() ); return 4L; - } ); - ReplicatedTransactionStateMachine listener = new ReplicatedTransactionStateMachine( localCommitProcess, - globalSession, null ); + } ) + .thenAnswer( invocation -> 5L ); // This is for the barrier tx + + ReplicatedTransactionStateMachine listener = new ReplicatedTransactionStateMachine( + localCommitProcess, globalSession, null ); // when listener.onReplicated( txBefore, 0 ); // Just to get the Id @@ -138,14 +140,14 @@ public void shouldRejectTransactionCommittedUnderOldLeader() throws Exception listener.onReplicated( txAfter, 0 ); // then - verify( localCommitProcess ).commit( any( TransactionToApply.class ), any( CommitEvent.class ), + verify( localCommitProcess, times( 2 ) ).commit( any( TransactionToApply.class ), any( CommitEvent.class ), eq( TransactionApplicationMode.EXTERNAL ) ); assertEquals( timeBefore, committedTxRepresentation.get().getTimeStarted() ); verifyNoMoreInteractions( localCommitProcess ); } @Test - public void shouldFailFutureForTransactionCommittedUnderOldLeader() throws Exception + public void shouldFailFutureForTransactionCommittedUnderWrongLockManagerWithDifferentLastTxIdsWhenStarted() throws Exception { // given LocalOperationId localOperationIdBefore = new LocalOperationId( 0, 0 );