Skip to content

Commit

Permalink
Renew election timeout on IO threads instead of processing thread
Browse files Browse the repository at this point in the history
This is to avoid timeouts not being refreshed quickly, and elections
triggered, when the processing thread is slow, e.g. blocking for
snapshot download.

LeaderAvailabilityTimers has been extracted to encapsulate timers
for elections and heartbeats as well as timeouts and last refresh
times.
  • Loading branch information
andrewkerr9000 committed Nov 28, 2017
1 parent 71ed837 commit 9ef2eb1
Show file tree
Hide file tree
Showing 13 changed files with 482 additions and 111 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.causalclustering.core.consensus;

import java.io.File;
import java.time.Duration;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.EnterpriseCoreEditionModule;
Expand All @@ -35,6 +36,7 @@
import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipManager;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipState;
import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.shipping.RaftLogShippingManager;
import org.neo4j.causalclustering.core.consensus.term.MonitoredTermStateStorage;
import org.neo4j.causalclustering.core.consensus.term.TermState;
Expand Down Expand Up @@ -75,6 +77,8 @@ public class ConsensusModule
private final RaftMembershipManager raftMembershipManager;
private final InFlightCache inFlightCache;

private final LeaderAvailabilityTimers leaderAvailabilityTimers;

public ConsensusModule( MemberId myself, final PlatformModule platformModule,
Outbound<MemberId,RaftMessages.RaftMessage> outbound, File clusterStateDirectory,
CoreTopologyService coreTopologyService )
Expand Down Expand Up @@ -112,8 +116,9 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,
new RaftMembershipState.Marshal(),
config.get( CausalClusteringSettings.raft_membership_state_size ), logProvider ) );

long electionTimeout = config.get( CausalClusteringSettings.leader_election_timeout ).toMillis();
long heartbeatInterval = electionTimeout / 3;
raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider );

leaderAvailabilityTimers = createElectionTiming( config, raftTimeoutService, logProvider );

Integer expectedClusterSize = config.get( CausalClusteringSettings.expected_core_cluster_size );

Expand All @@ -122,7 +127,7 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,
SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound );

raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_timeout ).toMillis(),
expectedClusterSize, leaderAvailabilityTimers.getElectionTimeout(), systemClock(), config.get( join_catch_up_timeout ).toMillis(),
raftMembershipStorage );

life.add( raftMembershipManager );
Expand All @@ -131,23 +136,27 @@ expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_t

RaftLogShippingManager logShipping =
new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(), myself,
raftMembershipManager, electionTimeout, config.get( catchup_batch_size ),
raftMembershipManager, leaderAvailabilityTimers.getElectionTimeout(), config.get( catchup_batch_size ),
config.get( log_shipping_max_lag ), inFlightCache );

raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider );

boolean supportsPreVoting = config.get( CausalClusteringSettings.enable_pre_voting );

raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval,
raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightCache,
raftMachine = new RaftMachine( myself, termState, voteState, raftLog, leaderAvailabilityTimers,
outbound, logProvider, raftMembershipManager, logShipping, inFlightCache,
RefuseToBeLeaderStrategy.shouldRefuseToBeLeader( config, logProvider.getLog( getClass() ) ),
supportsPreVoting, platformModule.monitors, systemClock() );
supportsPreVoting, platformModule.monitors );

life.add( new RaftCoreTopologyConnector( coreTopologyService, raftMachine ) );

life.add( logShipping );
}

private LeaderAvailabilityTimers createElectionTiming( Config config, RenewableTimeoutService renewableTimeoutService, LogProvider logProvider )
{
Duration electionTimeout = config.get( CausalClusteringSettings.leader_election_timeout );
return new LeaderAvailabilityTimers( electionTimeout, electionTimeout.dividedBy( 3 ), systemClock(), renewableTimeoutService, logProvider );
}

private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem,
File clusterStateDirectory, CoreReplicatedContentMarshal marshal, LogProvider logProvider,
JobScheduler scheduler )
Expand Down Expand Up @@ -203,4 +212,9 @@ public InFlightCache inFlightCache()
{
return inFlightCache;
}

public LeaderAvailabilityTimers getLeaderAvailabilityTimers()
{
return leaderAvailabilityTimers;
}
}
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus;

import java.util.Objects;
import java.util.function.LongSupplier;

import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class LeaderAvailabilityHandler implements Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage>
{
private final Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> delegateHandler;
private final LeaderAvailabilityTimers leaderAvailabilityTimers;
private final LongSupplier term;
private final Log log;
private volatile ClusterId boundClusterId;

public LeaderAvailabilityHandler( Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> delegateHandler, LeaderAvailabilityTimers leaderAvailabilityTimers,
LongSupplier term, LogProvider logProvider )
{
this.delegateHandler = delegateHandler;
this.leaderAvailabilityTimers = leaderAvailabilityTimers;
this.term = term;
this.log = logProvider.getLog( getClass() );
}

public synchronized void start( ClusterId clusterId )
{
boundClusterId = clusterId;
}

public synchronized void stop()
{
boundClusterId = null;
}

@Override
public void handle( RaftMessages.ClusterIdAwareMessage message )
{
if ( Objects.isNull( boundClusterId ) )
{
log.debug( "This pre handler has been stopped, dropping the message: %s", message.message() );
}
else if ( !Objects.equals( message.clusterId(), boundClusterId ) )
{
log.info( "Discarding message[%s] owing to mismatched clusterId. Expected: %s, Encountered: %s",
message.message(), boundClusterId, message.clusterId() );
}
else
{
handleTimeouts( message );

delegateHandler.handle( message );
}
}

private void handleTimeouts( RaftMessages.ClusterIdAwareMessage message )
{
if ( shouldRenewElectionTimeout( message.message() ) )
{
leaderAvailabilityTimers.renewElection();
}
}

// TODO replace with visitor pattern
private boolean shouldRenewElectionTimeout( RaftMessages.RaftMessage message )
{
switch ( message.type() )
{
case HEARTBEAT:
RaftMessages.Heartbeat heartbeat = (RaftMessages.Heartbeat) message;
return heartbeat.leaderTerm() >= term.getAsLong();
case APPEND_ENTRIES_REQUEST:
RaftMessages.AppendEntries.Request request = (RaftMessages.AppendEntries.Request) message;
return request.leaderTerm() >= term.getAsLong();
default:
return false;
}
}
}
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus;

import java.time.Clock;
import java.time.Duration;

import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.function.ThrowingAction;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

class LeaderAvailabilityTimers
{
private final long electionTimeout;
private final long heartbeatInterval;
private final Clock clock;
private final RenewableTimeoutService renewableTimeoutService;
private final Log log;

private volatile long lastElectionRenewalMillis;

private RenewableTimeoutService.RenewableTimeout heartbeatTimer;
private RenewableTimeoutService.RenewableTimeout electionTimer;

LeaderAvailabilityTimers( Duration electionTimeout, Duration heartbeatInterval, Clock clock, RenewableTimeoutService renewableTimeoutService,
LogProvider logProvider )
{
this.electionTimeout = electionTimeout.toMillis();
this.heartbeatInterval = heartbeatInterval.toMillis();
this.clock = clock;
this.renewableTimeoutService = renewableTimeoutService;
this.log = logProvider.getLog( getClass() );

if ( this.electionTimeout < this.heartbeatInterval )
{
throw new IllegalArgumentException( String.format(
"Election timeout %s should not be shorter than heartbeat interval %s", this.electionTimeout, this.heartbeatInterval
) );
}
}

synchronized void start( ThrowingAction<Exception> electionAction, ThrowingAction<Exception> heartbeatAction )
{
this.electionTimer = renewableTimeoutService.create( RaftMachine.Timeouts.ELECTION, getElectionTimeout(), randomTimeoutRange(),
renewing( electionAction ) );
this.heartbeatTimer = renewableTimeoutService.create( RaftMachine.Timeouts.HEARTBEAT, getHeartbeatInterval(), 0,
renewing( heartbeatAction ) );
lastElectionRenewalMillis = clock.millis();
}

synchronized void stop()
{
if ( electionTimer != null )
{
electionTimer.cancel();
}
if ( heartbeatTimer != null )
{
heartbeatTimer.cancel();
}

}

synchronized void renewElection()
{
lastElectionRenewalMillis = clock.millis();
if ( electionTimer != null )
{
electionTimer.renew();
}
}

synchronized boolean isElectionTimedOut()
{
return clock.millis() - lastElectionRenewalMillis >= electionTimeout;
}

// Getters for immutable values
long getElectionTimeout()
{
return electionTimeout;
}

long getHeartbeatInterval()
{
return heartbeatInterval;
}

private long randomTimeoutRange()
{
return getElectionTimeout();
}

private RenewableTimeoutService.TimeoutHandler renewing( ThrowingAction<Exception> action )
{
return timeout ->
{
try
{
action.apply();
}
catch ( Exception e )
{
log.error( "Failed to process timeout.", e );
}
timeout.renew();
};
}
}

0 comments on commit 9ef2eb1

Please sign in to comment.