diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java index b7ce502218ec0..86df069b9146a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java @@ -63,6 +63,7 @@ public class RaftMachine implements LeaderLocator, CoreMetaData { private final LeaderNotFoundMonitor leaderNotFoundMonitor; + private final InFlightMap inFlightMap; private RenewableTimeoutService.RenewableTimeout heartbeatTimer; public enum Timeouts implements RenewableTimeoutService.TimeoutName @@ -112,13 +113,19 @@ public RaftMachine( MemberId myself, StateStorage termStorage, StateS this.refuseToBecomeLeader = refuseToBecomeLeader; this.clock = clock; + this.inFlightMap = inFlightMap; this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightMap, logProvider ); leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class ); } - public synchronized void startTimers() + /** + * This should be called after the major recovery operations are complete. Before this is called + * this instance cannot become a leader (the timers are disabled) and entries will not be cached + * in the in-flight map, because the application process is not running and ready to consume them. + */ + public synchronized void postRecoveryActions() { if ( !refuseToBecomeLeader ) { @@ -128,6 +135,8 @@ public synchronized void startTimers() heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0, renewing( () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ) ); } + + inFlightMap.enable(); } public synchronized void stopTimers() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/InFlightMap.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/InFlightMap.java index 09efeb2248fdc..ca4023a3bbc49 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/InFlightMap.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/InFlightMap.java @@ -27,6 +27,22 @@ public class InFlightMap { private final SortedMap map = new ConcurrentSkipListMap<>(); + private volatile boolean enabled; + + public InFlightMap() + { + this ( false ); + } + + public InFlightMap( boolean enabled ) + { + this.enabled = enabled; + } + + public void enable() + { + this.enabled = true; + } /** * Adds a new mapping. @@ -37,6 +53,11 @@ public class InFlightMap */ public void put( Long key, V value ) { + if ( !enabled ) + { + return; + } + V previousValue = map.putIfAbsent( key, value ); if ( previousValue != null ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreLife.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreLife.java index f7d6668f2931c..5ff8c491c1a38 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreLife.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreLife.java @@ -81,7 +81,7 @@ public synchronized void start() throws Throwable localDatabase.start(); coreStateMachines.installCommitProcess( localDatabase.getCommitProcess() ); applicationProcess.start(); - raftMachine.startTimers(); + raftMachine.postRecoveryActions(); } @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java index 6bd3602c95ca1..00799169eded6 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java @@ -78,14 +78,13 @@ public class RaftMachineBuilder new InMemoryStateStorage<>( new RaftMembershipState() ); private Monitors monitors = new Monitors(); private CommitListener commitListener = commitIndex -> {}; - private final InFlightMap inFlightMap; + private InFlightMap inFlightMap = new InFlightMap<>(); public RaftMachineBuilder( MemberId member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder ) { this.member = member; this.expectedClusterSize = expectedClusterSize; this.memberSetBuilder = memberSetBuilder; - inFlightMap = new InFlightMap<>(); } public RaftMachine build() @@ -162,6 +161,12 @@ public RaftMachineBuilder raftLog( RaftLog raftLog ) return this; } + public RaftMachineBuilder inFlightMap( InFlightMap inFlightMap ) + { + this.inFlightMap = inFlightMap; + return this; + } + public RaftMachineBuilder clock( Clock clock ) { this.clock = clock; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java index 7522d56fc37e6..f97fb3ebe5e9f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java @@ -27,6 +27,7 @@ import org.neo4j.causalclustering.core.consensus.log.RaftLog; import org.neo4j.causalclustering.core.consensus.log.RaftLogCursor; import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; +import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap; import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet; import org.neo4j.causalclustering.core.consensus.membership.MembershipEntry; import org.neo4j.causalclustering.core.consensus.schedule.ControlledRenewableTimeoutService; @@ -44,6 +45,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.causalclustering.core.consensus.RaftMachine.Timeouts.ELECTION; @@ -67,6 +69,7 @@ public class RaftMachineTest private MemberId member4 = member( 4 ); private ReplicatedInteger data1 = ReplicatedInteger.valueOf( 1 ); + private ReplicatedInteger data2 = ReplicatedInteger.valueOf( 2 ); private RaftLog raftLog = new InMemoryRaftLog(); @@ -96,7 +99,7 @@ public void shouldRequestVotesOnElectionTimeout() throws Exception .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); // When timeouts.invokeTimeout( ELECTION ); @@ -121,7 +124,7 @@ public void shouldBecomeLeaderInMajorityOf3() throws Exception .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( ELECTION ); assertThat( raft.isLeader(), is( false ) ); @@ -144,7 +147,7 @@ public void shouldBecomeLeaderInMajorityOf5() throws Exception raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2, member3, member4 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( ELECTION ); @@ -169,7 +172,7 @@ public void shouldNotBecomeLeaderOnMultipleVotesFromSameMember() throws Exceptio raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2, member3, member4 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( ELECTION ); @@ -191,7 +194,7 @@ public void shouldNotBecomeLeaderWhenVotingOnItself() throws Exception .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( ELECTION ); @@ -212,7 +215,7 @@ public void shouldNotBecomeLeaderWhenMembersVoteNo() throws Exception .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( ELECTION ); @@ -234,7 +237,7 @@ public void shouldNotBecomeLeaderByVotesFromOldTerm() throws Exception .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( ELECTION ); // When @@ -260,7 +263,7 @@ public void shouldVoteFalseForCandidateInOldTerm() throws Exception .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); // When raft.handle( voteRequest().from( member1 ).term( -1 ).candidate( member1 ) @@ -281,7 +284,7 @@ public void shouldNotBecomeLeaderByVotesFromFutureTerm() throws Exception .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( ELECTION ); @@ -310,7 +313,7 @@ public void shouldAppendNewLeaderBarrierAfterBecomingLeader() throws Exception .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); // When timeouts.invokeTimeout( ELECTION ); @@ -335,7 +338,7 @@ public void leaderShouldSendHeartBeatsOnHeartbeatTimeout() throws Exception .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( ELECTION ); raft.handle( voteResponse().from( member1 ).term( 1 ).grant().build() ); @@ -359,7 +362,7 @@ public void shouldThrowExceptionIfReceivesClientRequestWithNoLeaderElected() thr .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); try { @@ -424,7 +427,7 @@ public void handle( RaftMessages.RaftMessage message ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); // We make ourselves the leader timeouts.invokeTimeout( ELECTION ); @@ -473,6 +476,40 @@ public void shouldMonitorLeaderNotFound() throws Exception } } + @Test + public void shouldNotCacheInFlightEntriesUntilAfterRecovery() throws Exception + { + // given + FakeClock fakeClock = Clocks.fakeClock(); + InFlightMap inFlightMap = new InFlightMap<>(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); + RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) + .timeoutService( timeouts ) + .clock( fakeClock ) + .raftLog( raftLog ) + .inFlightMap( inFlightMap ) + .build(); + + raftLog.append( new RaftLogEntry(0, new MemberIdSet(asSet( myself, member1, member2 ))) ); + + // when + raft.handle( appendEntriesRequest().from( member1 ).prevLogIndex( 0 ).prevLogTerm( 0 ).leaderTerm( 0 ) + .logEntry( new RaftLogEntry( 0, data1 ) ).build() ); + + // then + assertEquals( data1, readLogEntry( raftLog, 1 ).content() ); + assertNull( inFlightMap.get( 1L ) ); + + // when + raft.postRecoveryActions(); + raft.handle( appendEntriesRequest().from( member1 ).prevLogIndex( 1 ).prevLogTerm( 0 ).leaderTerm( 0 ) + .logEntry( new RaftLogEntry( 0, data2 ) ).build() ); + + // then + assertEquals( data2, readLogEntry( raftLog, 2 ).content() ); + assertEquals( data2, inFlightMap.get( 2L ).content() ); + } + private static class ExplodingRaftLog implements RaftLog { private boolean startExploding = false; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftTestFixture.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftTestFixture.java index c709c9a1b5aaf..23d3056ddbf30 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftTestFixture.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftTestFixture.java @@ -91,7 +91,7 @@ public void bootstrap( MemberId[] members ) throws RaftMachine.BootstrapExceptio { member.raftLog().append( new RaftLogEntry(0, new MemberIdSet(asSet( members ))) ); member.raftInstance().installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( members )) ) ); - member.raftInstance().startTimers(); + member.raftInstance().postRecoveryActions(); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/Fixture.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/Fixture.java index 96e000a926f91..513b2ecfed283 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/Fixture.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/Fixture.java @@ -106,7 +106,7 @@ void boot() throws BootstrapException, TimeoutException, InterruptedException, I { raft.raftLog().append( new RaftLogEntry(0, new MemberIdSet(asSet( members ))) ); raft.raftMachine().installCoreState( new RaftCoreState( new MembershipEntry( 0, members ) ) ); - raft.raftMachine.startTimers(); + raft.raftMachine.postRecoveryActions(); } net.start(); awaitBootstrapped(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java index 1e323031b8544..552af1e88366e 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java @@ -25,16 +25,30 @@ import java.util.Map; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; public class InFlightLogEntriesCacheTest { + @Test + public void shouldNotCacheUntilEnabled() throws Exception + { + InFlightMap cache = new InFlightMap<>(); + Object entry = new Object(); + + cache.put( 1L, entry ); + assertNull( cache.get( 1L ) ); + + cache.enable(); + cache.put( 1L, entry ); + assertEquals( entry, cache.get( 1L ) ); + } + @Test public void shouldRegisterAndUnregisterValues() throws Exception { InFlightMap entries = new InFlightMap<>(); + entries.enable(); Map logEntryList = new HashMap<>(); logEntryList.put(1L, new Object() ); @@ -65,6 +79,7 @@ public void shouldRegisterAndUnregisterValues() throws Exception public void shouldNotReinsertValues() throws Exception { InFlightMap entries = new InFlightMap<>(); + entries.enable(); Object addedObject = new Object(); entries.put( 1L, addedObject ); entries.put( 1L, addedObject ); @@ -74,6 +89,7 @@ public void shouldNotReinsertValues() throws Exception public void shouldNotReplaceRegisteredValues() throws Exception { InFlightMap cache = new InFlightMap<>(); + cache.enable(); Object first = new Object(); Object second = new Object(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/outcome/BatchAppendLogEntriesTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/outcome/BatchAppendLogEntriesTest.java index e6f518e4c989b..dd7fb42efc7b0 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/outcome/BatchAppendLogEntriesTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/outcome/BatchAppendLogEntriesTest.java @@ -90,7 +90,7 @@ public void applyTo() throws Exception BatchAppendLogEntries batchAppend = new BatchAppendLogEntries( baseIndex, offset, entries ); - InFlightMap cache = new InFlightMap<>(); + InFlightMap cache = new InFlightMap<>( true ); //when batchAppend.applyTo( cache, log ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/outcome/TruncateLogCommandTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/outcome/TruncateLogCommandTest.java index 0058c69fbe0bd..67eedeb8fe74f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/outcome/TruncateLogCommandTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/outcome/TruncateLogCommandTest.java @@ -44,7 +44,7 @@ public void applyTo() throws Exception Log log = logProvider.getLog( getClass() ); long fromIndex = 2L; TruncateLogCommand truncateLogCommand = new TruncateLogCommand( fromIndex ); - InFlightMap inFlightMap = new InFlightMap<>(); + InFlightMap inFlightMap = new InFlightMap<>( true ); inFlightMap.put( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) ); inFlightMap.put( 1L, new RaftLogEntry( 1L, valueOf( 1 ) ) ); @@ -71,7 +71,7 @@ public void shouldTruncateWithGaps() throws Exception long fromIndex = 1L; TruncateLogCommand truncateLogCommand = new TruncateLogCommand( fromIndex ); - InFlightMap inFlightMap = new InFlightMap<>(); + InFlightMap inFlightMap = new InFlightMap<>( true ); inFlightMap.put( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) ); inFlightMap.put( 2L, new RaftLogEntry( 1L, valueOf( 1 ) ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/ElectionTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/ElectionTest.java index 38d558c2a2cd3..43597e7e845a2 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/ElectionTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/ElectionTest.java @@ -79,7 +79,7 @@ public void candidateShouldWinElectionAndBecomeLeader() throws Exception .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( RaftMachine.Timeouts.ELECTION ); @@ -112,7 +112,7 @@ public void candidateShouldLoseElectionAndRemainCandidate() throws Exception raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) )); - raft.startTimers(); + raft.postRecoveryActions(); timeouts.invokeTimeout( RaftMachine.Timeouts.ELECTION ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateTest.java index e854302658b43..28b848f7c26b4 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateTest.java @@ -62,7 +62,7 @@ public void shouldUpdateCacheState() throws Exception //Test that updates applied to the raft state will be refelcted in the entry cache. //given - InFlightMap cache = new InFlightMap<>(); + InFlightMap cache = new InFlightMap<>( true ); RaftState raftState = new RaftState( member( 0 ), new InMemoryStateStorage<>( new TermState() ), new FakeMembership(), new InMemoryRaftLog(), new InMemoryStateStorage<>( new VoteState() ), cache, NullLogProvider.getInstance() ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/CommandApplicationProcessTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/CommandApplicationProcessTest.java index 0e4834b41e9f0..1ec3bfa8eb8d4 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/CommandApplicationProcessTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/CommandApplicationProcessTest.java @@ -81,7 +81,7 @@ public class CommandApplicationProcessTest private final int flushEvery = 10; private final int batchSize = 16; - private InFlightMap inFlightMap = spy( new InFlightMap<>() ); + private InFlightMap inFlightMap = spy( new InFlightMap<>( true ) ); private final Monitors monitors = new Monitors(); private CoreState coreState = mock( CoreState.class ); private final CommandApplicationProcess applicationProcess = new CommandApplicationProcess(