diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/GlobalSessionTrackerState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/GlobalSessionTrackerState.java index 272a3851eeb5a..0b16360bfa3ab 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/GlobalSessionTrackerState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/GlobalSessionTrackerState.java @@ -36,6 +36,9 @@ public interface GlobalSessionTrackerState /** * Tracks the operation and returns true iff this operation should be allowed. */ - boolean validateAndTrackOperationAtLogIndex( GlobalSession globalSession, LocalOperationId localOperationId, - long logIndex ); + boolean validateOperation( GlobalSession globalSession, LocalOperationId localOperationId ); + + void update( GlobalSession globalSession, LocalOperationId localOperationId, long logIndex ); + + long logIndex(); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/InMemoryGlobalSessionTrackerState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/InMemoryGlobalSessionTrackerState.java index 854b5731604cc..85666f7c10630 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/InMemoryGlobalSessionTrackerState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/InMemoryGlobalSessionTrackerState.java @@ -56,15 +56,44 @@ public InMemoryGlobalSessionTrackerState( InMemoryGlobalSessionTrackerState globalSession, - LocalOperationId localOperationId, long logIndex ) + public boolean validateOperation( GlobalSession globalSession, + LocalOperationId localOperationId ) + { + boolean result; + + LocalSessionTracker existingSessionTracker = sessionTrackers.get( globalSession.owner() ); + if ( isNewSession( globalSession, existingSessionTracker ) ) + { + result = isFirstOperation( localOperationId ); + } + else + { + result = existingSessionTracker.isValidOperation( localOperationId ); + } + + return result; + } + + @Override + public void update( GlobalSession globalSession, LocalOperationId localOperationId, long logIndex ) { - this.logIndex = logIndex; LocalSessionTracker localSessionTracker = validateGlobalSessionAndGetLocalSessionTracker( globalSession ); - return localSessionTracker.validateAndTrackOperation( localOperationId ); + localSessionTracker.validateAndTrackOperation( localOperationId ); + this.logIndex = logIndex; } - long logIndex() + private boolean isNewSession( GlobalSession globalSession, LocalSessionTracker existingSessionTracker ) + { + return existingSessionTracker == null || !existingSessionTracker.globalSessionId.equals( globalSession.sessionId() ); + } + + private boolean isFirstOperation( LocalOperationId id ) + { + return id.sequenceNumber() == 0; + } + + @Override + public long logIndex() { return logIndex; } @@ -73,12 +102,8 @@ private LocalSessionTracker validateGlobalSessionAndGetLocalSessionTracker( Glob { LocalSessionTracker localSessionTracker = sessionTrackers.get( globalSession.owner() ); - if ( localSessionTracker == null ) - { - localSessionTracker = new LocalSessionTracker( globalSession.sessionId() ); - sessionTrackers.put( globalSession.owner(), localSessionTracker ); - } - else if ( !localSessionTracker.globalSessionId.equals( globalSession.sessionId() ) ) + if ( localSessionTracker == null || + !localSessionTracker.globalSessionId.equals( globalSession.sessionId() ) ) { localSessionTracker = new LocalSessionTracker( globalSession.sessionId() ); sessionTrackers.put( globalSession.owner(), localSessionTracker ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/OnDiskGlobalSessionTrackerState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/OnDiskGlobalSessionTrackerState.java index bf94bc58dddfb..4c1f714a127eb 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/OnDiskGlobalSessionTrackerState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/session/OnDiskGlobalSessionTrackerState.java @@ -65,27 +65,35 @@ public OnDiskGlobalSessionTrackerState( FileSystemAbstraction fileSystemAbstract } @Override - public boolean validateAndTrackOperationAtLogIndex( GlobalSession globalSession, LocalOperationId - localOperationId, long logIndex ) + public boolean validateOperation( GlobalSession globalSession, LocalOperationId + localOperationId ) + { + return inMemoryGlobalSessionTrackerState.validateOperation( globalSession, localOperationId ); + } + + @Override + public void update( GlobalSession globalSession, LocalOperationId localOperationId, long logIndex ) { InMemoryGlobalSessionTrackerState temp = new InMemoryGlobalSessionTrackerState<>( inMemoryGlobalSessionTrackerState ); - boolean stateUpdated = temp.validateAndTrackOperationAtLogIndex( globalSession, localOperationId, logIndex ); + temp.update( globalSession, localOperationId, logIndex ); - if ( stateUpdated ) + try + { + statePersister.persistStoreData( temp ); + inMemoryGlobalSessionTrackerState = temp; + } + catch ( IOException e ) { - try - { - statePersister.persistStoreData( temp ); - inMemoryGlobalSessionTrackerState = temp; - } - catch ( IOException e ) - { - throw new RuntimeException( e ); - } + throw new RuntimeException( e ); } - return stateUpdated; + } + + @Override + public long logIndex() + { + return inMemoryGlobalSessionTrackerState.logIndex(); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionFactory.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionFactory.java index 38ece941b4481..58e3214e40e05 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionFactory.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionFactory.java @@ -43,8 +43,8 @@ public class ReplicatedTransactionFactory { - public static ReplicatedTransaction createImmutableReplicatedTransaction( - TransactionRepresentation tx, GlobalSession globalSession, LocalOperationId localOperationId ) throws IOException + public static ReplicatedTransaction createImmutableReplicatedTransaction( + TransactionRepresentation tx, GlobalSession globalSession, LocalOperationId localOperationId ) throws IOException { ByteBuf transactionBuffer = Unpooled.buffer(); @@ -58,7 +58,7 @@ public static ReplicatedTransaction createImmutableReplicatedTransaction( byte[] txBytes = Arrays.copyOf( transactionBuffer.array(), transactionBuffer.writerIndex() ); transactionBuffer.release(); - return new ReplicatedTransaction( txBytes, globalSession, localOperationId ); + return new ReplicatedTransaction<>( txBytes, globalSession, localOperationId ); } public static TransactionRepresentation extractTransactionRepresentation( ReplicatedTransaction replicatedTransaction, byte[] extraHeader ) throws IOException diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java index 5b440390467cb..09550e94f8880 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java @@ -26,7 +26,6 @@ import org.neo4j.coreedge.raft.replication.Replicator; import org.neo4j.coreedge.raft.replication.session.GlobalSession; import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; -import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.core.locks.LockTokenManager; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.impl.api.TransactionCommitProcess; @@ -39,9 +38,9 @@ import static org.neo4j.coreedge.raft.replication.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader; import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionInvalid; -public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedContentListener +public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedContentListener { - private final GlobalSessionTrackerState sessionTracker; + private final GlobalSessionTrackerState sessionTracker; private final GlobalSession myGlobalSession; private final LockTokenManager lockTokenManager; private final TransactionCommitProcess commitProcess; @@ -52,7 +51,7 @@ public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess GlobalSession myGlobalSession, LockTokenManager lockTokenManager, CommittingTransactions transactionFutures, - GlobalSessionTrackerState globalSessionTrackerState ) + GlobalSessionTrackerState globalSessionTrackerState ) { this.commitProcess = commitProcess; this.myGlobalSession = myGlobalSession; @@ -66,66 +65,95 @@ public synchronized void onReplicated( ReplicatedContent content, long logIndex { if ( content instanceof ReplicatedTransaction ) { - handleTransaction( (ReplicatedTransaction) content, logIndex ); + handleTransaction( (ReplicatedTransaction) content, logIndex ); } } - private void handleTransaction( ReplicatedTransaction replicatedTx, long logIndex ) + private void handleTransaction( ReplicatedTransaction replicatedTx, long logIndex ) { - if ( !sessionTracker.validateAndTrackOperationAtLogIndex( replicatedTx.globalSession(), - replicatedTx.localOperationId(), logIndex ) || logIndex <= lastCommittedIndex ) + /* + * This check quickly verifies that the session is invalid. Since we update the session state *after* appending + * the tx to the log, we are certain here that on replay, if the session tracker says that the session is invalid, + * then the transaction either should never be committed or has already been appended in the log. + */ + if ( !operationValid( replicatedTx ) ) { return; } - TransactionRepresentation tx; - try + /* + * At this point, we need to check if the tx exists in the log. If it does, it is ok to skip it. However, we + * may still need to persist the session state (as we may crashed in between), which happens outside this + * if check. + */ + if ( !txAlreadyCommitted( logIndex ) ) { - byte[] extraHeader = encodeLogIndexAsTxHeader( logIndex ); - tx = ReplicatedTransactionFactory.extractTransactionRepresentation( - replicatedTx, extraHeader ); - } - catch ( IOException e ) - { - throw new IllegalStateException( "Failed to locally commit a transaction that has already been " + - "committed to the RAFT log. This server cannot process later transactions and needs to be " + - "restarted once the underlying cause has been addressed.", e ); - } + TransactionRepresentation tx; + try + { + byte[] extraHeader = encodeLogIndexAsTxHeader( logIndex ); + tx = ReplicatedTransactionFactory.extractTransactionRepresentation( + replicatedTx, extraHeader ); + } + catch ( IOException e ) + { + throw new IllegalStateException( "Failed to locally commit a transaction that has already been " + + "committed to the RAFT log. This server cannot process later transactions and needs to be " + + "restarted once the underlying cause has been addressed.", e ); + } - // A missing future means the transaction does not belong to this instance - Optional future = replicatedTx.globalSession().equals( myGlobalSession ) ? - Optional.ofNullable( transactionFutures.retrieve( replicatedTx.localOperationId() ) ) : - Optional.empty(); + // A missing future means the transaction does not belong to this instance + Optional future = replicatedTx.globalSession().equals( myGlobalSession ) ? + Optional.ofNullable( transactionFutures.retrieve( replicatedTx.localOperationId() ) ) : + Optional.empty(); - int currentTokenId = lockTokenManager.currentToken().id(); - int txLockSessionId = tx.getLockSessionId(); + int currentTokenId = lockTokenManager.currentToken().id(); + int txLockSessionId = tx.getLockSessionId(); - if ( currentTokenId != txLockSessionId && txLockSessionId != Locks.Client.NO_LOCK_SESSION_ID ) - { - future.ifPresent( txFuture -> txFuture.notifyCommitFailed( new TransactionFailureException( - LockSessionInvalid, - "The lock session in the cluster has changed: " + - "[current lock session id:%d, tx lock session id:%d]", - currentTokenId, txLockSessionId ) ) ); - return; - } + if ( currentTokenId != txLockSessionId && txLockSessionId != Locks.Client.NO_LOCK_SESSION_ID ) + { + future.ifPresent( txFuture -> txFuture.notifyCommitFailed( new TransactionFailureException( + LockSessionInvalid, + "The lock session in the cluster has changed: " + + "[current lock session id:%d, tx lock session id:%d]", + currentTokenId, txLockSessionId ) ) ); + return; + } - try - { - long txId = commitProcess.commit( new TransactionToApply( tx ), CommitEvent.NULL, - TransactionApplicationMode.EXTERNAL ); + try + { + long txId = commitProcess.commit( new TransactionToApply( tx ), CommitEvent.NULL, + TransactionApplicationMode.EXTERNAL ); - future.ifPresent( txFuture -> txFuture.notifySuccessfullyCommitted( txId ) ); + future.ifPresent( txFuture -> txFuture.notifySuccessfullyCommitted( txId ) ); + } + catch ( TransactionFailureException e ) + { + future.ifPresent( txFuture -> txFuture.notifyCommitFailed( e ) ); + throw new IllegalStateException( "Failed to locally commit a transaction that has already been " + + "committed to the RAFT log. This server cannot process later transactions and needs to be " + + "restarted once the underlying cause has been addressed.", e ); + } } - catch ( TransactionFailureException e ) + /* + * Finally, we need to check, in an idempotent fashion, if the session state needs to be persisted. + */ + if ( sessionTracker.logIndex() < logIndex ) { - future.ifPresent( txFuture -> txFuture.notifyCommitFailed( e ) ); - throw new IllegalStateException( "Failed to locally commit a transaction that has already been " + - "committed to the RAFT log. This server cannot process later transactions and needs to be " + - "restarted once the underlying cause has been addressed.", e ); + sessionTracker.update( replicatedTx.globalSession(), replicatedTx.localOperationId(), logIndex ); } } + private boolean operationValid( ReplicatedTransaction replicatedTx ) + { + return sessionTracker.validateOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() ); + } + + private boolean txAlreadyCommitted( long logIndex ) + { + return logIndex <= lastCommittedIndex; + } + public void setLastCommittedIndex( long lastCommittedIndex ) { this.lastCommittedIndex = lastCommittedIndex; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index d892218e223e3..57e5f553da0c6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -403,7 +403,7 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator final Dependencies dependencies, final LogService logging, Monitors monitors, - GlobalSessionTrackerState globalSessionTrackerState ) + GlobalSessionTrackerState globalSessionTrackerState ) { return ( appender, applier, config ) -> { TransactionRepresentationCommitProcess localCommit = @@ -411,7 +411,7 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator dependencies.satisfyDependencies( localCommit ); CommittingTransactions committingTransactions = new CommittingTransactionsRegistry(); - ReplicatedTransactionStateMachine replicatedTxStateMachine = new ReplicatedTransactionStateMachine( + ReplicatedTransactionStateMachine replicatedTxStateMachine = new ReplicatedTransactionStateMachine<>( localCommit, localSessionPool.getGlobalSession(), currentReplicatedLockState, committingTransactions, globalSessionTrackerState ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/BaseGlobalSessionTrackerStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/BaseGlobalSessionTrackerStateTest.java new file mode 100644 index 0000000000000..2484945cfef87 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/BaseGlobalSessionTrackerStateTest.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.replication.session; + +import java.util.UUID; + +import org.junit.Test; + +import org.neo4j.coreedge.server.RaftTestMember; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public abstract class BaseGlobalSessionTrackerStateTest +{ + RaftTestMember coreA = RaftTestMember.member( 1 ); + RaftTestMember coreB = RaftTestMember.member( 2 ); + + GlobalSession sessionA = new GlobalSession( UUID.randomUUID(), coreA ); + GlobalSession sessionA2 = new GlobalSession( UUID.randomUUID(), coreA ); + + GlobalSession sessionB = new GlobalSession( UUID.randomUUID(), coreB ); + + protected abstract GlobalSessionTrackerState instantiateSessionTracker(); + + @Test + public void firstValidSequenceNumberIsZero() + { + GlobalSessionTrackerState sessionTracker = instantiateSessionTracker(); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 1, -1 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 1, -1 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 2, 1 ) ) ); + } + + @Test + public void repeatedOperationsAreRejected() + { + GlobalSessionTrackerState sessionTracker = instantiateSessionTracker(); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + } + + @Test + public void seriesOfOperationsAreAccepted() + { + GlobalSessionTrackerState sessionTracker = instantiateSessionTracker(); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 1 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 1 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 2 ) ) ); + } + + @Test + public void gapsAreNotAllowed() + { + GlobalSessionTrackerState sessionTracker = instantiateSessionTracker(); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 1 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 1 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 3 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 3 ), 0 ); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 2 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 2 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 3 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 3 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 4 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 4 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 6 ) ) ); + } + + @Test + public void localSessionsAreIndependent() + { + GlobalSessionTrackerState sessionTracker = instantiateSessionTracker(); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 1 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 1 ), 0 ); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 1, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 1, 0 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 1, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 1, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 1, 1 ) ) ); + } + + @Test + public void globalSessionsAreIndependent() + { + GlobalSessionTrackerState sessionTracker = instantiateSessionTracker(); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionB, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionB, new LocalOperationId( 0, 0 ), 0 ); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 1, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 1, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionB, new LocalOperationId( 1, 0 ) ) ); + sessionTracker.update( sessionB, new LocalOperationId( 1, 0 ), 0 ); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 2, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 2, 0 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 2, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 2, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionB, new LocalOperationId( 2, 0 ) ) ); + sessionTracker.update( sessionB, new LocalOperationId( 2, 0 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionA, new LocalOperationId( 2, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 2, 0 ), 0 ); + assertFalse( sessionTracker.validateOperation( sessionB, new LocalOperationId( 2, 0 ) ) ); + } + + @Test + public void newGlobalSessionUnderSameOwnerResetsCorrespondingLocalSessionTracker() + { + GlobalSessionTrackerState sessionTracker = instantiateSessionTracker(); + + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA, new LocalOperationId( 0, 1 ) ) ); + sessionTracker.update( sessionA, new LocalOperationId( 0, 1 ), 0 ); + + assertFalse( sessionTracker.validateOperation( sessionA2, new LocalOperationId( 0, 2 ) ) ); + sessionTracker.update( sessionA2, new LocalOperationId( 0, 2 ), 0 ); + + assertTrue( sessionTracker.validateOperation( sessionA2, new LocalOperationId( 0, 0 ) ) ); + sessionTracker.update( sessionA2, new LocalOperationId( 0, 0 ), 0 ); + assertTrue( sessionTracker.validateOperation( sessionA2, new LocalOperationId( 0, 1 ) ) ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/InMemoryGlobalSessionTrackerStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/InMemoryGlobalSessionTrackerStateTest.java index 2017190c16a9b..fe8987b0a8356 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/InMemoryGlobalSessionTrackerStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/InMemoryGlobalSessionTrackerStateTest.java @@ -19,117 +19,13 @@ */ package org.neo4j.coreedge.raft.replication.session; -import java.util.UUID; +import org.neo4j.coreedge.server.RaftTestMember; -import org.junit.Test; - -import org.neo4j.coreedge.server.AdvertisedSocketAddress; -import org.neo4j.coreedge.server.CoreMember; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class InMemoryGlobalSessionTrackerStateTest +public class InMemoryGlobalSessionTrackerStateTest extends BaseGlobalSessionTrackerStateTest { - CoreMember coreA = new CoreMember( new AdvertisedSocketAddress( "core:1" ), - new AdvertisedSocketAddress( "raft:1" ) ); - CoreMember coreB = new CoreMember( new AdvertisedSocketAddress( "core:2" ), - new AdvertisedSocketAddress( "raft:2" ) ); - - GlobalSession sessionA = new GlobalSession( UUID.randomUUID(), coreA ); - GlobalSession sessionA2 = new GlobalSession( UUID.randomUUID(), coreA ); - - GlobalSession sessionB = new GlobalSession( UUID.randomUUID(), coreB ); - - @Test - public void firstValidSequenceNumberIsZero() - { - InMemoryGlobalSessionTrackerState sessionTracker = new InMemoryGlobalSessionTrackerState(); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 1, -1 ), - 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 2, 1 ), 0 ) ); - } - - @Test - public void repeatedOperationsAreRejected() + @Override + protected GlobalSessionTrackerState instantiateSessionTracker() { - InMemoryGlobalSessionTrackerState sessionTracker = new InMemoryGlobalSessionTrackerState(); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - } - - @Test - public void seriesOfOperationsAreAccepted() - { - InMemoryGlobalSessionTrackerState sessionTracker = new InMemoryGlobalSessionTrackerState(); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 1 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 2 ), 0 ) ); - } - - @Test - public void gapsAreNotAllowed() - { - InMemoryGlobalSessionTrackerState sessionTracker = new InMemoryGlobalSessionTrackerState(); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 1 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 3 ), 0 ) ); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 2 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 3 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 4 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 6 ), 0 ) ); - } - - @Test - public void localSessionsAreIndependent() - { - InMemoryGlobalSessionTrackerState sessionTracker = new InMemoryGlobalSessionTrackerState(); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 1 ), 0 ) ); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 1, 0 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 1, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 1, 1 ), 0 ) ); - } - - @Test - public void globalSessionsAreIndependent() - { - InMemoryGlobalSessionTrackerState sessionTracker = new InMemoryGlobalSessionTrackerState(); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionB, new LocalOperationId( 0, 0 ), 0 ) ); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 1, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionB, new LocalOperationId( 1, 0 ), 0 ) ); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 2, 0 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 2, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionB, new LocalOperationId( 2, 0 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 2, 0 ), 0 ) ); - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionB, new LocalOperationId( 2, 0 ), 0 ) ); - } - - @Test - public void newGlobalSessionUnderSameOwnerResetsCorrespondingLocalSessionTracker() - { - InMemoryGlobalSessionTrackerState sessionTracker = new InMemoryGlobalSessionTrackerState(); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA, new LocalOperationId( 0, 1 ), 0 ) ); - - assertFalse( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA2, new LocalOperationId( 0, 2 ), 0 ) ); - - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA2, new LocalOperationId( 0, 0 ), 0 ) ); - assertTrue( sessionTracker.validateAndTrackOperationAtLogIndex( sessionA2, new LocalOperationId( 0, 1 ), 0 ) ); + return new InMemoryGlobalSessionTrackerState<>(); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/OnDiskGlobalSessionTrackerStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/OnDiskGlobalSessionTrackerStateTest.java index 1284da7c7c44e..00e4cbc2c39fe 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/OnDiskGlobalSessionTrackerStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/session/OnDiskGlobalSessionTrackerStateTest.java @@ -20,6 +20,7 @@ package org.neo4j.coreedge.raft.replication.session; import java.io.File; +import java.io.IOException; import java.util.function.Supplier; import org.junit.Rule; @@ -28,63 +29,55 @@ import org.neo4j.coreedge.server.RaftTestMember; import org.neo4j.coreedge.server.RaftTestMember.RaftTestMemberMarshal; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; -import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.test.TargetDirectory; import static java.util.UUID.randomUUID; import static junit.framework.Assert.assertFalse; import static junit.framework.TestCase.assertTrue; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.neo4j.coreedge.server.RaftTestMember.member; -public class OnDiskGlobalSessionTrackerStateTest +public class OnDiskGlobalSessionTrackerStateTest extends BaseGlobalSessionTrackerStateTest { @Rule public final TargetDirectory.TestDirectory testDir = TargetDirectory.testDirForTest( getClass() ); + private final EphemeralFileSystemAbstraction fsa = new EphemeralFileSystemAbstraction(); + @Test public void shouldRoundtripGlobalSessionTrackerState() throws Exception { // given - EphemeralFileSystemAbstraction fsa = new EphemeralFileSystemAbstraction(); - fsa.mkdir( testDir.directory() ); - - OnDiskGlobalSessionTrackerState oldState = new OnDiskGlobalSessionTrackerState<>( fsa, - testDir.directory(), new RaftTestMemberMarshal(), 100, mock( Supplier.class ) ); + GlobalSessionTrackerState oldState = instantiateSessionTracker(); final GlobalSession globalSession = new GlobalSession<>( randomUUID(), member( 1 ) ); - oldState.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 0 ), 0 ); + oldState.update( globalSession, new LocalOperationId( 1, 0 ), 0 ); // when OnDiskGlobalSessionTrackerState newState = new OnDiskGlobalSessionTrackerState<>( fsa, testDir.directory(), new RaftTestMemberMarshal(), 100, mock( Supplier.class ) ); // then - assertTrue( oldState.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 1 ), 99 ) ); - assertTrue( newState.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 1 ), 99 ) ); + assertTrue( oldState.validateOperation( globalSession, new LocalOperationId( 1, 1 ) ) ); + assertTrue( newState.validateOperation( globalSession, new LocalOperationId( 1, 1 ) ) ); - assertFalse( oldState.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 3 ), 99 ) ); - assertFalse( newState.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 3 ), 99 ) ); + assertFalse( oldState.validateOperation( globalSession, new LocalOperationId( 1, 3 ) ) ); + assertFalse( newState.validateOperation( globalSession, new LocalOperationId( 1, 3 ) ) ); } @Test public void shouldPersistOnSessionCreation() throws Exception { // given - FileSystemAbstraction fsa = new EphemeralFileSystemAbstraction(); - - OnDiskGlobalSessionTrackerState state = new OnDiskGlobalSessionTrackerState<>( fsa, - testDir.directory(), new RaftTestMemberMarshal(), 10, mock( Supplier.class ) ); + GlobalSessionTrackerState state = instantiateSessionTracker(); // when - state.validateAndTrackOperationAtLogIndex( new GlobalSession<>( randomUUID(), member( 1 ) ), - new LocalOperationId( 1, 0 ), 0 ); + state.update( new GlobalSession<>( randomUUID(), member( 1 ) ), new LocalOperationId( 1, 0 ), 0 ); // then assertThat( fsa.getFileSize( new File( testDir.directory(), OnDiskGlobalSessionTrackerState.FILENAME + "a" ) @@ -95,69 +88,34 @@ public void shouldPersistOnSessionCreation() throws Exception public void shouldPersistOnSessionUpdate() throws Exception { // given - FileSystemAbstraction fsa = new EphemeralFileSystemAbstraction(); - - OnDiskGlobalSessionTrackerState state = new OnDiskGlobalSessionTrackerState<>( fsa, - testDir.directory(), new RaftTestMemberMarshal(), 10, mock( Supplier.class ) ); + GlobalSessionTrackerState state = instantiateSessionTracker(); File fileName = new File( testDir.directory(), OnDiskGlobalSessionTrackerState.FILENAME + "a" ); GlobalSession globalSession = new GlobalSession<>( randomUUID(), member( 1 ) ); - state.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 0 ), 0 ); + state.update( globalSession, new LocalOperationId( 1, 0 ), 0 ); long initialFileSize = fsa.getFileSize( fileName ); // when // the global session exists and this local operation id is a valid next value - state.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 1 ), 1 ); + state.update( globalSession, new LocalOperationId( 1, 1 ), 1 ); // then assertThat( fsa.getFileSize( fileName ), greaterThan( initialFileSize ) ); } - @Test - public void shouldNotPersistOnNegativeSessionCheckForExistingGlobalSession() throws Exception - { - // given - FileSystemAbstraction fsa = new EphemeralFileSystemAbstraction(); - - OnDiskGlobalSessionTrackerState state = new OnDiskGlobalSessionTrackerState<>( fsa, - testDir.directory(), new RaftTestMemberMarshal(), 10, mock( Supplier.class ) ); - File fileName = new File( testDir.directory(), OnDiskGlobalSessionTrackerState.FILENAME + "a" ); - - GlobalSession globalSession = new GlobalSession<>( randomUUID(), member( - 1 ) ); - state.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 0 ), 0 ); - - long initialFileSize = fsa.getFileSize( fileName ); - assertThat( initialFileSize, greaterThan( 0L ) ); - - // when - // The global session exists but this local operation id is not a valid next value - state.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 2, 4 ), 1 ); - - // then - assertThat( fsa.getFileSize( fileName ), equalTo( initialFileSize ) ); - } - - - @Test - public void shouldNotPersistOnNegativeSessionCheckForNewGlobalSession() throws Exception + @Override + protected GlobalSessionTrackerState instantiateSessionTracker() { - // given - FileSystemAbstraction fsa = new EphemeralFileSystemAbstraction(); - - OnDiskGlobalSessionTrackerState state = new OnDiskGlobalSessionTrackerState<>( fsa, - testDir.directory(), new RaftTestMemberMarshal(), 10, mock( Supplier.class ) ); - File fileName = new File( testDir.directory(), OnDiskGlobalSessionTrackerState.FILENAME + "a" ); - - GlobalSession globalSession = new GlobalSession<>( randomUUID(), member( - 1 ) ); - - // when - // this is the first time we see globalSession but the local operation id is invalid - state.validateAndTrackOperationAtLogIndex( globalSession, new LocalOperationId( 1, 1 ), 0 ); + fsa.mkdir( testDir.directory() ); - // then - assertThat( fsa.getFileSize( fileName ), equalTo( 0L ) ); + try + { + return new OnDiskGlobalSessionTrackerState<>( fsa, testDir.directory(), new RaftTestMemberMarshal(), 100, mock( Supplier.class ) ); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } } } \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java new file mode 100644 index 0000000000000..00b000cfaec0c --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.replication.tx; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.RETURNS_MOCKS; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.UUID; + +import org.junit.Test; +import org.mockito.stubbing.Stubber; +import org.neo4j.coreedge.raft.replication.session.GlobalSession; +import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; +import org.neo4j.coreedge.raft.replication.session.InMemoryGlobalSessionTrackerState; +import org.neo4j.coreedge.raft.replication.session.LocalOperationId; +import org.neo4j.coreedge.server.RaftTestMember; +import org.neo4j.coreedge.server.core.locks.LockTokenManager; +import org.neo4j.graphdb.TransactionFailureException; +import org.neo4j.kernel.impl.api.TransactionCommitProcess; +import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; + +public class ReplicatedTransactionStateMachinePersistenceTest +{ + @Test + public void shouldNotRejectUncommittedTransactionsAfterCrashEvenIfSessionTrackerSaysSo() throws Exception + { + // given + TransactionCommitProcess commitProcess = mock( TransactionCommitProcess.class ); + when( commitProcess.commit( any(), any(), any() ) ).thenThrow( new TransactionFailureException( "testing" ) ) + .thenReturn( 123L ); + + ReplicatedTransactionStateMachine rtsm = new ReplicatedTransactionStateMachine<>( + commitProcess, + new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 1 ) ), + mock( LockTokenManager.class, RETURNS_MOCKS ), + new CommittingTransactionsRegistry(), + new InMemoryGlobalSessionTrackerState<>() ); + + TransactionRepresentation tx = new PhysicalTransactionRepresentation( Collections.emptySet() ); + ReplicatedTransaction rtx = + ReplicatedTransactionFactory.createImmutableReplicatedTransaction( tx, new GlobalSession<>( UUID + .randomUUID(), RaftTestMember.member( 2 ) ), new LocalOperationId( 1, 0 ) ); + rtsm.setLastCommittedIndex( 99 ); + + // when + try + { + rtsm.onReplicated( rtx, 100 ); + fail( "test design throws exception here" ); + } + catch ( TransactionFailureException thrownByTestDesign ) + { + // expected + } + reset( commitProcess ); // ignore all previous interactions, we care what happens from now on + rtsm.setLastCommittedIndex( 99 ); + rtsm.onReplicated( rtx, 100 ); + + // then + verify( commitProcess, times( 1 ) ).commit( any(), any(), any() ); + } + + @Test + public void shouldUpdateSessionStateOnRecoveryEvenIfTxCommittedOnFirstTry() throws Exception + { + // given + TransactionCommitProcess commitProcess = mock( TransactionCommitProcess.class ); + + GlobalSessionTrackerState sessionTracker = mock( GlobalSessionTrackerState.class ); + when( sessionTracker.validateOperation( any(), any() ) ).thenReturn( true ); + + Stubber stubber = doThrow( new RuntimeException() ); + stubber.when( sessionTracker ).update( any(), any(), anyLong() ); + stubber.doNothing().when( sessionTracker ).update( any(), any(), anyLong() ); + + ReplicatedTransactionStateMachine rtsm = new ReplicatedTransactionStateMachine<>( + commitProcess, + new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 1 ) ), + mock( LockTokenManager.class, RETURNS_MOCKS ), + new CommittingTransactionsRegistry(), + sessionTracker ); + + TransactionRepresentation tx = new PhysicalTransactionRepresentation( Collections.emptySet() ); + ReplicatedTransaction rtx = + ReplicatedTransactionFactory.createImmutableReplicatedTransaction( tx, + new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 2 ) ), + new LocalOperationId( 1, 0 ) ); + + // when + // we try to commit but fail on session update, and then try to do recovery + try + { + // transaction gets committed at log index 99. It will reach the tx log but not the session state + rtsm.onReplicated( rtx, 99 ); + fail( "test setup should have resulted in an exception by now" ); + } + catch ( RuntimeException totallyExpectedByTestSetup ) + { + // dully ignored + } + // reset state so we can do proper validation below + reset( commitProcess ); + + // now let's do recovery. The log contains the last tx, so the last committed log index is the previous: 99 + rtsm.setLastCommittedIndex( 99 ); + + // however, the raft log will give us the same tx, as we did not return successfully from the last + // onReplicated() + rtsm.onReplicated( rtx, 99 ); + + // then + // there should be no commit of tx, but an update on the session state + verifyZeroInteractions( commitProcess ); + verify( sessionTracker, times( 2 ) ).update( any(), any(), eq( 99L ) ); + } + + @Test + public void shouldSkipUpdatingSessionStateForSameIndexAfterSuccessfulUpdate() throws Exception + { + // given + TransactionCommitProcess commitProcess = mock( TransactionCommitProcess.class ); + + InMemoryGlobalSessionTrackerState sessionTrackerState = spy( new + InMemoryGlobalSessionTrackerState<>() ); + ReplicatedTransactionStateMachine rtsm = new ReplicatedTransactionStateMachine<>( + commitProcess, + new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 1 ) ), + mock( LockTokenManager.class, RETURNS_MOCKS ), + new CommittingTransactionsRegistry(), + sessionTrackerState ); + + TransactionRepresentation tx = new PhysicalTransactionRepresentation( Collections.emptySet() ); + ReplicatedTransaction rtx = + ReplicatedTransactionFactory.createImmutableReplicatedTransaction( tx, + new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 2 ) ), + new LocalOperationId( 1, 0 ) ); + + // when + // we commit a tx normally + final int commitAtRaftLogIndex = 99; + rtsm.onReplicated( rtx, commitAtRaftLogIndex ); + + // simply verify that things were properly updated + assertEquals( commitAtRaftLogIndex, sessionTrackerState.logIndex() ); + + // when the same replicated content is passed in again + rtsm.onReplicated( rtx, commitAtRaftLogIndex ); + + // then + verify( commitProcess, times( 1 ) ).commit( any(), any(), any() ); + verify( sessionTrackerState, times( 1 ) ).update( any(), any(), eq( 99L ) ); + } +} \ No newline at end of file