Core members can refuse to become leaders
Core members can refuse to trigger an election (and thus refuse to become leaders).
This is useful in those (typically multi-DC) scenarios where end-users want to restrict
which servers can host the leader role. This is, clearly, at the deliberate expense of
availability.
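
For illustration only (not part of this diff): opting a single Core server out of elections is a one-line
setting on that server, assuming it goes into the standard neo4j.conf; the default is false, so only servers
that must never lead need it.

    causal_clustering.refuse_to_be_leader=true

As the commit message warns, this trades availability: if every Core member refuses, no leader can be elected.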
jimwebber committed Feb 24, 2017
1 parent 9cbf44e commit 2b1fb2a
Showing 13 changed files with 332 additions and 190 deletions.
@@ -27,12 +27,12 @@
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.configuration.Internal;
import org.neo4j.kernel.configuration.Settings;

import static org.neo4j.kernel.configuration.Settings.ADVERTISED_SOCKET_ADDRESS;
import static org.neo4j.kernel.configuration.Settings.BOOLEAN;
import static org.neo4j.kernel.configuration.Settings.BYTES;
import static org.neo4j.kernel.configuration.Settings.DURATION;
import static org.neo4j.kernel.configuration.Settings.FALSE;
import static org.neo4j.kernel.configuration.Settings.INTEGER;
import static org.neo4j.kernel.configuration.Settings.NO_DEFAULT;
import static org.neo4j.kernel.configuration.Settings.STRING;
@@ -54,6 +54,12 @@ public class CausalClusteringSettings implements LoadableConfig
public static final Setting<Long> leader_election_timeout =
setting( "causal_clustering.leader_election_timeout", DURATION, "7s" );

@Description( "Prevents the current instance from volunteering to become Raft leader. Defaults to false, and " +
"should only be used in exceptional circumstances by expert users. Using this can result in reduced " +
"availability for the cluster." )
public static final Setting<Boolean> refuse_to_be_leader =
setting( "causal_clustering.refuse_to_be_leader", BOOLEAN, FALSE );

@Description( "The maximum batch size when catching up (in unit of entries)" )
public static final Setting<Integer> catchup_batch_size =
setting( "causal_clustering.catchup_batch_size", INTEGER, "64" );
@@ -213,7 +219,7 @@ public class CausalClusteringSettings implements LoadableConfig
"endpoints or return only read replicas. If there are no read replicas in the cluster, followers are " +
"returned as read end points regardless the value of this setting." )
public static final Setting<Boolean> cluster_allow_reads_on_followers =
setting( "causal_clustering.cluster_allow_reads_on_followers", BOOLEAN, Settings.FALSE );
setting( "causal_clustering.cluster_allow_reads_on_followers", BOOLEAN, FALSE );

@Description( "The size of the ID allocation requests Core servers will make when they run out of NODE IDs. " +
"Larger values mean less frequent requests but also result in more unused IDs (and unused disk space) " +
@@ -323,8 +329,8 @@ public class CausalClusteringSettings implements LoadableConfig
setting( "causal_clustering.load_balancing.plugin", STRING, "server_policies" );

@Description( "The configuration must be valid for the configured plugin and usually exists" +
"under matching subkeys, e.g. ..config.server_policies.*" +
"This is just a top-level placeholder for the plugin-specific configuration." )
"under matching subkeys, e.g. ..config.server_policies.*" +
"This is just a top-level placeholder for the plugin-specific configuration." )
public static Setting<String> load_balancing_config =
setting( "causal_clustering.load_balancing.config", STRING, "" );

@@ -74,8 +74,9 @@ public class ConsensusModule
private final RaftMembershipManager raftMembershipManager;
private final InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>();

public ConsensusModule( MemberId myself, final PlatformModule platformModule, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
File clusterStateDirectory, CoreTopologyService coreTopologyService )
public ConsensusModule( MemberId myself, final PlatformModule platformModule,
Outbound<MemberId,RaftMessages.RaftMessage> outbound, File clusterStateDirectory,
CoreTopologyService coreTopologyService )
{
final Config config = platformModule.config;
final LogService logging = platformModule.logging;
@@ -86,7 +87,8 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, Ou

final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal();

RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider, platformModule.jobScheduler );
RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider,
platformModule.jobScheduler );

raftLog = new MonitoredRaftLog( underlyingLog, platformModule.monitors );

@@ -95,20 +97,19 @@
StateStorage<RaftMembershipState> raftMembershipStorage;

StateStorage<TermState> durableTermState = life.add(
new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_TERM_NAME,
new TermState.Marshal(), config.get( CausalClusteringSettings.term_state_size ), logProvider ) );
new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_TERM_NAME, new TermState.Marshal(),
config.get( CausalClusteringSettings.term_state_size ), logProvider ) );

termState = new MonitoredTermStateStorage( durableTermState, platformModule.monitors );

voteState = life.add(
new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_VOTE_NAME,
new VoteState.Marshal( new MemberId.Marshal() ),
config.get( CausalClusteringSettings.vote_state_size ), logProvider ) );
voteState = life.add( new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_VOTE_NAME,
new VoteState.Marshal( new MemberId.Marshal() ), config.get( CausalClusteringSettings.vote_state_size ),
logProvider ) );

raftMembershipStorage = life.add(
new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_MEMBERSHIP_NAME,
new RaftMembershipState.Marshal(), config.get( CausalClusteringSettings.raft_membership_state_size ),
logProvider ) );
new RaftMembershipState.Marshal(),
config.get( CausalClusteringSettings.raft_membership_state_size ), logProvider ) );

long electionTimeout = config.get( CausalClusteringSettings.leader_election_timeout );
long heartbeatInterval = electionTimeout / 3;
@@ -120,55 +121,55 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, Ou
SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound );

raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, electionTimeout, systemClock(),
config.get( join_catch_up_timeout ), raftMembershipStorage
);
expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_timeout ),
raftMembershipStorage );

life.add( raftMembershipManager );

RaftLogShippingManager logShipping =
new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(),
myself, raftMembershipManager, electionTimeout,
config.get( catchup_batch_size ),
new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(), myself,
raftMembershipManager, electionTimeout, config.get( catchup_batch_size ),
config.get( log_shipping_max_lag ), inFlightMap );

raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider );

raftMachine =
new RaftMachine( myself, termState, voteState, raftLog, electionTimeout,
heartbeatInterval, raftTimeoutService, outbound, logProvider, raftMembershipManager,
logShipping, inFlightMap, platformModule.monitors );
raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval,
raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightMap,
config.get( CausalClusteringSettings.refuse_to_be_leader ), platformModule.monitors );

life.add( new RaftCoreTopologyConnector( coreTopologyService, raftMachine ) );

life.add( logShipping );
}

private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem,
File clusterStateDirectory, CoreReplicatedContentMarshal marshal, LogProvider logProvider, JobScheduler scheduler )
File clusterStateDirectory, CoreReplicatedContentMarshal marshal, LogProvider logProvider,
JobScheduler scheduler )
{
EnterpriseCoreEditionModule.RaftLogImplementation raftLogImplementation =
EnterpriseCoreEditionModule.RaftLogImplementation.valueOf( config.get( CausalClusteringSettings.raft_log_implementation ) );
EnterpriseCoreEditionModule.RaftLogImplementation
.valueOf( config.get( CausalClusteringSettings.raft_log_implementation ) );
switch ( raftLogImplementation )
{
case IN_MEMORY:
{
return new InMemoryRaftLog();
}

case SEGMENTED:
{
long rotateAtSize = config.get( CausalClusteringSettings.raft_log_rotation_size );
int readerPoolSize = config.get( CausalClusteringSettings.raft_log_reader_pool_size );

CoreLogPruningStrategy pruningStrategy = new CoreLogPruningStrategyFactory(
config.get( CausalClusteringSettings.raft_log_pruning_strategy ), logProvider ).newInstance();
File directory = new File( clusterStateDirectory, RAFT_LOG_DIRECTORY_NAME );
return life.add( new SegmentedRaftLog( fileSystem, directory, rotateAtSize, marshal,
logProvider, readerPoolSize, systemClock(), scheduler, pruningStrategy ) );
}
default:
throw new IllegalStateException( "Unknown raft log implementation: " + raftLogImplementation );
case IN_MEMORY:
{
return new InMemoryRaftLog();
}

case SEGMENTED:
{
long rotateAtSize = config.get( CausalClusteringSettings.raft_log_rotation_size );
int readerPoolSize = config.get( CausalClusteringSettings.raft_log_reader_pool_size );

CoreLogPruningStrategy pruningStrategy =
new CoreLogPruningStrategyFactory( config.get( CausalClusteringSettings.raft_log_pruning_strategy ),
logProvider ).newInstance();
File directory = new File( clusterStateDirectory, RAFT_LOG_DIRECTORY_NAME );
return life.add( new SegmentedRaftLog( fileSystem, directory, rotateAtSize, marshal, logProvider,
readerPoolSize, systemClock(), scheduler, pruningStrategy ) );
}
default:
throw new IllegalStateException( "Unknown raft log implementation: " + raftLogImplementation );
}
}

@@ -26,6 +26,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
@@ -50,12 +51,11 @@
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

import static org.neo4j.causalclustering.core.consensus.roles.Role.LEADER;

/**
* Implements the Raft Consensus Algorithm.
*
* <p>
* The algorithm is driven by incoming messages provided to {@link #handle}.
*/
public class RaftMachine implements LeaderLocator, CoreMetaData
@@ -65,7 +65,8 @@ public class RaftMachine implements LeaderLocator, CoreMetaData

public enum Timeouts implements RenewableTimeoutService.TimeoutName
{
ELECTION, HEARTBEAT
ELECTION,
HEARTBEAT
}

private final RaftState state;
@@ -75,26 +76,23 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private final long heartbeatInterval;
private RenewableTimeoutService.RenewableTimeout electionTimer;
private RaftMembershipManager membershipManager;
private final boolean refuseToBecomeLeader;

private final long electionTimeout;

private final VolatileFuture<MemberId> volatileLeader = new VolatileFuture<>( null );

private final Outbound<MemberId, RaftMessages.RaftMessage> outbound;
private final Outbound<MemberId,RaftMessages.RaftMessage> outbound;
private final Log log;
private Role currentRole = Role.FOLLOWER;

private RaftLogShippingManager logShipping;

public RaftMachine( MemberId myself, StateStorage<TermState> termStorage,
StateStorage<VoteState> voteStorage, RaftLog entryLog,
long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
Outbound<MemberId, RaftMessages.RaftMessage> outbound,
LogProvider logProvider, RaftMembershipManager membershipManager,
RaftLogShippingManager logShipping,
InFlightMap<RaftLogEntry> inFlightMap,
Monitors monitors )
public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateStorage<VoteState> voteStorage,
RaftLog entryLog, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping,
InFlightMap<RaftLogEntry> inFlightMap, boolean refuseToBecomeLeader, Monitors monitors )
{
this.myself = myself;
this.electionTimeout = electionTimeout;
@@ -107,8 +105,10 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage,
this.log = logProvider.getLog( getClass() );

this.membershipManager = membershipManager;
this.refuseToBecomeLeader = refuseToBecomeLeader;

this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightMap, logProvider );
this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightMap,
logProvider );

leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );

@@ -117,35 +117,46 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage,

private void initTimers()
{
electionTimer = renewableTimeoutService.create( Timeouts.ELECTION, electionTimeout, randomTimeoutRange(),
timeout -> {
electionTimer =
renewableTimeoutService.create( Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout ->
{
try
{
handle( new RaftMessages.Timeout.Election( myself ) );
triggerElection();
}
catch ( IOException e )
{
log.error( "Failed to process election timeout.", e );
}
timeout.renew();
} );
heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0,
timeout -> {
try
{
handle( new RaftMessages.Timeout.Heartbeat( myself ) );
}
catch ( IOException e )
{
log.error( "Failed to process heartbeat timeout.", e );
}
timeout.renew();
} );
heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0, timeout ->
{
try
{
handle( new RaftMessages.Timeout.Heartbeat( myself ) );
}
catch ( IOException e )
{
log.error( "Failed to process heartbeat timeout.", e );
}
timeout.renew();
} );
}

public void triggerElection() throws IOException
{
handle( new RaftMessages.Timeout.Election( myself ) );
if ( !refuseToBecomeLeader )
{
handle( new RaftMessages.Timeout.Election( myself ) );
}
else
{
log.info(
format( "Election timeout occured, but {%s} is configured to not tirgger an election. " +
"See setting: %s",
myself, CausalClusteringSettings.refuse_to_be_leader.name() ) );
}
}

public void panic()
@@ -49,6 +49,7 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService
private final Log log;
private final ClientConnectorAddresses connectorAddresses;
private final HazelcastConnector connector;
private final Config config;
private final RenewableTimeoutService renewableTimeoutService;
private final AdvertisedSocketAddress transactionSource;
private final List<String> tags;
@@ -63,6 +64,7 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService
long readReplicaRefreshRate, MemberId myself )
{
this.connector = connector;
this.config = config;
this.renewableTimeoutService = renewableTimeoutService;
this.readReplicaRefreshRate = readReplicaRefreshRate;
this.log = logProvider.getLog( getClass() );
@@ -78,7 +80,7 @@ public CoreTopology coreServers()
{
try
{
return retry( ( hazelcastInstance ) -> getCoreTopology( hazelcastInstance, log ) );
return retry( ( hazelcastInstance ) -> getCoreTopology( hazelcastInstance, config, log ) );
}
catch ( Exception e )
{