From 65223235af51a099a7e8cd652eee9aacfd2b5137 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 16 Mar 2017 20:03:03 +0100 Subject: [PATCH] handle timers better in the raft machine The election timer now takes the actual time of the last renewal into account when deciding whether the timeout has in fact elapsed since the last renewing event. Without this check, undesired behaviour occurs when an election timeout event is queued behind events in the message queue which renew the timer but cannot prevent an already scheduled timeout event from firing. The high level symptom is unnecessary leader re-elections. The timer service is not ideally designed, but this is a reasonable workaround. The timers are now also not started until the recovery of core state is complete, since the raft message queue is blocked during this time, creating one of the possible situations in which the above described queueing can happen on startup. For the sake of the tests, the controlled timer service is injected with a fake clock which always moves forward by the amount of the invoked timer so that the election timeout in fact spawns an event as an effect of the invocation. The alternative would have been to control the fake clock in every test. The timer code is also cleaned up a bit, and the timers are not started at all when we are in follower-only mode, since they are only useful for candidates and leaders. 
--- .../core/consensus/ConsensusModule.java | 2 +- .../core/consensus/RaftMachine.java | 88 ++++---- .../core/state/CoreState.java | 2 + .../catchup/tx/CatchupPollingProcessTest.java | 8 +- .../core/consensus/RaftMachineBuilder.java | 11 +- .../core/consensus/RaftMachineTest.java | 190 ++++++------------ .../core/consensus/RaftTestFixture.java | 7 +- .../core/consensus/election/Fixture.java | 1 + .../core/consensus/roles/ElectionTest.java | 19 +- .../ControlledRenewableTimeoutService.java | 79 +++++++- 10 files changed, 215 insertions(+), 192 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java index bbd425f00cea1..f058e54239d18 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java @@ -135,7 +135,7 @@ expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_t raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval, raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightMap, - config.get( CausalClusteringSettings.refuse_to_be_leader ), platformModule.monitors ); + config.get( CausalClusteringSettings.refuse_to_be_leader ), platformModule.monitors, systemClock() ); life.add( new RaftDiscoveryServiceConnector( coreTopologyService, raftMachine ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java index e495bcc600960..b7ce502218ec0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java 
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java @@ -20,13 +20,13 @@ package org.neo4j.causalclustering.core.consensus; import java.io.IOException; +import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; -import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.log.RaftLog; import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap; @@ -35,6 +35,7 @@ import org.neo4j.causalclustering.core.consensus.outcome.Outcome; import org.neo4j.causalclustering.core.consensus.roles.Role; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; +import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutHandler; import org.neo4j.causalclustering.core.consensus.shipping.RaftLogShippingManager; import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState; import org.neo4j.causalclustering.core.consensus.state.RaftState; @@ -45,6 +46,7 @@ import org.neo4j.causalclustering.helper.VolatileFuture; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.Outbound; +import org.neo4j.function.ThrowingAction; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; @@ -77,8 +79,10 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private RenewableTimeoutService.RenewableTimeout electionTimer; private RaftMembershipManager membershipManager; private final boolean refuseToBecomeLeader; + private final Clock clock; private final long electionTimeout; + private long lastElectionRenewalMillis; private final VolatileFuture volatileLeader = new VolatileFuture<>( null ); @@ -92,7 +96,7 
@@ public RaftMachine( MemberId myself, StateStorage termStorage, StateS RaftLog entryLog, long electionTimeout, long heartbeatInterval, RenewableTimeoutService renewableTimeoutService, Outbound outbound, LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, - InFlightMap inFlightMap, boolean refuseToBecomeLeader, Monitors monitors ) + InFlightMap inFlightMap, boolean refuseToBecomeLeader, Monitors monitors, Clock clock ) { this.myself = myself; this.electionTimeout = electionTimeout; @@ -106,42 +110,60 @@ public RaftMachine( MemberId myself, StateStorage termStorage, StateS this.membershipManager = membershipManager; this.refuseToBecomeLeader = refuseToBecomeLeader; + this.clock = clock; this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightMap, logProvider ); leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class ); + } - initTimers(); - } - - private void initTimers() - { - electionTimer = - renewableTimeoutService.create( Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout -> - { - try - { - triggerElection(); - } - catch ( IOException e ) - { - log.error( "Failed to process election timeout.", e ); - } - timeout.renew(); - } ); - heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0, timeout -> + public synchronized void startTimers() + { + if ( !refuseToBecomeLeader ) + { + lastElectionRenewalMillis = clock.millis(); + electionTimer = renewableTimeoutService.create( Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), + renewing( this::electionTimeout ) ); + heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0, + renewing( () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ) ); + } + } + + public synchronized void stopTimers() + { + if ( electionTimer != null ) + { + electionTimer.cancel(); + } + if ( heartbeatTimer != null ) + { + 
heartbeatTimer.cancel(); + } + } + + private TimeoutHandler renewing( ThrowingAction action ) + { + return timeout -> { try { - handle( new RaftMessages.Timeout.Heartbeat( myself ) ); + action.apply(); } - catch ( IOException e ) + catch ( Exception e ) { - log.error( "Failed to process heartbeat timeout.", e ); + log.error( "Failed to process timeout.", e ); } timeout.renew(); - } ); + }; + } + + private synchronized void electionTimeout() throws IOException + { + if ( clock.millis() - lastElectionRenewalMillis >= electionTimeout ) + { + triggerElection(); + } } public void triggerElection() throws IOException @@ -150,19 +172,11 @@ public void triggerElection() throws IOException { handle( new RaftMessages.Timeout.Election( myself ) ); } - else - { - log.info( - format( "Election timeout occured, but {%s} is configured to not tirgger an election. " + - "See setting: %s", - myself, CausalClusteringSettings.refuse_to_be_leader.name() ) ); - } } public void panic() { - heartbeatTimer.cancel(); - electionTimer.cancel(); + stopTimers(); } public synchronized RaftCoreState coreState() @@ -316,7 +330,11 @@ private void handleTimers( Outcome outcome ) { if ( outcome.electionTimeoutRenewed() ) { - electionTimer.renew(); + lastElectionRenewalMillis = clock.millis(); + if ( electionTimer != null ) + { + electionTimer.renew(); + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreState.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreState.java index 9b1e0f881e8a8..9f803b1e98ade 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreState.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreState.java @@ -186,6 +186,7 @@ public synchronized void start() throws Throwable localDatabase.start(); coreStateMachines.installCommitProcess( localDatabase.getCommitProcess() ); applicationProcess.start(); + 
raftMachine.startTimers(); } private boolean haveState() @@ -198,6 +199,7 @@ private boolean haveState() @Override public synchronized void stop() throws Throwable { + raftMachine.stopTimers(); applicationProcess.stop(); localDatabase.stop(); allowMessageHandling = false; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java index fb26ec4116343..6b6f85c907817 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java @@ -23,14 +23,13 @@ import org.junit.Test; import java.util.concurrent.CompletableFuture; - import java.util.concurrent.Future; import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpResponseCallback; import org.neo4j.causalclustering.catchup.CatchupResult; -import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess; import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; +import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess; import org.neo4j.causalclustering.core.consensus.schedule.ControlledRenewableTimeoutService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; @@ -48,7 +47,6 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -139,7 +137,7 @@ public void shouldRenewTxPullTimeoutOnSuccessfulTxPulling() throws Throwable timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then - verify( 
timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).renew(); + assertEquals( 1, timeoutService.getTimeout( TX_PULLER_TIMEOUT ).renewalCount() ); } @Test @@ -199,7 +197,7 @@ public void shouldNotRenewTheTimeoutIfInPanicState() throws Throwable // then assertEquals( PANIC, txPuller.state() ); - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), never() ).renew(); + assertEquals( 0, timeoutService.getTimeout( TX_PULLER_TIMEOUT ).renewalCount() ); } @Test diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java index 2b3ed910ea46c..fa9fff2dac835 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java @@ -66,6 +66,7 @@ public class RaftMachineBuilder private LogProvider logProvider = NullLogProvider.getInstance(); private Clock clock = Clocks.systemClock(); + private Clock shippingClock = Clocks.systemClock(); private long electionTimeout = 500; private long heartbeatInterval = 150; @@ -95,11 +96,11 @@ public RaftMachine build() raftMembership ); membershipManager.setRecoverFromIndexSupplier( () -> 0 ); RaftLogShippingManager logShipping = - new RaftLogShippingManager( outbound, logProvider, raftLog, clock, member, membershipManager, + new RaftLogShippingManager( outbound, logProvider, raftLog, shippingClock, member, membershipManager, retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, inFlightMap ); RaftMachine raft = new RaftMachine( member, termState, voteState, raftLog, electionTimeout, heartbeatInterval, renewableTimeoutService, outbound, logProvider, - membershipManager, logShipping, inFlightMap, false, monitors ); + membershipManager, logShipping, inFlightMap, false, monitors, clock ); 
inbound.registerHandler( ( incomingMessage ) -> { try { @@ -160,6 +161,12 @@ public RaftMachineBuilder raftLog( RaftLog raftLog ) return this; } + public RaftMachineBuilder clock( Clock clock ) + { + this.clock = clock; + return this; + } + public RaftMachineBuilder commitListener( CommitListener commitListener ) { this.commitListener = commitListener; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java index 02e50a3830653..a69cd5c2d1397 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java @@ -19,14 +19,10 @@ */ package org.neo4j.causalclustering.core.consensus; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - import org.junit.Test; import org.neo4j.causalclustering.core.consensus.log.InMemoryRaftLog; import org.neo4j.causalclustering.core.consensus.log.RaftLog; -import org.neo4j.causalclustering.core.consensus.log.RaftLogCursor; import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet; import org.neo4j.causalclustering.core.consensus.membership.MembershipEntry; @@ -35,14 +31,10 @@ import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.RaftTestMemberSetBuilder; import org.neo4j.causalclustering.messaging.Inbound; -import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; -import org.neo4j.kernel.impl.util.Listener; -import org.neo4j.kernel.internal.DatabaseHealth; -import org.neo4j.kernel.internal.KernelEventHandlers; import org.neo4j.kernel.monitoring.Monitors; -import org.neo4j.logging.NullLog; +import org.neo4j.time.Clocks; +import org.neo4j.time.FakeClock; 
-import static junit.framework.TestCase.assertFalse; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.instanceOf; @@ -51,7 +43,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import static org.neo4j.causalclustering.core.consensus.RaftMachine.Timeouts.ELECTION; import static org.neo4j.causalclustering.core.consensus.TestMessageBuilders.appendEntriesRequest; import static org.neo4j.causalclustering.core.consensus.TestMessageBuilders.voteRequest; @@ -91,15 +82,18 @@ public void shouldAlwaysStartAsFollower() throws Exception public void shouldRequestVotesOnElectionTimeout() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); OutboundMessageCollector messages = new OutboundMessageCollector(); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .timeoutService( timeouts ) + .clock( fakeClock ) .outbound( messages ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); // When timeouts.invokeTimeout( ELECTION ); @@ -118,11 +112,13 @@ public void shouldRequestVotesOnElectionTimeout() throws Exception public void shouldBecomeLeaderInMajorityOf3() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).build(); + .timeoutService( timeouts ).clock( fakeClock 
).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); timeouts.invokeTimeout( ELECTION ); assertThat( raft.isLeader(), is( false ) ); @@ -138,12 +134,14 @@ public void shouldBecomeLeaderInMajorityOf3() throws Exception public void shouldBecomeLeaderInMajorityOf5() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).build(); + .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2, member3, member4 ) ) ) ); + raft.startTimers(); timeouts.invokeTimeout( ELECTION ); @@ -161,12 +159,14 @@ public void shouldBecomeLeaderInMajorityOf5() throws Exception public void shouldNotBecomeLeaderOnMultipleVotesFromSameMember() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).build(); + .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2, member3, member4 ) ) ) ); + raft.startTimers(); timeouts.invokeTimeout( ELECTION ); @@ -182,11 +182,13 @@ public void shouldNotBecomeLeaderOnMultipleVotesFromSameMember() throws Exceptio public void shouldNotBecomeLeaderWhenVotingOnItself() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new 
ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).build(); + .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); timeouts.invokeTimeout( ELECTION ); @@ -201,11 +203,13 @@ public void shouldNotBecomeLeaderWhenVotingOnItself() throws Exception public void shouldNotBecomeLeaderWhenMembersVoteNo() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).build(); + .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); timeouts.invokeTimeout( ELECTION ); @@ -221,11 +225,13 @@ public void shouldNotBecomeLeaderWhenMembersVoteNo() throws Exception public void shouldNotBecomeLeaderByVotesFromOldTerm() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).build(); + .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + 
raft.startTimers(); timeouts.invokeTimeout( ELECTION ); // When @@ -240,19 +246,22 @@ public void shouldNotBecomeLeaderByVotesFromOldTerm() throws Exception public void shouldVoteFalseForCandidateInOldTerm() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); OutboundMessageCollector messages = new OutboundMessageCollector(); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .timeoutService( timeouts ) + .clock( fakeClock ) .outbound( messages ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); // When - raft.handle( voteRequest().from( member1 ).term( -1 ).candidate( member1 ).lastLogIndex( 0 ).lastLogTerm( -1 - ).build() ); + raft.handle( voteRequest().from( member1 ).term( -1 ).candidate( member1 ) + .lastLogIndex( 0 ).lastLogTerm( -1 ).build() ); // Then assertThat( messages.sentTo( member1 ).size(), equalTo( 1 ) ); @@ -263,11 +272,13 @@ public void shouldVoteFalseForCandidateInOldTerm() throws Exception public void shouldNotBecomeLeaderByVotesFromFutureTerm() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).build(); + .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); timeouts.invokeTimeout( ELECTION ); @@ -283,17 +294,20 @@ public void 
shouldNotBecomeLeaderByVotesFromFutureTerm() throws Exception public void shouldAppendNewLeaderBarrierAfterBecomingLeader() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); OutboundMessageCollector messages = new OutboundMessageCollector(); InMemoryRaftLog raftLog = new InMemoryRaftLog(); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .timeoutService( timeouts ) + .clock( fakeClock ) .outbound( messages ) .raftLog( raftLog ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); // When timeouts.invokeTimeout( ELECTION ); @@ -307,15 +321,18 @@ public void shouldAppendNewLeaderBarrierAfterBecomingLeader() throws Exception public void leaderShouldSendHeartBeatsOnHeartbeatTimeout() throws Exception { // Given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); OutboundMessageCollector messages = new OutboundMessageCollector(); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .timeoutService( timeouts ) .outbound( messages ) + .clock( fakeClock ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); timeouts.invokeTimeout( ELECTION ); raft.handle( voteResponse().from( member1 ).term( 1 ).grant().build() ); @@ -332,12 +349,14 @@ public void leaderShouldSendHeartBeatsOnHeartbeatTimeout() throws Exception public void shouldThrowExceptionIfReceivesClientRequestWithNoLeaderElected() throws Exception { // Given - 
ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).build(); + .timeoutService( timeouts ).clock( fakeClock ).build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); try { @@ -357,9 +376,11 @@ public void shouldThrowExceptionIfReceivesClientRequestWithNoLeaderElected() thr public void shouldPersistAtSpecifiedLogIndex() throws Exception { // given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .timeoutService( timeouts ) + .clock( fakeClock ) .raftLog( raftLog ) .build(); @@ -391,14 +412,16 @@ public void handle( RaftMessages.RaftMessage message ) } } ); - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); - + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .timeoutService( timeouts ) .outbound( messages ) + .clock( fakeClock ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); // We make ourselves the leader timeouts.invokeTimeout( ELECTION ); @@ -447,101 +470,6 @@ public void shouldMonitorLeaderNotFound() throws Exception } } - private static class ExplodingRaftLog implements RaftLog - { - private boolean startExploding = 
false; - - @Override - public long append( RaftLogEntry... entries ) throws IOException - { - if ( startExploding ) - { - throw new IOException( "Boom! append" ); - } - else - { - return 0; - } - } - - @Override - public void truncate( long fromIndex ) throws IOException - { - throw new IOException( "Boom! truncate" ); - } - - @Override - public long prune( long safeIndex ) - { - return -1; - } - - @Override - public long appendIndex() - { - return -1; - } - - @Override - public long prevIndex() - { - return -1; - } - - @Override - public long readEntryTerm( long logIndex ) throws IOException - { - return -1; - } - - @Override - public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException - { - if ( startExploding ) - { - throw new IOException( "Boom! entry cursor" ); - } - else - { - return RaftLogCursor.empty(); - } - } - - @Override - public long skip( long index, long term ) - { - return -1; - } - - public void startExploding() - { - startExploding = true; - } - } - - private static class TestDatabaseHealth extends DatabaseHealth - { - - private boolean hasPanicked = false; - - public TestDatabaseHealth() - { - super( new DatabasePanicEventGenerator( new KernelEventHandlers( NullLog.getInstance() ) ), - NullLog.getInstance() ); - } - - @Override - public void panic( Throwable cause ) - { - this.hasPanicked = true; - } - - public boolean hasPanicked() - { - return hasPanicked; - } - } - private class StubLeaderNotFoundMonitor implements LeaderNotFoundMonitor { long count = 0; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftTestFixture.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftTestFixture.java index 9a07d42ee2c7a..c709c9a1b5aaf 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftTestFixture.java +++ 
b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftTestFixture.java @@ -40,6 +40,8 @@ import org.neo4j.causalclustering.messaging.Inbound; import org.neo4j.causalclustering.messaging.LoggingOutbound; import org.neo4j.causalclustering.messaging.Outbound; +import org.neo4j.time.Clocks; +import org.neo4j.time.FakeClock; import static java.lang.String.format; @@ -55,7 +57,8 @@ public RaftTestFixture( DirectNetworking net, int expectedClusterSize, MemberId. { MemberFixture fixtureMember = new MemberFixture(); - fixtureMember.timeoutService = new ControlledRenewableTimeoutService(); + FakeClock clock = Clocks.fakeClock(); + fixtureMember.timeoutService = new ControlledRenewableTimeoutService( clock ); fixtureMember.raftLog = new InMemoryRaftLog(); fixtureMember.member = id; @@ -69,6 +72,7 @@ public RaftTestFixture( DirectNetworking net, int expectedClusterSize, MemberId. .inbound( inbound ) .outbound( outbound ) .raftLog( fixtureMember.raftLog ) + .clock( clock ) .timeoutService( fixtureMember.timeoutService ) .build(); @@ -87,6 +91,7 @@ public void bootstrap( MemberId[] members ) throws RaftMachine.BootstrapExceptio { member.raftLog().append( new RaftLogEntry(0, new MemberIdSet(asSet( members ))) ); member.raftInstance().installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( members )) ) ); + member.raftInstance().startTimers(); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/Fixture.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/Fixture.java index ec7b646ebfe90..0b21cb411a9d6 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/Fixture.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/Fixture.java @@ -107,6 +107,7 @@ void boot() throws BootstrapException, TimeoutException, InterruptedException, 
I { raft.raftLog().append( new RaftLogEntry(0, new MemberIdSet(asSet( members ))) ); raft.raftMachine().installCoreState( new RaftCoreState( new MembershipEntry( 0, members ) ) ); + raft.raftMachine.startTimers(); } net.start(); awaitBootstrapped(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/ElectionTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/ElectionTest.java index c26cef3160dcf..38d558c2a2cd3 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/ElectionTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/ElectionTest.java @@ -34,6 +34,8 @@ import org.neo4j.causalclustering.identity.RaftTestMemberSetBuilder; import org.neo4j.causalclustering.messaging.Inbound; import org.neo4j.causalclustering.messaging.Outbound; +import org.neo4j.time.Clocks; +import org.neo4j.time.FakeClock; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.eq; @@ -68,14 +70,16 @@ public class ElectionTest public void candidateShouldWinElectionAndBecomeLeader() throws Exception { // given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); - + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .outbound( outbound ) .timeoutService( timeouts ) + .clock( fakeClock ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); + raft.startTimers(); timeouts.invokeTimeout( RaftMachine.Timeouts.ELECTION ); @@ -98,15 +102,17 @@ public void candidateShouldLoseElectionAndRemainCandidate() throws Exception // remain as a candidate // given - ControlledRenewableTimeoutService timeouts = 
new ControlledRenewableTimeoutService(); - + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .outbound( outbound ) .timeoutService( timeouts ) + .clock( fakeClock ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) )); + raft.startTimers(); timeouts.invokeTimeout( RaftMachine.Timeouts.ELECTION ); @@ -126,11 +132,12 @@ public void candidateShouldLoseElectionAndRemainCandidate() throws Exception public void candidateShouldVoteForTheSameCandidateInTheSameTerm() throws Exception { // given - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); - + FakeClock fakeClock = Clocks.fakeClock(); + ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService( fakeClock ); RaftMachine raft = new RaftMachineBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .outbound( outbound ) .timeoutService( timeouts ) + .clock( fakeClock ) .build(); raft.installCoreState( new RaftCoreState( new MembershipEntry( 0, asSet( myself, member1, member2 ) ) ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/ControlledRenewableTimeoutService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/ControlledRenewableTimeoutService.java index 4b2ba4c27b5ba..459ad8590bb75 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/ControlledRenewableTimeoutService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/ControlledRenewableTimeoutService.java @@ -22,30 +22,87 @@ import java.util.HashMap; import java.util.Map; -import org.neo4j.helpers.collection.Pair; +import org.neo4j.time.Clocks; +import 
org.neo4j.time.FakeClock; -import static org.mockito.Mockito.mock; +import static java.util.concurrent.TimeUnit.MILLISECONDS; public class ControlledRenewableTimeoutService implements RenewableTimeoutService { - private Map<TimeoutName,Pair<TimeoutHandler,RenewableTimeout>> handlers = new HashMap<>(); + public static class Timer implements RenewableTimeout + { + TimeoutHandler callback; + long delayMillis; + boolean enabled; + long renewalCount; + + Timer( TimeoutHandler callback, long delayMillis ) + { + this.callback = callback; + this.delayMillis = delayMillis; + enabled = true; + } + + @Override + public void renew() + { + enabled = true; + renewalCount++; + } + + public long renewalCount() + { + return renewalCount; + } + + @Override + public void cancel() + { + enabled = false; + } + } + + private Map<TimeoutName,Timer> timers = new HashMap<>(); + private final FakeClock clock; + + public ControlledRenewableTimeoutService() + { + this( Clocks.fakeClock() ); + } + + public ControlledRenewableTimeoutService( FakeClock clock ) + { + this.clock = clock; + } @Override - public RenewableTimeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler ) + public RenewableTimeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler callback ) { - RenewableTimeout timeout = mock( RenewableTimeout.class ); - handlers.put( name, Pair.of( handler, timeout ) ); - return timeout; + Timer timer = new Timer( callback, delayInMillis ); + timers.put( name, timer ); + return timer; } public void invokeTimeout( TimeoutName name ) { - Pair<TimeoutHandler,RenewableTimeout> pair = handlers.get( name ); - pair.first().onTimeout( pair.other() ); + Timer timer = timers.get( name ); + if ( timer == null ) + { + /* not registered */ + return; + } + /* invoking a certain timer moves time forward the same amount */ + clock.forward( timer.delayMillis, MILLISECONDS ); + if ( !timer.enabled ) + { + throw new IllegalStateException( "Invoked timer which is not enabled" ); + } + timer.cancel(); + timer.callback.onTimeout( 
timer ); } - public RenewableTimeout getTimeout( TimeoutName name ) + public Timer getTimeout( TimeoutName name ) { - return handlers.get( name ).other(); + return timers.get( name ); } }