handle timers better in the raft machine

The election timer now takes the actual time of the last renewal
into account when deciding whether the timeout has in fact elapsed
since the last renewing event. This would otherwise cause undesired
behaviour when an election timeout event is scheduled behind events
in the message queue which renew the timer but cannot prevent the
already scheduled event from firing. The high-level symptom is
unnecessary leader re-elections. The timer service is somewhat
awkwardly designed, but this is a reasonable workaround.
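
The core of the change is an elapsed-time guard in RaftMachine, taken
from the diff below, which drops stale timeout events:

    private synchronized void electionTimeout() throws IOException
    {
        // Only trigger an election if a full election timeout has really
        // elapsed since the last renewal; otherwise ignore the stale event.
        if ( clock.millis() - lastElectionRenewalMillis >= electionTimeout )
        {
            triggerElection();
        }
    }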

The timers are now also not started until the recovery of core state
is complete, since the raft message queue is blocked during this time,
which is one of the situations in which the queueing described above
can happen on startup.

For the sake of the tests, the controlled timer service is injected
with a fake clock which always moves forward by the delay of the
invoked timer, so that invoking the election timeout in fact spawns
an event. The alternative would have been to control the fake clock
in every test. A sketch of such a clock is shown below.
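
A minimal sketch of what such a fake clock could look like (the class
name and the forward() hook are hypothetical and not part of this diff;
the controlled timeout service would advance it by the invoked timer's
delay):

    import java.time.Clock;
    import java.time.Instant;
    import java.time.ZoneId;

    class ForwardingFakeClock extends Clock
    {
        private volatile Instant now = Instant.ofEpochMilli( 0 );

        // Advance the clock, e.g. by the delay of whichever timer was invoked.
        void forward( long millis )
        {
            now = now.plusMillis( millis );
        }

        @Override
        public Instant instant()
        {
            return now;
        }

        @Override
        public ZoneId getZone()
        {
            return ZoneId.systemDefault();
        }

        @Override
        public Clock withZone( ZoneId zone )
        {
            return this;
        }
    }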

The timer code is also cleaned up a bit, and the timers are not
started at all in follower-only mode, since they are only useful
for candidates and leaders.
martinfurmanski committed Mar 16, 2017
1 parent bb5caed commit 6522323
Showing 10 changed files with 215 additions and 192 deletions.
@@ -135,7 +135,7 @@ expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_t

         raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval,
                 raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightMap,
-                config.get( CausalClusteringSettings.refuse_to_be_leader ), platformModule.monitors );
+                config.get( CausalClusteringSettings.refuse_to_be_leader ), platformModule.monitors, systemClock() );

         life.add( new RaftDiscoveryServiceConnector( coreTopologyService, raftMachine ) );
@@ -20,13 +20,13 @@
 package org.neo4j.causalclustering.core.consensus;

 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;

-import org.neo4j.causalclustering.core.CausalClusteringSettings;
 import org.neo4j.causalclustering.core.consensus.log.RaftLog;
 import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
 import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
@@ -35,6 +35,7 @@
 import org.neo4j.causalclustering.core.consensus.outcome.Outcome;
 import org.neo4j.causalclustering.core.consensus.roles.Role;
 import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
+import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutHandler;
 import org.neo4j.causalclustering.core.consensus.shipping.RaftLogShippingManager;
 import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState;
 import org.neo4j.causalclustering.core.consensus.state.RaftState;
@@ -45,6 +46,7 @@
 import org.neo4j.causalclustering.helper.VolatileFuture;
 import org.neo4j.causalclustering.identity.MemberId;
 import org.neo4j.causalclustering.messaging.Outbound;
+import org.neo4j.function.ThrowingAction;
 import org.neo4j.kernel.impl.util.Listener;
 import org.neo4j.kernel.monitoring.Monitors;
 import org.neo4j.logging.Log;
@@ -77,8 +79,10 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
     private RenewableTimeoutService.RenewableTimeout electionTimer;
     private RaftMembershipManager membershipManager;
     private final boolean refuseToBecomeLeader;
+    private final Clock clock;

     private final long electionTimeout;
+    private long lastElectionRenewalMillis;

     private final VolatileFuture<MemberId> volatileLeader = new VolatileFuture<>( null );
@@ -92,7 +96,7 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateS
             RaftLog entryLog, long electionTimeout, long heartbeatInterval,
             RenewableTimeoutService renewableTimeoutService, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
             LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping,
-            InFlightMap<RaftLogEntry> inFlightMap, boolean refuseToBecomeLeader, Monitors monitors )
+            InFlightMap<RaftLogEntry> inFlightMap, boolean refuseToBecomeLeader, Monitors monitors, Clock clock )
     {
         this.myself = myself;
         this.electionTimeout = electionTimeout;
@@ -106,42 +110,60 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateS

         this.membershipManager = membershipManager;
         this.refuseToBecomeLeader = refuseToBecomeLeader;
+        this.clock = clock;

         this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightMap,
                 logProvider );

         leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );
-
-        initTimers();
     }

-    private void initTimers()
+    public synchronized void startTimers()
     {
-        electionTimer =
-                renewableTimeoutService.create( Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout ->
-                {
-                    try
-                    {
-                        triggerElection();
-                    }
-                    catch ( IOException e )
-                    {
-                        log.error( "Failed to process election timeout.", e );
-                    }
-                    timeout.renew();
-                } );
-        heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0, timeout ->
-        {
-            try
-            {
-                handle( new RaftMessages.Timeout.Heartbeat( myself ) );
-            }
-            catch ( IOException e )
-            {
-                log.error( "Failed to process heartbeat timeout.", e );
-            }
-            timeout.renew();
-        } );
+        if ( !refuseToBecomeLeader )
+        {
+            lastElectionRenewalMillis = clock.millis();
+            electionTimer = renewableTimeoutService.create( Timeouts.ELECTION, electionTimeout, randomTimeoutRange(),
+                    renewing( this::electionTimeout ) );
+            heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0,
+                    renewing( () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ) );
+        }
+    }
+
+    public synchronized void stopTimers()
+    {
+        if ( electionTimer != null )
+        {
+            electionTimer.cancel();
+        }
+        if ( heartbeatTimer != null )
+        {
+            heartbeatTimer.cancel();
+        }
+    }
+
+    private TimeoutHandler renewing( ThrowingAction<Exception> action )
+    {
+        return timeout ->
+        {
+            try
+            {
+                action.apply();
+            }
+            catch ( Exception e )
+            {
+                log.error( "Failed to process timeout.", e );
+            }
+            timeout.renew();
+        };
+    }
+
+    private synchronized void electionTimeout() throws IOException
+    {
+        if ( clock.millis() - lastElectionRenewalMillis >= electionTimeout )
+        {
+            triggerElection();
+        }
     }

     public void triggerElection() throws IOException
@@ -150,19 +172,11 @@ public void triggerElection() throws IOException
         {
             handle( new RaftMessages.Timeout.Election( myself ) );
         }
-        else
-        {
-            log.info(
-                    format( "Election timeout occured, but {%s} is configured to not tirgger an election. " +
-                            "See setting: %s",
-                            myself, CausalClusteringSettings.refuse_to_be_leader.name() ) );
-        }
     }

     public void panic()
     {
-        heartbeatTimer.cancel();
-        electionTimer.cancel();
+        stopTimers();
     }

     public synchronized RaftCoreState coreState()
@@ -316,7 +330,11 @@ private void handleTimers( Outcome outcome )
     {
         if ( outcome.electionTimeoutRenewed() )
         {
-            electionTimer.renew();
+            lastElectionRenewalMillis = clock.millis();
+            if ( electionTimer != null )
+            {
+                electionTimer.renew();
+            }
         }
     }
@@ -186,6 +186,7 @@ public synchronized void start() throws Throwable
         localDatabase.start();
         coreStateMachines.installCommitProcess( localDatabase.getCommitProcess() );
         applicationProcess.start();
+        raftMachine.startTimers();
     }

     private boolean haveState()
@@ -198,6 +199,7 @@ private boolean haveState()
     @Override
     public synchronized void stop() throws Throwable
     {
+        raftMachine.stopTimers();
         applicationProcess.stop();
         localDatabase.stop();
         allowMessageHandling = false;
@@ -23,14 +23,13 @@
 import org.junit.Test;

 import java.util.concurrent.CompletableFuture;
-
 import java.util.concurrent.Future;

 import org.neo4j.causalclustering.catchup.CatchUpClient;
 import org.neo4j.causalclustering.catchup.CatchUpResponseCallback;
 import org.neo4j.causalclustering.catchup.CatchupResult;
-import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
 import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
+import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
 import org.neo4j.causalclustering.core.consensus.schedule.ControlledRenewableTimeoutService;
 import org.neo4j.causalclustering.identity.MemberId;
 import org.neo4j.causalclustering.identity.StoreId;
@@ -48,7 +47,6 @@
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -139,7 +137,7 @@ public void shouldRenewTxPullTimeoutOnSuccessfulTxPulling() throws Throwable
         timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );

         // then
-        verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).renew();
+        assertEquals( 1, timeoutService.getTimeout( TX_PULLER_TIMEOUT ).renewalCount() );
     }

     @Test
@@ -199,7 +197,7 @@ public void shouldNotRenewTheTimeoutIfInPanicState() throws Throwable

         // then
         assertEquals( PANIC, txPuller.state() );
-        verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), never() ).renew();
+        assertEquals( 0, timeoutService.getTimeout( TX_PULLER_TIMEOUT ).renewalCount() );
     }

     @Test
@@ -66,6 +66,7 @@ public class RaftMachineBuilder

     private LogProvider logProvider = NullLogProvider.getInstance();
     private Clock clock = Clocks.systemClock();
+    private Clock shippingClock = Clocks.systemClock();

     private long electionTimeout = 500;
     private long heartbeatInterval = 150;
@@ -95,11 +96,11 @@ public RaftMachine build()
                 raftMembership );
         membershipManager.setRecoverFromIndexSupplier( () -> 0 );
         RaftLogShippingManager logShipping =
-                new RaftLogShippingManager( outbound, logProvider, raftLog, clock, member, membershipManager,
+                new RaftLogShippingManager( outbound, logProvider, raftLog, shippingClock, member, membershipManager,
                         retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, inFlightMap );
         RaftMachine raft = new RaftMachine( member, termState, voteState, raftLog, electionTimeout,
                 heartbeatInterval, renewableTimeoutService, outbound, logProvider,
-                membershipManager, logShipping, inFlightMap, false, monitors );
+                membershipManager, logShipping, inFlightMap, false, monitors, clock );
         inbound.registerHandler( ( incomingMessage ) -> {
             try
             {
@@ -160,6 +161,12 @@ public RaftMachineBuilder raftLog( RaftLog raftLog )
         return this;
     }

+    public RaftMachineBuilder clock( Clock clock )
+    {
+        this.clock = clock;
+        return this;
+    }
+
     public RaftMachineBuilder commitListener( CommitListener commitListener )
     {
         this.commitListener = commitListener;
