From 9ef2eb132e9e6f6ca7f5978d976f2362f5463da1 Mon Sep 17 00:00:00 2001 From: Andrew Kerr Date: Wed, 15 Nov 2017 10:23:20 +0000 Subject: [PATCH] Renew election timeout on IO threads instead of processing thread This is to avoid timeouts not being refreshed quickly, and elections triggered, when the processing thread is slow, e.g. blocking for snapshot download. LeaderAvailabilityTimers has been extracted to encapsulate timers for elections and heartbeats as well as timeouts and last refresh times. --- .../core/consensus/ConsensusModule.java | 32 +++- .../consensus/LeaderAvailabilityHandler.java | 100 ++++++++++ .../consensus/LeaderAvailabilityTimers.java | 127 +++++++++++++ .../core/consensus/RaftMachine.java | 68 +------ .../core/consensus/roles/Appending.java | 1 - .../core/consensus/roles/Heart.java | 1 - .../core/consensus/roles/Voting.java | 2 +- .../core/consensus/term/TermState.java | 2 +- .../core/server/CoreServerModule.java | 28 +-- .../causalclustering/core/state/CoreLife.java | 11 +- .../LeaderAvailabilityHandlerTest.java | 177 ++++++++++++++++++ .../core/consensus/RaftMachineBuilder.java | 26 ++- .../core/consensus/roles/FollowerTest.java | 18 -- 13 files changed, 482 insertions(+), 111 deletions(-) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityHandler.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityTimers.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityHandlerTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java index 116340b63b592..90cd20c647e7b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java @@ -20,6 +20,7 @@ package org.neo4j.causalclustering.core.consensus; import java.io.File; +import java.time.Duration; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.EnterpriseCoreEditionModule; @@ -35,6 +36,7 @@ import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipManager; import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipState; import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService; +import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; import org.neo4j.causalclustering.core.consensus.shipping.RaftLogShippingManager; import org.neo4j.causalclustering.core.consensus.term.MonitoredTermStateStorage; import org.neo4j.causalclustering.core.consensus.term.TermState; @@ -75,6 +77,8 @@ public class ConsensusModule private final RaftMembershipManager raftMembershipManager; private final InFlightCache inFlightCache; + private final LeaderAvailabilityTimers leaderAvailabilityTimers; + public ConsensusModule( MemberId myself, final PlatformModule platformModule, Outbound outbound, File clusterStateDirectory, CoreTopologyService coreTopologyService ) @@ -112,8 +116,9 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, new RaftMembershipState.Marshal(), config.get( CausalClusteringSettings.raft_membership_state_size ), logProvider ) ); - long electionTimeout = config.get( CausalClusteringSettings.leader_election_timeout ).toMillis(); - long heartbeatInterval = electionTimeout / 3; + raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider ); + + leaderAvailabilityTimers = createElectionTiming( config, raftTimeoutService, logProvider ); Integer expectedClusterSize = config.get( CausalClusteringSettings.expected_core_cluster_size ); @@ -122,7 +127,7 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound ); raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, - expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_timeout ).toMillis(), + expectedClusterSize, leaderAvailabilityTimers.getElectionTimeout(), systemClock(), config.get( join_catch_up_timeout ).toMillis(), raftMembershipStorage ); life.add( raftMembershipManager ); @@ -131,23 +136,27 @@ expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_t RaftLogShippingManager logShipping = new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(), myself, - raftMembershipManager, electionTimeout, config.get( catchup_batch_size ), + raftMembershipManager, leaderAvailabilityTimers.getElectionTimeout(), config.get( catchup_batch_size ), config.get( log_shipping_max_lag ), inFlightCache ); - raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider ); - boolean supportsPreVoting = config.get( CausalClusteringSettings.enable_pre_voting ); - raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval, - raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightCache, + raftMachine = new RaftMachine( myself, termState, voteState, raftLog, leaderAvailabilityTimers, + outbound, logProvider, raftMembershipManager, logShipping, inFlightCache, RefuseToBeLeaderStrategy.shouldRefuseToBeLeader( config, logProvider.getLog( getClass() ) ), - supportsPreVoting, platformModule.monitors, systemClock() ); + supportsPreVoting, platformModule.monitors ); life.add( new RaftCoreTopologyConnector( coreTopologyService, raftMachine ) ); life.add( logShipping ); } + private LeaderAvailabilityTimers createElectionTiming( Config config, RenewableTimeoutService renewableTimeoutService, LogProvider logProvider ) + { + Duration electionTimeout = config.get( CausalClusteringSettings.leader_election_timeout ); + return new LeaderAvailabilityTimers( electionTimeout, electionTimeout.dividedBy( 3 ), systemClock(), renewableTimeoutService, logProvider ); + } + private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem, File clusterStateDirectory, CoreReplicatedContentMarshal marshal, LogProvider logProvider, JobScheduler scheduler ) @@ -203,4 +212,9 @@ public InFlightCache inFlightCache() { return inFlightCache; } + + public LeaderAvailabilityTimers getLeaderAvailabilityTimers() + { + return leaderAvailabilityTimers; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityHandler.java new file mode 100644 index 0000000000000..c613319f7f960 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityHandler.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core.consensus; + +import java.util.Objects; +import java.util.function.LongSupplier; + +import org.neo4j.causalclustering.identity.ClusterId; +import org.neo4j.causalclustering.messaging.Inbound; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +public class LeaderAvailabilityHandler implements Inbound.MessageHandler +{ + private final Inbound.MessageHandler delegateHandler; + private final LeaderAvailabilityTimers leaderAvailabilityTimers; + private final LongSupplier term; + private final Log log; + private volatile ClusterId boundClusterId; + + public LeaderAvailabilityHandler( Inbound.MessageHandler delegateHandler, LeaderAvailabilityTimers leaderAvailabilityTimers, + LongSupplier term, LogProvider logProvider ) + { + this.delegateHandler = delegateHandler; + this.leaderAvailabilityTimers = leaderAvailabilityTimers; + this.term = term; + this.log = logProvider.getLog( getClass() ); + } + + public synchronized void start( ClusterId clusterId ) + { + boundClusterId = clusterId; + } + + public synchronized void stop() + { + boundClusterId = null; + } + + @Override + public void handle( RaftMessages.ClusterIdAwareMessage message ) + { + if ( Objects.isNull( boundClusterId ) ) + { + log.debug( "This pre handler has been stopped, dropping the message: %s", message.message() ); + } + else if ( !Objects.equals( message.clusterId(), boundClusterId ) ) + { + log.info( "Discarding message[%s] owing to mismatched clusterId. Expected: %s, Encountered: %s", + message.message(), boundClusterId, message.clusterId() ); + } + else + { + handleTimeouts( message ); + + delegateHandler.handle( message ); + } + } + + private void handleTimeouts( RaftMessages.ClusterIdAwareMessage message ) + { + if ( shouldRenewElectionTimeout( message.message() ) ) + { + leaderAvailabilityTimers.renewElection(); + } + } + + // TODO replace with visitor pattern + private boolean shouldRenewElectionTimeout( RaftMessages.RaftMessage message ) + { + switch ( message.type() ) + { + case HEARTBEAT: + RaftMessages.Heartbeat heartbeat = (RaftMessages.Heartbeat) message; + return heartbeat.leaderTerm() >= term.getAsLong(); + case APPEND_ENTRIES_REQUEST: + RaftMessages.AppendEntries.Request request = (RaftMessages.AppendEntries.Request) message; + return request.leaderTerm() >= term.getAsLong(); + default: + return false; + } + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityTimers.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityTimers.java new file mode 100644 index 0000000000000..64f3ca1e27327 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityTimers.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core.consensus; + +import java.time.Clock; +import java.time.Duration; + +import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; +import org.neo4j.function.ThrowingAction; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +class LeaderAvailabilityTimers +{ + private final long electionTimeout; + private final long heartbeatInterval; + private final Clock clock; + private final RenewableTimeoutService renewableTimeoutService; + private final Log log; + + private volatile long lastElectionRenewalMillis; + + private RenewableTimeoutService.RenewableTimeout heartbeatTimer; + private RenewableTimeoutService.RenewableTimeout electionTimer; + + LeaderAvailabilityTimers( Duration electionTimeout, Duration heartbeatInterval, Clock clock, RenewableTimeoutService renewableTimeoutService, + LogProvider logProvider ) + { + this.electionTimeout = electionTimeout.toMillis(); + this.heartbeatInterval = heartbeatInterval.toMillis(); + this.clock = clock; + this.renewableTimeoutService = renewableTimeoutService; + this.log = logProvider.getLog( getClass() ); + + if ( this.electionTimeout < this.heartbeatInterval ) + { + throw new IllegalArgumentException( String.format( + "Election timeout %s should not be shorter than heartbeat interval %s", this.electionTimeout, this.heartbeatInterval + ) ); + } + } + + synchronized void start( ThrowingAction electionAction, ThrowingAction heartbeatAction ) + { + this.electionTimer = renewableTimeoutService.create( RaftMachine.Timeouts.ELECTION, getElectionTimeout(), randomTimeoutRange(), + renewing( electionAction ) ); + this.heartbeatTimer = renewableTimeoutService.create( RaftMachine.Timeouts.HEARTBEAT, getHeartbeatInterval(), 0, + renewing( heartbeatAction ) ); + lastElectionRenewalMillis = clock.millis(); + } + + synchronized void stop() + { + if ( electionTimer != null ) + { + electionTimer.cancel(); + } + if ( heartbeatTimer != null ) + { + heartbeatTimer.cancel(); + } + + } + + synchronized void renewElection() + { + lastElectionRenewalMillis = clock.millis(); + if ( electionTimer != null ) + { + electionTimer.renew(); + } + } + + synchronized boolean isElectionTimedOut() + { + return clock.millis() - lastElectionRenewalMillis >= electionTimeout; + } + + // Getters for immutable values + long getElectionTimeout() + { + return electionTimeout; + } + + long getHeartbeatInterval() + { + return heartbeatInterval; + } + + private long randomTimeoutRange() + { + return getElectionTimeout(); + } + + private RenewableTimeoutService.TimeoutHandler renewing( ThrowingAction action ) + { + return timeout -> + { + try + { + action.apply(); + } + catch ( Exception e ) + { + log.error( "Failed to process timeout.", e ); + } + timeout.renew(); + }; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java index 4682e33450f60..2e1d0eaabf580 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java @@ -20,7 +20,6 @@ package org.neo4j.causalclustering.core.consensus; import java.io.IOException; -import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.Objects; @@ -35,7 +34,6 @@ import org.neo4j.causalclustering.core.consensus.outcome.Outcome; import org.neo4j.causalclustering.core.consensus.roles.Role; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; -import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutHandler; import org.neo4j.causalclustering.core.consensus.shipping.RaftLogShippingManager; import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState; import org.neo4j.causalclustering.core.consensus.state.RaftState; @@ -46,7 +44,6 @@ import org.neo4j.causalclustering.helper.VolatileFuture; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.Outbound; -import org.neo4j.function.ThrowingAction; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; @@ -63,7 +60,6 @@ public class RaftMachine implements LeaderLocator, CoreMetaData { private final LeaderNotFoundMonitor leaderNotFoundMonitor; - private RenewableTimeoutService.RenewableTimeout heartbeatTimer; private InFlightCache inFlightCache; public enum Timeouts implements RenewableTimeoutService.TimeoutName @@ -75,15 +71,9 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private final RaftState state; private final MemberId myself; - private final RenewableTimeoutService renewableTimeoutService; - private final long heartbeatInterval; - private RenewableTimeoutService.RenewableTimeout electionTimer; + private final LeaderAvailabilityTimers leaderAvailabilityTimers; private RaftMembershipManager membershipManager; private final boolean refuseToBecomeLeader; - private final Clock clock; - - private final long electionTimeout; - private long lastElectionRenewalMillis; private final VolatileFuture volatileLeader = new VolatileFuture<>( null ); @@ -94,17 +84,12 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private RaftLogShippingManager logShipping; public RaftMachine( MemberId myself, StateStorage termStorage, StateStorage voteStorage, - RaftLog entryLog, long electionTimeout, long heartbeatInterval, - RenewableTimeoutService renewableTimeoutService, Outbound outbound, + RaftLog entryLog, LeaderAvailabilityTimers leaderAvailabilityTimers, Outbound outbound, LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, - InFlightCache inFlightCache, boolean refuseToBecomeLeader, boolean supportPreVoting, Monitors monitors, - Clock clock ) + InFlightCache inFlightCache, boolean refuseToBecomeLeader, boolean supportPreVoting,Monitors monitors ) { this.myself = myself; - this.electionTimeout = electionTimeout; - this.heartbeatInterval = heartbeatInterval; - - this.renewableTimeoutService = renewableTimeoutService; + this.leaderAvailabilityTimers = leaderAvailabilityTimers; this.outbound = outbound; this.logShipping = logShipping; @@ -112,7 +97,6 @@ public RaftMachine( MemberId myself, StateStorage termStorage, StateS this.membershipManager = membershipManager; this.refuseToBecomeLeader = refuseToBecomeLeader; - this.clock = clock; this.inFlightCache = inFlightCache; this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightCache, @@ -130,11 +114,7 @@ public synchronized void postRecoveryActions() { if ( !refuseToBecomeLeader ) { - lastElectionRenewalMillis = clock.millis(); - electionTimer = renewableTimeoutService.create( Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), - renewing( this::electionTimeout ) ); - heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0, - renewing( () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ) ); + leaderAvailabilityTimers.start( this::electionTimeout, () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ); } inFlightCache.enable(); @@ -142,35 +122,12 @@ public synchronized void postRecoveryActions() public synchronized void stopTimers() { - if ( electionTimer != null ) - { - electionTimer.cancel(); - } - if ( heartbeatTimer != null ) - { - heartbeatTimer.cancel(); - } - } - - private TimeoutHandler renewing( ThrowingAction action ) - { - return timeout -> - { - try - { - action.apply(); - } - catch ( Exception e ) - { - log.error( "Failed to process timeout.", e ); - } - timeout.renew(); - }; + leaderAvailabilityTimers.stop(); } private synchronized void electionTimeout() throws IOException { - if ( clock.millis() - lastElectionRenewalMillis >= electionTimeout ) + if ( leaderAvailabilityTimers.isElectionTimedOut() ) { triggerElection(); } @@ -340,11 +297,7 @@ private void handleTimers( Outcome outcome ) { if ( outcome.electionTimeoutRenewed() ) { - lastElectionRenewalMillis = clock.millis(); - if ( electionTimer != null ) - { - electionTimer.renew(); - } + leaderAvailabilityTimers.renewElection(); } } @@ -403,11 +356,6 @@ public long term() return state.term(); } - private long randomTimeoutRange() - { - return electionTimeout; - } - public Set votingMembers() { return membershipManager.votingMembers(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Appending.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Appending.java index cb416d3d55492..403ffb52a8287 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Appending.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Appending.java @@ -49,7 +49,6 @@ static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome return; } - outcome.renewElectionTimeout(); outcome.setPreElection( false ); outcome.setNextTerm( request.leaderTerm() ); outcome.setLeader( request.from() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Heart.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Heart.java index b861273972ee9..1157e31bd20f9 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Heart.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Heart.java @@ -36,7 +36,6 @@ static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbe return; } - outcome.renewElectionTimeout(); outcome.setPreElection( false ); outcome.setNextTerm( request.leaderTerm() ); outcome.setLeader( request.from() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Voting.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Voting.java index ed0799f6d7bf8..c691bfb6508c8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Voting.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Voting.java @@ -30,7 +30,7 @@ public class Voting { - static void handleVoteRequest( ReadableRaftState state, Outcome outcome, + static void handleVoteRequest( ReadableRaftState state, Outcome outcome, RaftMessages.Vote.Request voteRequest, Log log ) throws IOException { if ( voteRequest.term() > state.term() ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/term/TermState.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/term/TermState.java index bd5e2048f40e0..c8e439247b406 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/term/TermState.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/term/TermState.java @@ -27,7 +27,7 @@ public class TermState { - private long term = 0; + private volatile long term = 0; public TermState() { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index f2acb95ae1adf..aa39967d01850 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -40,6 +40,7 @@ import org.neo4j.causalclustering.core.consensus.ConsensusModule; import org.neo4j.causalclustering.core.consensus.ContinuousJob; import org.neo4j.causalclustering.core.consensus.RaftMessages; +import org.neo4j.causalclustering.core.consensus.LeaderAvailabilityHandler; import org.neo4j.causalclustering.core.consensus.RaftServer; import org.neo4j.causalclustering.core.consensus.log.pruning.PruningScheduler; import org.neo4j.causalclustering.core.consensus.membership.MembershipWaiter; @@ -181,22 +182,21 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data RaftMessageHandler messageHandler = new RaftMessageHandler( localDatabase, logProvider, consensusModule.raftMachine(), downloader, commandApplicationProcess ); - CoreLife coreLife = new CoreLife( consensusModule.raftMachine(), localDatabase, - clusteringModule.clusterBinder(), commandApplicationProcess, - coreStateMachinesModule.coreStateMachines, messageHandler, snapshotService ); - - RaftLogPruner raftLogPruner = new RaftLogPruner( consensusModule.raftMachine(), commandApplicationProcess ); - dependencies.satisfyDependency( raftLogPruner ); - - life.add( new PruningScheduler( raftLogPruner, jobScheduler, - config.get( CausalClusteringSettings.raft_log_pruning_frequency ).toMillis(), logProvider ) ); - int queueSize = config.get( CausalClusteringSettings.raft_in_queue_size ); int maxBatch = config.get( CausalClusteringSettings.raft_in_queue_max_batch ); BatchingMessageHandler batchingMessageHandler = new BatchingMessageHandler( messageHandler, queueSize, maxBatch, logProvider ); + LeaderAvailabilityHandler leaderAvailabilityHandler = new LeaderAvailabilityHandler( batchingMessageHandler, consensusModule.getLeaderAvailabilityTimers(), + consensusModule.raftMachine()::term, logProvider ); + + CoreLife coreLife = new CoreLife( consensusModule.raftMachine(), localDatabase, + clusteringModule.clusterBinder(), commandApplicationProcess, + coreStateMachinesModule.coreStateMachines, messageHandler, leaderAvailabilityHandler, snapshotService ); + + loggingRaftInbound.registerHandler( leaderAvailabilityHandler ); + long electionTimeout = config.get( CausalClusteringSettings.leader_election_timeout ).toMillis(); MembershipWaiter membershipWaiter = new MembershipWaiter( identityModule.myself(), jobScheduler, @@ -205,8 +205,6 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, consensusModule.raftMachine(), logProvider ); - loggingRaftInbound.registerHandler( batchingMessageHandler ); - CatchupServer catchupServer = new CatchupServer( logProvider, userLogProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ), platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), @@ -214,6 +212,12 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data new CheckpointerSupplier( platformModule.dependencies ), fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, sslPolicy ); + RaftLogPruner raftLogPruner = new RaftLogPruner( consensusModule.raftMachine(), commandApplicationProcess ); + dependencies.satisfyDependency( raftLogPruner ); + + life.add( new PruningScheduler( raftLogPruner, jobScheduler, + config.get( CausalClusteringSettings.raft_log_pruning_frequency ).toMillis(), logProvider ) ); + // Exposes this so that tests can start/stop the catchup server dependencies.satisfyDependency( catchupServer ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreLife.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreLife.java index 5ff8c491c1a38..d8f3b201978cd 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreLife.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/CoreLife.java @@ -21,6 +21,7 @@ import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.core.consensus.RaftMachine; +import org.neo4j.causalclustering.core.consensus.LeaderAvailabilityHandler; import org.neo4j.causalclustering.core.state.machines.CoreStateMachines; import org.neo4j.causalclustering.core.state.snapshot.CoreSnapshot; import org.neo4j.causalclustering.identity.BoundState; @@ -36,7 +37,8 @@ public class CoreLife implements Lifecycle private final CommandApplicationProcess applicationProcess; private final CoreStateMachines coreStateMachines; private final RaftMessageHandler raftMessageHandler; - private CoreSnapshotService snapshotService; + private final LeaderAvailabilityHandler leaderAvailabilityHandler; + private final CoreSnapshotService snapshotService; public CoreLife( RaftMachine raftMachine, @@ -44,7 +46,9 @@ public CoreLife( ClusterBinder clusterBinder, CommandApplicationProcess commandApplicationProcess, CoreStateMachines coreStateMachines, - RaftMessageHandler raftMessageHandler, CoreSnapshotService snapshotService ) + RaftMessageHandler raftMessageHandler, + LeaderAvailabilityHandler leaderAvailabilityHandler, + CoreSnapshotService snapshotService ) { this.raftMachine = raftMachine; this.localDatabase = localDatabase; @@ -52,6 +56,7 @@ public CoreLife( this.applicationProcess = commandApplicationProcess; this.coreStateMachines = coreStateMachines; this.raftMessageHandler = raftMessageHandler; + this.leaderAvailabilityHandler = leaderAvailabilityHandler; this.snapshotService = snapshotService; } @@ -66,6 +71,7 @@ public synchronized void start() throws Throwable { BoundState boundState = clusterBinder.bindToCluster(); raftMessageHandler.start( boundState.clusterId() ); + leaderAvailabilityHandler.start( boundState.clusterId() ); if ( boundState.snapshot().isPresent() ) { @@ -89,6 +95,7 @@ public synchronized void stop() throws Throwable { raftMachine.stopTimers(); raftMessageHandler.stop(); + leaderAvailabilityHandler.stop(); applicationProcess.stop(); localDatabase.stop(); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityHandlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityHandlerTest.java new file mode 100644 index 0000000000000..c7e69aabe8687 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/LeaderAvailabilityHandlerTest.java @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core.consensus; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.UUID; +import java.util.function.LongSupplier; + +import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; +import org.neo4j.causalclustering.identity.ClusterId; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.messaging.Inbound; +import org.neo4j.logging.NullLogProvider; + +public class LeaderAvailabilityHandlerTest +{ + @SuppressWarnings( "unchecked" ) + private Inbound.MessageHandler delegate = Mockito.mock( Inbound.MessageHandler.class ); + private LeaderAvailabilityTimers leaderAvailabilityTimers = Mockito.mock( LeaderAvailabilityTimers.class ); + private ClusterId clusterId = new ClusterId( UUID.randomUUID() ); + private LongSupplier term = () -> 3; + + private LeaderAvailabilityHandler handler = new LeaderAvailabilityHandler( delegate, leaderAvailabilityTimers, term, NullLogProvider.getInstance() ); + + private MemberId leader = new MemberId( UUID.randomUUID() ); + private RaftMessages.ClusterIdAwareMessage heartbeat = + new RaftMessages.ClusterIdAwareMessage( clusterId, new RaftMessages.Heartbeat( leader, term.getAsLong(), 0, 0 ) ); + private RaftMessages.ClusterIdAwareMessage appendEntries = + new RaftMessages.ClusterIdAwareMessage( clusterId, + new RaftMessages.AppendEntries.Request( leader, term.getAsLong(), 0, 0, RaftLogEntry.empty, 0 ) + ); + private RaftMessages.ClusterIdAwareMessage voteResponse = + new RaftMessages.ClusterIdAwareMessage( clusterId, new RaftMessages.Vote.Response( leader, term.getAsLong(), false ) ); + + @Test + public void shouldDropMessagesIfHasNotBeenStarted() throws Exception + { + // when + handler.handle( heartbeat ); + + // then + Mockito.verify( delegate, Mockito.never() ).handle( heartbeat ); + } + + @Test + public void shouldDropMessagesIfHasBeenStopped() throws Exception + { + // given + handler.start( clusterId ); + handler.stop(); + + // when + handler.handle( heartbeat ); + + // then + Mockito.verify( delegate, Mockito.never() ).handle( heartbeat ); + } + + @Test + public void shouldDropMessagesIfForDifferentClusterId() throws Exception + { + // given + handler.start( clusterId ); + + // when + handler.handle( new RaftMessages.ClusterIdAwareMessage( + new ClusterId( UUID.randomUUID() ), new RaftMessages.Heartbeat( leader, term.getAsLong(), 0, 0 ) + ) ); + + // then + Mockito.verify( delegate, Mockito.never() ).handle( heartbeat ); + } + + @Test + public void shouldDelegateMessages() throws Exception + { + // given + handler.start( clusterId ); + + // when + handler.handle( heartbeat ); + + // then + Mockito.verify( delegate ).handle( heartbeat ); + } + + @Test + public void shouldRenewElectionForHeartbeats() throws Exception + { + // given + handler.start( clusterId ); + + // when + handler.handle( heartbeat ); + + // then + Mockito.verify( leaderAvailabilityTimers ).renewElection(); + } + + @Test + public void shouldRenewElectionForAppendEntriesRequests() throws Exception + { + // given + handler.start( clusterId ); + + // when + handler.handle( appendEntries ); + + // then + Mockito.verify( leaderAvailabilityTimers ).renewElection(); + } + + @Test + public void shouldNotRenewElectionForOtherMessages() throws Exception + { + // given + handler.start( clusterId ); + + // when + handler.handle( voteResponse ); + + // then + Mockito.verify( leaderAvailabilityTimers, Mockito.never() ).renewElection(); + } + + @Test + public void shouldNotRenewElectionTimeoutsForHeartbeatsFromEarlierTerm() throws Exception + { + // given + RaftMessages.ClusterIdAwareMessage heartbeat = + new RaftMessages.ClusterIdAwareMessage( clusterId, new RaftMessages.Heartbeat( leader, term.getAsLong() - 1, 0, 0 ) ); + + handler.start( clusterId ); + + // when + handler.handle( heartbeat ); + + // then + Mockito.verify( leaderAvailabilityTimers, Mockito.never() ).renewElection(); + } + + @Test + public void shouldNotRenewElectionTimeoutsForAppendEntriesRequestsFromEarlierTerms() throws Exception + { + RaftMessages.ClusterIdAwareMessage appendEntries = + new RaftMessages.ClusterIdAwareMessage( clusterId, + new RaftMessages.AppendEntries.Request( leader, term.getAsLong() - 1, 0, 0, RaftLogEntry.empty, 0 ) + ); + + handler.start( clusterId ); + + // when + handler.handle( appendEntries ); + + // then + Mockito.verify( leaderAvailabilityTimers, Mockito.never() ).renewElection(); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java index 603ea38ed7f9d..2d781635246f7 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Clock; +import java.time.Duration; import org.neo4j.causalclustering.core.consensus.log.cache.ConsecutiveInFlightCache; import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache; @@ -55,8 +56,9 @@ public class RaftMachineBuilder private int expectedClusterSize; private RaftGroup.Builder memberSetBuilder; - private StateStorage termState = new InMemoryStateStorage<>( new TermState() ); - private StateStorage voteState = new InMemoryStateStorage<>( new VoteState() ); + private TermState termState = new TermState(); + private StateStorage termStateStorage = new InMemoryStateStorage<>( termState ); + private StateStorage voteStateStorage = new InMemoryStateStorage<>( new VoteState() ); private RaftLog raftLog = new InMemoryRaftLog(); private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService( Clocks.systemClock(), getInstance() ); @@ -68,8 +70,11 @@ public class RaftMachineBuilder private Clock clock = Clocks.systemClock(); private Clock shippingClock = Clocks.systemClock(); + private long term = termState.currentTerm(); + private long electionTimeout = 500; private long heartbeatInterval = 150; + private long catchupTimeout = 30000; private long retryTimeMillis = electionTimeout / 2; private int catchupBatchSize = 64; @@ -89,17 +94,20 @@ public RaftMachineBuilder( MemberId member, int expectedClusterSize, RaftGroup.B public RaftMachine build() { + termState.update( term ); + LeaderAvailabilityTimers + leaderAvailabilityTimers = new LeaderAvailabilityTimers( Duration.ofMillis( electionTimeout ), Duration.ofMillis( heartbeatInterval ), clock, + renewableTimeoutService, logProvider ); SendToMyself leaderOnlyReplicator = new SendToMyself( member, outbound ); RaftMembershipManager membershipManager = new RaftMembershipManager( leaderOnlyReplicator, - memberSetBuilder, raftLog, logProvider, expectedClusterSize, electionTimeout, clock, catchupTimeout, + memberSetBuilder, raftLog, logProvider, expectedClusterSize, leaderAvailabilityTimers.getElectionTimeout(), clock, catchupTimeout, raftMembership ); membershipManager.setRecoverFromIndexSupplier( () -> 0 ); RaftLogShippingManager logShipping = new RaftLogShippingManager( outbound, logProvider, raftLog, shippingClock, member, membershipManager, retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, inFlightCache ); - RaftMachine raft = new RaftMachine( member, termState, voteState, raftLog, electionTimeout, - heartbeatInterval, renewableTimeoutService, outbound, logProvider, - membershipManager, logShipping, inFlightCache, false, false, monitors, clock ); + RaftMachine raft = new RaftMachine( member, termStateStorage, voteStateStorage, raftLog, leaderAvailabilityTimers, outbound, logProvider, + membershipManager, logShipping, inFlightCache, false, false, monitors ); inbound.registerHandler( ( incomingMessage ) -> { try @@ -185,6 +193,12 @@ RaftMachineBuilder monitors( Monitors monitors ) return this; } + public RaftMachineBuilder term( long term ) + { + this.term = term; + return this; + } + public interface CommitListener { /** diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/FollowerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/FollowerTest.java index 568dae347f3b4..2dd93aa8220d2 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/FollowerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/FollowerTest.java @@ -233,24 +233,6 @@ public void shouldUpdateCommitIndexIfNecessary() throws Exception assertEquals( 3, state.commitIndex() ); } - @Test - public void shouldRenewElectionTimeoutOnReceiptOfHeartbeatInCurrentOrHigherTerm() throws Exception - { - // given - RaftState state = raftState() - .myself( myself ) - .term(0) - .build(); - - Follower follower = new Follower(); - - Outcome outcome = follower.handle( new RaftMessages.Heartbeat( myself, 1, 1, 1 ), - state, log() ); - - // then - assertTrue( outcome.electionTimeoutRenewed() ); - } - @Test public void shouldNotRenewElectionTimeoutOnReceiptOfHeartbeatInLowerTerm() throws Exception {