diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLog.java index 47fca8d46d67..4797524822bc 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLog.java @@ -44,11 +44,11 @@ public interface RaftLog extends ReadableRaftLog interface Listener { - void onAppended( ReplicatedContent content, long index ); + void onAppended( ReplicatedContent content, long logIndex ); - void onCommitted( ReplicatedContent content, long index ); + void onCommitted( ReplicatedContent content, long logIndex ); - void onTruncated( long fromIndex ); + void onTruncated( long fromLogIndex ); } /** diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java index 04c676d10b55..8dd26f3e73e4 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java @@ -79,38 +79,54 @@ public RaftMembershipManager( Replicator replicator, RaftGroup.Builder m } @Override - public void onAppended( ReplicatedContent content, long index ) + public void onAppended( ReplicatedContent content, long logIndex ) { - if ( content instanceof RaftGroup && index > raftMembershipState.logIndex() ) + if ( content instanceof RaftGroup) { - assert uncommittedMemberChanges >= 0; + if ( logIndex > raftMembershipState.logIndex() ) + { + assert uncommittedMemberChanges >= 0; - uncommittedMemberChanges++; + uncommittedMemberChanges++; - RaftGroup raftGroup = (RaftGroup) content; - raftMembershipState.setVotingMembers( raftGroup.getMembers() ); + RaftGroup raftGroup = (RaftGroup) content; + raftMembershipState.setVotingMembers( raftGroup.getMembers() ); + } + else + { + log.info( "Ignoring content at index %d, since already appended up to %d", + logIndex, raftMembershipState.logIndex() ); + } } } @Override - public void onCommitted( ReplicatedContent content, long index ) + public void onCommitted( ReplicatedContent content, long logIndex ) { - if ( content instanceof RaftGroup && index > raftMembershipState.logIndex() ) + if ( content instanceof RaftGroup ) { - assert uncommittedMemberChanges > 0; + if ( logIndex > raftMembershipState.logIndex() ) + { + assert uncommittedMemberChanges > 0; - uncommittedMemberChanges--; + uncommittedMemberChanges--; - if ( uncommittedMemberChanges == 0 ) + if ( uncommittedMemberChanges == 0 ) + { + membershipStateMachine.onRaftGroupCommitted(); + } + raftMembershipState.logIndex( logIndex ); + } + else { - membershipStateMachine.onRaftGroupCommitted(); + log.info( "Ignoring content at index %d, since already committed up to %d", + logIndex, raftMembershipState.logIndex() ); } - raftMembershipState.logIndex( index ); } } @Override - public void onTruncated( long fromIndex ) + public void onTruncated( long fromLogIndex ) { try { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/RaftReplicator.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/RaftReplicator.java index 588459f40638..a8fa10105250 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/RaftReplicator.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/RaftReplicator.java @@ -77,21 +77,21 @@ public void unsubscribe( ReplicatedContentListener listener ) } @Override - public void onAppended( ReplicatedContent content, long index ) + public void onAppended( ReplicatedContent content, long logIndex ) { } @Override - public void onCommitted( ReplicatedContent content, long index ) + public void onCommitted( ReplicatedContent content, long logIndex ) { for ( ReplicatedContentListener listener : listeners ) { - listener.onReplicated( content, index ); + listener.onReplicated( content, logIndex ); } } @Override - public void onTruncated( long fromIndex ) + public void onTruncated( long fromLogIndex ) { } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachine.java index bddc15c77a38..282370bebede 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachine.java @@ -24,6 +24,8 @@ import org.neo4j.coreedge.server.CoreMember; import org.neo4j.kernel.impl.store.id.IdRange; import org.neo4j.kernel.impl.store.id.IdType; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; import static org.neo4j.collection.primitive.PrimitiveLongCollections.EMPTY_LONG_ARRAY; @@ -46,11 +48,14 @@ public class ReplicatedIdAllocationStateMachine implements Replicator.Replicated { private final CoreMember me; private final IdAllocationState idAllocationState; + private final Log log; - public ReplicatedIdAllocationStateMachine( CoreMember me, IdAllocationState idAllocationState ) + public ReplicatedIdAllocationStateMachine( CoreMember me, IdAllocationState idAllocationState, + LogProvider logProvider ) { this.me = me; this.idAllocationState = idAllocationState; + this.log = logProvider.getLog( getClass() ); } public synchronized long getFirstNotAllocated( IdType idType ) @@ -87,27 +92,35 @@ private void updateFirstNotAllocated( IdType idType, long idRangeEnd ) @Override public synchronized void onReplicated( ReplicatedContent content, long logIndex ) { - if ( content instanceof ReplicatedIdAllocationRequest && logIndex > idAllocationState.logIndex() ) + if ( content instanceof ReplicatedIdAllocationRequest ) { - ReplicatedIdAllocationRequest request = (ReplicatedIdAllocationRequest) content; + if ( logIndex > idAllocationState.logIndex() ) + { + ReplicatedIdAllocationRequest request = (ReplicatedIdAllocationRequest) content; - IdType idType = request.idType(); + IdType idType = request.idType(); - if ( request.idRangeStart() == idAllocationState.firstUnallocated( idType ) ) - { - if ( request.owner().equals( me ) ) + if ( request.idRangeStart() == idAllocationState.firstUnallocated( idType ) ) { - idAllocationState.lastIdRangeStart( idType, request.idRangeStart() ); - idAllocationState.lastIdRangeLength( idType, request.idRangeLength() ); - } - updateFirstNotAllocated( idType, request.idRangeStart() + request.idRangeLength() ); + if ( request.owner().equals( me ) ) + { + idAllocationState.lastIdRangeStart( idType, request.idRangeStart() ); + idAllocationState.lastIdRangeLength( idType, request.idRangeLength() ); + } + updateFirstNotAllocated( idType, request.idRangeStart() + request.idRangeLength() ); + } + /* + * We update regardless of whether this content was meant for us or not. Even if it isn't content we + * care about, any content before it has already been applied so it is safe to ignore. + */ + idAllocationState.logIndex( logIndex ); + } + else + { + log.info( "Ignoring content at index %d, since already applied up to %d", + logIndex, idAllocationState.logIndex() ); } - /* - * We update regardless of whether this content was meant for us or not. Even if it isn't content we - * care about, any content before it has already been applied so it is safe to ignore. - */ - idAllocationState.logIndex( logIndex ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedLabelTokenHolder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedLabelTokenHolder.java index 303b7c276968..5ded5ac15915 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedLabelTokenHolder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedLabelTokenHolder.java @@ -30,14 +30,16 @@ import org.neo4j.kernel.impl.transaction.state.Loaders; import org.neo4j.kernel.impl.transaction.state.RecordAccess; import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.logging.LogProvider; import org.neo4j.storageengine.api.Token; public class ReplicatedLabelTokenHolder extends ReplicatedTokenHolder implements LabelTokenHolder { - public ReplicatedLabelTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, Dependencies dependencies, - long timeoutMillis) + public ReplicatedLabelTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, + Dependencies dependencies, long timeoutMillis, LogProvider logProvider ) { - super( replicator, idGeneratorFactory, IdType.LABEL_TOKEN, dependencies, new Token.Factory(), TokenType.LABEL, timeoutMillis ); + super( replicator, idGeneratorFactory, IdType.LABEL_TOKEN, + dependencies, new Token.Factory(), TokenType.LABEL, timeoutMillis, logProvider ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedPropertyKeyTokenHolder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedPropertyKeyTokenHolder.java index e8158b98dcbe..35ab297842a6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedPropertyKeyTokenHolder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedPropertyKeyTokenHolder.java @@ -30,13 +30,16 @@ import org.neo4j.kernel.impl.transaction.state.Loaders; import org.neo4j.kernel.impl.transaction.state.RecordAccess; import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.logging.LogProvider; import org.neo4j.storageengine.api.Token; public class ReplicatedPropertyKeyTokenHolder extends ReplicatedTokenHolder implements PropertyKeyTokenHolder { - public ReplicatedPropertyKeyTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, Dependencies dependencies, long timeoutMillis ) + public ReplicatedPropertyKeyTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, + Dependencies dependencies, long timeoutMillis, LogProvider logProvider ) { - super( replicator, idGeneratorFactory, IdType.PROPERTY_KEY_TOKEN, dependencies, new Token.Factory(), TokenType.PROPERTY, timeoutMillis ); + super( replicator, idGeneratorFactory, IdType.PROPERTY_KEY_TOKEN, + dependencies, new Token.Factory(), TokenType.PROPERTY, timeoutMillis, logProvider ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedRelationshipTypeTokenHolder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedRelationshipTypeTokenHolder.java index b02662721447..2d898dd113ce 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedRelationshipTypeTokenHolder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedRelationshipTypeTokenHolder.java @@ -31,12 +31,15 @@ import org.neo4j.kernel.impl.transaction.state.Loaders; import org.neo4j.kernel.impl.transaction.state.RecordAccess; import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.logging.LogProvider; public class ReplicatedRelationshipTypeTokenHolder extends ReplicatedTokenHolder implements RelationshipTypeTokenHolder { - public ReplicatedRelationshipTypeTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, Dependencies dependencies, long timeoutMillis ) + public ReplicatedRelationshipTypeTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, + Dependencies dependencies, long timeoutMillis, LogProvider logProvider ) { - super( replicator, idGeneratorFactory, IdType.RELATIONSHIP_TYPE_TOKEN, dependencies, new RelationshipTypeToken.Factory(), TokenType.RELATIONSHIP, timeoutMillis ); + super( replicator, idGeneratorFactory, IdType.RELATIONSHIP_TYPE_TOKEN, dependencies, + new RelationshipTypeToken.Factory(), TokenType.RELATIONSHIP, timeoutMillis, logProvider ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenHolder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenHolder.java index 93c314383331..5ef12f9a42ff 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenHolder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenHolder.java @@ -52,6 +52,8 @@ import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.impl.util.collection.NoSuchEntryException; import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.Token; @@ -75,13 +77,14 @@ public abstract class ReplicatedTokenHolder tokenFactory, TokenType type, - long timeoutMillis) + Dependencies dependencies, TokenFactory tokenFactory, TokenType type, + long timeoutMillis, LogProvider logProvider ) { this.replicator = replicator; this.idGeneratorFactory = idGeneratorFactory; @@ -91,6 +94,7 @@ public ReplicatedTokenHolder( Replicator replicator, IdGeneratorFactory idGenera this.type = type; this.timeoutMillis = timeoutMillis; this.tokenCache = new InMemoryTokenCache<>( this.getClass() ); + this.log = logProvider.getLog( getClass() ); } @Override @@ -209,32 +213,36 @@ public Iterable getAllTokens() @Override public void onReplicated( ReplicatedContent content, long logIndex ) { - if ( logIndex <= lastCommittedIndex ) - { - return; - } if ( content instanceof ReplicatedTokenRequest && ((ReplicatedTokenRequest) content).type().equals( type ) ) { - ReplicatedTokenRequest tokenRequest = (ReplicatedTokenRequest) content; + if ( logIndex > lastCommittedIndex ) + { + ReplicatedTokenRequest tokenRequest = (ReplicatedTokenRequest) content; - Integer tokenId = tokenCache.getId( tokenRequest.tokenName() ); + Integer tokenId = tokenCache.getId( tokenRequest.tokenName() ); - if ( tokenId == null ) - { - try + if ( tokenId == null ) { - Collection commands = ReplicatedTokenRequestSerializer.extractCommands( tokenRequest.commandBytes() ); - tokenId = applyToStore( commands, logIndex ); - } - catch ( NoSuchEntryException e ) - { - throw new IllegalStateException( "Commands did not contain token command" ); + try + { + Collection commands = ReplicatedTokenRequestSerializer.extractCommands( tokenRequest.commandBytes() ); + tokenId = applyToStore( commands, logIndex ); + } + catch ( NoSuchEntryException e ) + { + throw new IllegalStateException( "Commands did not contain token command" ); + } + + tokenCache.put( tokenFactory.newToken( tokenRequest.tokenName(), tokenId ) ); } - tokenCache.put( tokenFactory.newToken( tokenRequest.tokenName(), tokenId ) ); + tokenFutures.complete( tokenRequest.tokenName(), tokenId ); + } + else + { + log.info( "Ignoring content at index %d, since already applied up to %d", + logIndex, lastCommittedIndex ); } - - tokenFutures.complete( tokenRequest.tokenName(), tokenId ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java index 39f7a2e586f6..b384864db486 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java @@ -97,7 +97,11 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long * may still need to persist the session state (as we may crashed in between), which happens outside this * if check. */ - if ( !txAlreadyCommitted( logIndex ) ) + if ( logIndex <= lastCommittedIndex ) + { + log.info( "Ignoring transaction at log index %d since already committed up to %d", logIndex, lastCommittedIndex ); + } + else { TransactionRepresentation tx; try @@ -167,11 +171,6 @@ private boolean operationValid( ReplicatedTransaction replicatedTx ) return sessionTracker.validateOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() ); } - private boolean txAlreadyCommitted( long logIndex ) - { - return logIndex <= lastCommittedIndex; - } - public void setLastCommittedIndex( long lastCommittedIndex ) { this.lastCommittedIndex = lastCommittedIndex; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index d9e02415efb4..277bc18ef4fa 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -293,8 +293,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, throw new RuntimeException( e ); } - ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( myself, - idAllocationState ); + ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( + myself, idAllocationState, logProvider ); replicator.subscribe( idAllocationStateMachine ); @@ -316,11 +316,11 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, Long tokenCreationTimeout = config.get( CoreEdgeClusterSettings.token_creation_timeout ); ReplicatedRelationshipTypeTokenHolder relationshipTypeTokenHolder = new ReplicatedRelationshipTypeTokenHolder( - replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout ); + replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout, logProvider ); ReplicatedPropertyKeyTokenHolder propertyKeyTokenHolder = new ReplicatedPropertyKeyTokenHolder( - replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout ); + replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout, logProvider ); ReplicatedLabelTokenHolder labelTokenHolder = new ReplicatedLabelTokenHolder( - replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout ); + replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout, logProvider ); LifeSupport tokenLife = new LifeSupport(); this.relationshipTypeTokenHolder = tokenLife.add( relationshipTypeTokenHolder ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java index 0360a47e4cd3..73360d3bfb23 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java @@ -54,18 +54,18 @@ public void shouldUpdateCommitIndexEvenIfConsumerFails() throws Exception log.registerListener( new RaftLog.Listener() { @Override - public void onAppended( ReplicatedContent content, long index ) + public void onAppended( ReplicatedContent content, long logIndex ) { } @Override - public void onCommitted( ReplicatedContent raftableContent, long index ) + public void onCommitted( ReplicatedContent raftableContent, long logIndex ) { throw new RuntimeException( "Fail to accept the content" ); } @Override - public void onTruncated( long fromIndex ) + public void onTruncated( long fromLogIndex ) { } } ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java index e07d38d29025..525cb420b5c5 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java @@ -26,6 +26,7 @@ import org.neo4j.coreedge.server.CoreMember; import org.neo4j.kernel.impl.store.id.IdType; import org.neo4j.kernel.impl.store.id.IdRange; +import org.neo4j.logging.NullLogProvider; import static junit.framework.TestCase.assertEquals; import static org.mockito.Mockito.mock; @@ -46,7 +47,7 @@ public void shouldNotHaveAnyIdsInitially() { // given ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me, - new InMemoryIdAllocationState() ); + new InMemoryIdAllocationState(), NullLogProvider.getInstance() ); // when IdRange myHighestIdRange = idAllocationStateMachine.getHighestIdRange( me, someType ); @@ -62,7 +63,7 @@ public void shouldUpdateStateOnlyForTypeRequested() { // given ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me, - new InMemoryIdAllocationState() ); + new InMemoryIdAllocationState(), NullLogProvider.getInstance() ); ReplicatedIdAllocationRequest idAllocationRequest = new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ); // when @@ -78,7 +79,7 @@ public void shouldUpdateHighestIdRangeForSelf() { // given ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me, - new InMemoryIdAllocationState() ); + new InMemoryIdAllocationState(), NullLogProvider.getInstance() ); ReplicatedIdAllocationRequest idAllocationRequest = new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ); // when @@ -95,13 +96,13 @@ public void severalDistinctRequestsShouldIncrementallyUpdate() { // given ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me, - new InMemoryIdAllocationState() ); + new InMemoryIdAllocationState(), NullLogProvider.getInstance() ); long index = 0; // when idAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ), index++ ); idAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( me, someType, 1024, 1024 ), index++ ); - idAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( me, someType, 2048, 1024 ), index++ ); + idAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( me, someType, 2048, 1024 ), index ); // then assertEquals( 3072, idAllocationStateMachine.getFirstNotAllocated( someType ) ); @@ -112,7 +113,7 @@ public void severalEqualRequestsShouldOnlyUpdateOnce() { // given ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me, - new InMemoryIdAllocationState() ); + new InMemoryIdAllocationState(), NullLogProvider.getInstance() ); // when idAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ), 0 ); @@ -128,7 +129,7 @@ public void outOfOrderRequestShouldBeIgnored() { // given ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me, - new InMemoryIdAllocationState() ); + new InMemoryIdAllocationState(), NullLogProvider.getInstance() ); // when idAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ), 0 ); @@ -144,7 +145,7 @@ public void requestLosingRaceShouldBeIgnored() { // given ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me, - new InMemoryIdAllocationState() ); + new InMemoryIdAllocationState(), NullLogProvider.getInstance() ); // when idAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( someoneElse, someType, 0, 1024 ), 0 ); @@ -164,14 +165,14 @@ public void shouldCorrectlyRestartWithPreviousState() throws Exception IdAllocationState idAllocationState = new InMemoryIdAllocationState(); ReplicatedIdAllocationStateMachine firstIdAllocationStateMachine = - new ReplicatedIdAllocationStateMachine( me, idAllocationState ); + new ReplicatedIdAllocationStateMachine( me, idAllocationState, NullLogProvider.getInstance() ); firstIdAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ), 0 ); firstIdAllocationStateMachine.onReplicated( new ReplicatedIdAllocationRequest( me, someType, 1024, 1024 ), 1 ); // when ReplicatedIdAllocationStateMachine secondIdAllocationStateMachine = - new ReplicatedIdAllocationStateMachine( me, idAllocationState ); + new ReplicatedIdAllocationStateMachine( me, idAllocationState, NullLogProvider.getInstance() ); // then assertEquals( firstIdAllocationStateMachine.getHighestIdRange( me, someType ), @@ -201,7 +202,7 @@ public void shouldIgnoreAlreadySeenIndex() throws Exception final long AN_INDEX = 24; IdAllocationState idAllocationState = new InMemoryIdAllocationState(); ReplicatedIdAllocationStateMachine stateMachine = - new ReplicatedIdAllocationStateMachine( me, idAllocationState ); + new ReplicatedIdAllocationStateMachine( me, idAllocationState, NullLogProvider.getInstance() ); // which has seen a replicated content at a specific index ReplicatedIdAllocationRequest mockRequest = mock( ReplicatedIdAllocationRequest.class ); @@ -237,7 +238,7 @@ public void shouldContinueAcceptingRequestsAfterIgnoreAlreadySeenIndex() throws final long AN_INDEX = 24; IdAllocationState idAllocationState = new InMemoryIdAllocationState(); ReplicatedIdAllocationStateMachine stateMachine = - new ReplicatedIdAllocationStateMachine( me, idAllocationState ); + new ReplicatedIdAllocationStateMachine( me, idAllocationState, NullLogProvider.getInstance() ); // which has seen a replicated content at a specific index ReplicatedIdAllocationRequest mockRequest = mock( ReplicatedIdAllocationRequest.class ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java index 466d93174d63..264887526c29 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java @@ -92,7 +92,7 @@ private ReplicatedIdGenerator createForMemberWithInitialIdAndRangeLength( CoreMe int idRangeLength ) { ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( member, - new InMemoryIdAllocationState() ); + new InMemoryIdAllocationState(), NullLogProvider.getInstance() ); replicator.subscribe( idAllocationStateMachine ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenHolderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenHolderTest.java index 89c82c0191e8..a2808fefb0ea 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenHolderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenHolderTest.java @@ -45,6 +45,7 @@ import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.logging.NullLogProvider; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.Token; @@ -107,7 +108,7 @@ public void shouldCreateTokenId() throws Exception when( dependencies.resolveDependency( StorageEngine.class ) ).thenReturn( storageEngine ); ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( replicator, - idGeneratorFactory, dependencies, TIMEOUT_MILLIS ); + idGeneratorFactory, dependencies, TIMEOUT_MILLIS, NullLogProvider.getInstance() ); tokenHolder.setLastCommittedIndex( -1 ); tokenHolder.start(); @@ -136,7 +137,7 @@ public void shouldTimeoutIfTokenDoesNotReplicateWithinTimeout() throws Exception when( dependencies.resolveDependency( StorageEngine.class ) ).thenReturn( storageEngine ); ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( replicator, - idGeneratorFactory, dependencies, 10 ); + idGeneratorFactory, dependencies, 10, NullLogProvider.getInstance() ); tokenHolder.setLastCommittedIndex( -1 ); tokenHolder.start(); @@ -171,7 +172,7 @@ public void shouldStoreRaftLogIndexInTransactionHeader() throws Exception when( dependencies.resolveDependency( StorageEngine.class ) ).thenReturn( storageEngine ); ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( - new StubReplicator(), idGeneratorFactory, dependencies, TIMEOUT_MILLIS ); + new StubReplicator(), idGeneratorFactory, dependencies, TIMEOUT_MILLIS, NullLogProvider.getInstance() ); tokenHolder.setLastCommittedIndex( -1 ); // when @@ -190,7 +191,7 @@ public void shouldStoreInitialTokens() throws Exception { // given ReplicatedTokenHolder tokenHolder = - new ReplicatedLabelTokenHolder( null, null, dependencies, TIMEOUT_MILLIS ); + new ReplicatedLabelTokenHolder( null, null, dependencies, TIMEOUT_MILLIS, NullLogProvider.getInstance() ); // when tokenHolder.setInitialTokens( asList( new Token( "name1", 1 ), new Token( "name2", 2 ) ) ); @@ -204,7 +205,7 @@ public void shouldThrowExceptionIfLastCommittedIndexNotSet() throws Exception { // given ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( null, - null, dependencies, TIMEOUT_MILLIS ); + null, dependencies, TIMEOUT_MILLIS, NullLogProvider.getInstance() ); // when try @@ -231,7 +232,7 @@ public void shouldGetExistingTokenIdFromAgesAgo() throws Exception Replicator replicator = new StubReplicator(); ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( replicator, - idGeneratorFactory, dependencies, TIMEOUT_MILLIS ); + idGeneratorFactory, dependencies, TIMEOUT_MILLIS, NullLogProvider.getInstance() ); tokenHolder.setLastCommittedIndex( -1 ); tokenHolder.start(); @@ -261,7 +262,7 @@ public void shouldStoreAndReturnASingleTokenForTwoConcurrentRequests() throws Ex RaceConditionSimulatingReplicator replicator = new RaceConditionSimulatingReplicator(); ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( replicator, - idGeneratorFactory, dependencies, TIMEOUT_MILLIS ); + idGeneratorFactory, dependencies, TIMEOUT_MILLIS, NullLogProvider.getInstance() ); tokenHolder.setLastCommittedIndex( -1 ); tokenHolder.start(); @@ -291,7 +292,7 @@ public void shouldStoreAndReturnASingleTokenForTwoDifferentConcurrentRequests() RaceConditionSimulatingReplicator replicator = new RaceConditionSimulatingReplicator(); ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( replicator, - idGeneratorFactory, dependencies, TIMEOUT_MILLIS ); + idGeneratorFactory, dependencies, TIMEOUT_MILLIS, NullLogProvider.getInstance() ); tokenHolder.setLastCommittedIndex( -1 ); tokenHolder.start(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java index 715c534cde6e..2323ca9031cd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java @@ -61,23 +61,15 @@ public void shouldNotRejectUncommittedTransactionsAfterCrashEvenIfSessionTracker when( commitProcess.commit( any(), any(), any() ) ).thenThrow( new TransactionFailureException( "testing" ) ) .thenReturn( 123L ); - ReplicatedTransactionStateMachine rtsm = new ReplicatedTransactionStateMachine<>( - commitProcess, - new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 1 ) ), - mock( LockTokenManager.class, RETURNS_MOCKS ), - new CommittingTransactionsRegistry(), - new InMemoryGlobalSessionTrackerState<>(), NullLogProvider.getInstance() ); + ReplicatedTransactionStateMachine stateMachine = stateMachine( commitProcess, new InMemoryGlobalSessionTrackerState() ); - TransactionRepresentation tx = new PhysicalTransactionRepresentation( Collections.emptySet() ); - ReplicatedTransaction rtx = - ReplicatedTransactionFactory.createImmutableReplicatedTransaction( tx, new GlobalSession<>( UUID - .randomUUID(), RaftTestMember.member( 2 ) ), new LocalOperationId( 1, 0 ) ); - rtsm.setLastCommittedIndex( 99 ); + ReplicatedTransaction rtx = replicatedTx(); + stateMachine.setLastCommittedIndex( 99 ); // when try { - rtsm.onReplicated( rtx, 100 ); + stateMachine.onReplicated( rtx, 100 ); fail( "test design throws exception here" ); } catch ( TransactionFailureException thrownByTestDesign ) @@ -85,8 +77,8 @@ public void shouldNotRejectUncommittedTransactionsAfterCrashEvenIfSessionTracker // expected } reset( commitProcess ); // ignore all previous interactions, we care what happens from now on - rtsm.setLastCommittedIndex( 99 ); - rtsm.onReplicated( rtx, 100 ); + stateMachine.setLastCommittedIndex( 99 ); + stateMachine.onReplicated( rtx, 100 ); // then verify( commitProcess, times( 1 ) ).commit( any(), any(), any() ); @@ -105,25 +97,16 @@ public void shouldUpdateSessionStateOnRecoveryEvenIfTxCommittedOnFirstTry() thro stubber.when( sessionTracker ).update( any(), any(), anyLong() ); stubber.doNothing().when( sessionTracker ).update( any(), any(), anyLong() ); - ReplicatedTransactionStateMachine rtsm = new ReplicatedTransactionStateMachine<>( - commitProcess, - new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 1 ) ), - mock( LockTokenManager.class, RETURNS_MOCKS ), - new CommittingTransactionsRegistry(), - sessionTracker, NullLogProvider.getInstance() ); + ReplicatedTransactionStateMachine stateMachine = stateMachine( commitProcess, sessionTracker ); - TransactionRepresentation tx = new PhysicalTransactionRepresentation( Collections.emptySet() ); - ReplicatedTransaction rtx = - ReplicatedTransactionFactory.createImmutableReplicatedTransaction( tx, - new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 2 ) ), - new LocalOperationId( 1, 0 ) ); + ReplicatedTransaction rtx = replicatedTx(); // when // we try to commit but fail on session update, and then try to do recovery try { // transaction gets committed at log index 99. It will reach the tx log but not the session state - rtsm.onReplicated( rtx, 99 ); + stateMachine.onReplicated( rtx, 99 ); fail( "test setup should have resulted in an exception by now" ); } catch ( RuntimeException totallyExpectedByTestSetup ) @@ -134,11 +117,11 @@ public void shouldUpdateSessionStateOnRecoveryEvenIfTxCommittedOnFirstTry() thro reset( commitProcess ); // now let's do recovery. The log contains the last tx, so the last committed log index is the previous: 99 - rtsm.setLastCommittedIndex( 99 ); + stateMachine.setLastCommittedIndex( 99 ); // however, the raft log will give us the same tx, as we did not return successfully from the last // onReplicated() - rtsm.onReplicated( rtx, 99 ); + stateMachine.onReplicated( rtx, 99 ); // then // there should be no commit of tx, but an update on the session state @@ -154,32 +137,41 @@ public void shouldSkipUpdatingSessionStateForSameIndexAfterSuccessfulUpdate() th InMemoryGlobalSessionTrackerState sessionTrackerState = spy( new InMemoryGlobalSessionTrackerState<>() ); - ReplicatedTransactionStateMachine rtsm = new ReplicatedTransactionStateMachine<>( - commitProcess, - new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 1 ) ), - mock( LockTokenManager.class, RETURNS_MOCKS ), - new CommittingTransactionsRegistry(), - sessionTrackerState, NullLogProvider.getInstance() ); + ReplicatedTransactionStateMachine stateMachine = stateMachine( commitProcess, sessionTrackerState ); - TransactionRepresentation tx = new PhysicalTransactionRepresentation( Collections.emptySet() ); - ReplicatedTransaction rtx = - ReplicatedTransactionFactory.createImmutableReplicatedTransaction( tx, - new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 2 ) ), - new LocalOperationId( 1, 0 ) ); + ReplicatedTransaction rtx = replicatedTx(); // when // we commit a tx normally final int commitAtRaftLogIndex = 99; - rtsm.onReplicated( rtx, commitAtRaftLogIndex ); + stateMachine.onReplicated( rtx, commitAtRaftLogIndex ); // simply verify that things were properly updated assertEquals( commitAtRaftLogIndex, sessionTrackerState.logIndex() ); // when the same replicated content is passed in again - rtsm.onReplicated( rtx, commitAtRaftLogIndex ); + stateMachine.onReplicated( rtx, commitAtRaftLogIndex ); // then verify( commitProcess, times( 1 ) ).commit( any(), any(), any() ); verify( sessionTrackerState, times( 1 ) ).update( any(), any(), eq( 99L ) ); } + + public ReplicatedTransactionStateMachine stateMachine( TransactionCommitProcess commitProcess, + GlobalSessionTrackerState sessionTrackerState ) + { + return new ReplicatedTransactionStateMachine<>( + commitProcess, + new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 1 ) ), + mock( LockTokenManager.class, RETURNS_MOCKS ), + new CommittingTransactionsRegistry(), + sessionTrackerState, NullLogProvider.getInstance() ); + } + + private ReplicatedTransaction replicatedTx() throws java.io.IOException + { + TransactionRepresentation tx = new PhysicalTransactionRepresentation( Collections.emptySet() ); + return ReplicatedTransactionFactory.createImmutableReplicatedTransaction( tx, new GlobalSession<>( UUID + .randomUUID(), RaftTestMember.member( 2 ) ), new LocalOperationId( 1, 0 ) ); + } } \ No newline at end of file