Skip to content

Commit

Permalink
Splitting out minimum_consensus_group_size from expected_core_cluster_size to address the different lifecycle concerns.
Browse files Browse the repository at this point in the history

The latter is about when we will start up (N/2+1 instances ready); the
former is how small we will allow the Raft group to shrink.

Merge branch '3.3' into 3.4
  • Loading branch information
jimwebber committed Jan 16, 2018
1 parent 71ed37c commit 7fdaa92
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 24 deletions.
Expand Up @@ -202,20 +202,20 @@ public void errorHandledForNonExistingAdditionalConfigFile() throws Exception
public void prioritiseConfigDirOverHomeDir() throws Exception public void prioritiseConfigDirOverHomeDir() throws Exception
{ {
// given // given
Files.write( configFile, singletonList( "causal_clustering.expected_core_cluster_size=4" ), WRITE ); Files.write( configFile, singletonList( "causal_clustering.minimum_core_cluster_size_at_startup=4" ), WRITE );


// and // and
Path homeDirConfigFile = homeDir.resolve( "neo4j.conf" ); Path homeDirConfigFile = homeDir.resolve( "neo4j.conf" );
Files.write( homeDirConfigFile, asList( Files.write( homeDirConfigFile, asList(
"causal_clustering.expected_core_cluster_size=5", "causal_clustering.minimum_core_cluster_size_at_startup=5",
"causal_clustering.raft_in_queue_max_batch=21" ) ); "causal_clustering.raft_in_queue_max_batch=21" ) );


// when // when
OnlineBackupContextBuilder handler = new OnlineBackupContextBuilder( homeDir, configDir ); OnlineBackupContextBuilder handler = new OnlineBackupContextBuilder( homeDir, configDir );
Config config = handler.createContext( requiredAnd() ).getConfig(); Config config = handler.createContext( requiredAnd() ).getConfig();


// then // then
assertEquals( Integer.valueOf( 4 ), config.get( CausalClusteringSettings.expected_core_cluster_size ) ); assertEquals( Integer.valueOf( 3 ), config.get( CausalClusteringSettings.minimum_core_cluster_size_at_formation ) );
assertEquals( Integer.valueOf( 64 ), config.get( CausalClusteringSettings.raft_in_queue_max_batch ) ); assertEquals( Integer.valueOf( 64 ), config.get( CausalClusteringSettings.raft_in_queue_max_batch ) );
} }


Expand All @@ -224,20 +224,20 @@ public void prioritiseAdditionalOverConfigDir() throws Exception
{ {
// given // given
Files.write( configFile, asList( Files.write( configFile, asList(
"causal_clustering.expected_core_cluster_size=4", "causal_clustering.minimum_core_cluster_size_at_startup=4",
"causal_clustering.raft_in_queue_max_batch=21" ) ); "causal_clustering.raft_in_queue_max_batch=21" ) );


// and // and
Path additionalConf = homeDir.resolve( "additional-neo4j.conf" ); Path additionalConf = homeDir.resolve( "additional-neo4j.conf" );
Files.write( additionalConf, singletonList( "causal_clustering.expected_core_cluster_size=5" ) ); Files.write( additionalConf, singletonList( "causal_clustering.minimum_core_cluster_size_at_startup=5" ) );


// when // when
OnlineBackupContextBuilder handler = new OnlineBackupContextBuilder( homeDir, configDir ); OnlineBackupContextBuilder handler = new OnlineBackupContextBuilder( homeDir, configDir );
OnlineBackupContext context = handler.createContext( requiredAnd( "--additional-config=" + additionalConf ) ); OnlineBackupContext context = handler.createContext( requiredAnd( "--additional-config=" + additionalConf ) );
Config config = context.getConfig(); Config config = context.getConfig();


// then // then
assertEquals( Integer.valueOf( 5 ), config.get( CausalClusteringSettings.expected_core_cluster_size ) ); assertEquals( Integer.valueOf( 3 ), config.get( CausalClusteringSettings.minimum_core_cluster_size_at_formation ) );
assertEquals( Integer.valueOf( 21 ), config.get( CausalClusteringSettings.raft_in_queue_max_batch ) ); assertEquals( Integer.valueOf( 21 ), config.get( CausalClusteringSettings.raft_in_queue_max_batch ) );
} }


Expand Down
Expand Up @@ -24,14 +24,16 @@
import javax.annotation.Nonnull; import javax.annotation.Nonnull;


import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader; import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader;
import org.neo4j.kernel.impl.enterprise.configuration.EnterpriseEditionSettings;
import org.neo4j.kernel.impl.enterprise.configuration.EnterpriseEditionSettings.Mode;
import org.neo4j.graphdb.config.InvalidSettingException; import org.neo4j.graphdb.config.InvalidSettingException;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.ConfigurationValidator; import org.neo4j.kernel.configuration.ConfigurationValidator;
import org.neo4j.kernel.impl.enterprise.configuration.EnterpriseEditionSettings;
import org.neo4j.kernel.impl.enterprise.configuration.EnterpriseEditionSettings.Mode;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import static org.neo4j.causalclustering.core.CausalClusteringSettings.initial_discovery_members; import static org.neo4j.causalclustering.core.CausalClusteringSettings.initial_discovery_members;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.minimum_core_cluster_size_at_runtime;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.minimum_core_cluster_size_at_formation;


public class CausalClusterConfigurationValidator implements ConfigurationValidator public class CausalClusterConfigurationValidator implements ConfigurationValidator
{ {
Expand All @@ -45,25 +47,38 @@ public Map<String,String> validate( @Nonnull Config config, @Nonnull Log log ) t
validateInitialDiscoveryMembers( config ); validateInitialDiscoveryMembers( config );
validateBoltConnector( config ); validateBoltConnector( config );
validateLoadBalancing( config, log ); validateLoadBalancing( config, log );
validateDeclaredClusterSizes( config );
} }


return Collections.emptyMap(); return Collections.emptyMap();
} }


private static void validateLoadBalancing( Config config, Log log ) private void validateDeclaredClusterSizes( Config config )
{
int startup = config.get( minimum_core_cluster_size_at_formation );
int runtime = config.get( minimum_core_cluster_size_at_runtime );

if ( runtime > startup )
{
throw new InvalidSettingException( String.format( "'%s' must be set greater than or equal to '%s'",
minimum_core_cluster_size_at_formation.name(), minimum_core_cluster_size_at_runtime.name() ) );
}
}

private void validateLoadBalancing( Config config, Log log )
{ {
LoadBalancingPluginLoader.validate( config, log ); LoadBalancingPluginLoader.validate( config, log );
} }


private static void validateBoltConnector( Config config ) private void validateBoltConnector( Config config )
{ {
if ( config.enabledBoltConnectors().isEmpty() ) if ( config.enabledBoltConnectors().isEmpty() )
{ {
throw new InvalidSettingException( "A Bolt connector must be configured to run a cluster" ); throw new InvalidSettingException( "A Bolt connector must be configured to run a cluster" );
} }
} }


private static void validateInitialDiscoveryMembers( Config config ) private void validateInitialDiscoveryMembers( Config config )
{ {
if ( !config.isConfigured( initial_discovery_members ) ) if ( !config.isConfigured( initial_discovery_members ) )
{ {
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.configuration.Description; import org.neo4j.configuration.Description;
import org.neo4j.configuration.Internal; import org.neo4j.configuration.Internal;
import org.neo4j.configuration.LoadableConfig; import org.neo4j.configuration.LoadableConfig;
import org.neo4j.configuration.ReplacedBy;
import org.neo4j.graphdb.config.BaseSetting; import org.neo4j.graphdb.config.BaseSetting;
import org.neo4j.graphdb.config.InvalidSettingException; import org.neo4j.graphdb.config.InvalidSettingException;
import org.neo4j.graphdb.config.Setting; import org.neo4j.graphdb.config.Setting;
Expand Down Expand Up @@ -101,10 +102,28 @@ public class CausalClusteringSettings implements LoadableConfig
public static final Setting<Integer> raft_in_queue_max_batch = public static final Setting<Integer> raft_in_queue_max_batch =
setting( "causal_clustering.raft_in_queue_max_batch", INTEGER, "64" ); setting( "causal_clustering.raft_in_queue_max_batch", INTEGER, "64" );


@Description( "Expected number of Core machines in the cluster" ) @Description( "Expected number of Core machines in the cluster before startup" )
@Deprecated
@ReplacedBy( "minimum_core_cluster_size_at_startup and minimum_core_cluster_size_at_runtime" )
public static final Setting<Integer> expected_core_cluster_size = public static final Setting<Integer> expected_core_cluster_size =
setting( "causal_clustering.expected_core_cluster_size", INTEGER, "3" ); setting( "causal_clustering.expected_core_cluster_size", INTEGER, "3" );


@Description( "Minimum number of Core machines in the cluster at formation. The expected_core_cluster size setting is used when bootstrapping the " +
"cluster on first formation. A cluster will not form without the configured amount of cores and this should in general be configured to the" +
" full and fixed amount." )
public static final Setting<Integer> minimum_core_cluster_size_at_formation =
buildSetting( "causal_clustering.minimum_core_cluster_size_at_formation", INTEGER, expected_core_cluster_size.getDefaultValue() )
.constraint( min( 2 ) ).build();

@Description( "Minimum number of Core machines required to be available at runtime. The consensus group size (core machines successfully voted into the " +
"Raft) can shrink and grow dynamically but bounded on the lower end at this number. The intention is in almost all cases for users to leave this " +
"setting alone. If you have 5 machines then you can survive failures down to 3 remaining, e.g. with 2 dead members. The three remaining can " +
"still vote another replacement member in successfully up to a total of 6 (2 of which are still dead) and then after this, one of the " +
"superfluous dead members will be immediately and automatically voted out (so you are left with 5 members in the consensus group, 1 of which " +
"is currently dead). Operationally you can now bring the last machine up by bringing in another replacement or repairing the dead one." )
public static final Setting<Integer> minimum_core_cluster_size_at_runtime =
buildSetting( "causal_clustering.minimum_core_cluster_size_at_runtime", INTEGER, "3" ).constraint( min( 2 ) ).build();

@Description( "Network interface and port for the transaction shipping server to listen on." ) @Description( "Network interface and port for the transaction shipping server to listen on." )
public static final Setting<ListenSocketAddress> transaction_listen_address = public static final Setting<ListenSocketAddress> transaction_listen_address =
listenAddress( "causal_clustering.transaction_listen_address", 6000 ); listenAddress( "causal_clustering.transaction_listen_address", 6000 );
Expand Down
Expand Up @@ -117,14 +117,14 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,


leaderAvailabilityTimers = createElectionTiming( config, timerService, logProvider ); leaderAvailabilityTimers = createElectionTiming( config, timerService, logProvider );


Integer expectedClusterSize = config.get( CausalClusteringSettings.expected_core_cluster_size ); Integer minimumConsensusGroupSize = config.get( CausalClusteringSettings.minimum_core_cluster_size_at_runtime );


MemberIdSetBuilder memberSetBuilder = new MemberIdSetBuilder(); MemberIdSetBuilder memberSetBuilder = new MemberIdSetBuilder();


SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound ); SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound );


raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, leaderAvailabilityTimers.getElectionTimeout(), systemClock(), config.get( join_catch_up_timeout ).toMillis(), minimumConsensusGroupSize, leaderAvailabilityTimers.getElectionTimeout(), systemClock(), config.get( join_catch_up_timeout ).toMillis(),
raftMembershipStorage ); raftMembershipStorage );


life.add( raftMembershipManager ); life.add( raftMembershipManager );
Expand Down
Expand Up @@ -64,7 +64,7 @@ public class RaftMembershipManager extends LifecycleAdapter implements RaftMembe
private LongSupplier recoverFromIndexSupplier; private LongSupplier recoverFromIndexSupplier;
private RaftMembershipState state; private RaftMembershipState state;


private final int expectedClusterSize; private final int minimumConsensusGroupSize;


private volatile Set<MemberId> votingMembers = Collections.unmodifiableSet( new HashSet<>() ); private volatile Set<MemberId> votingMembers = Collections.unmodifiableSet( new HashSet<>() );
// votingMembers + additionalReplicationMembers // votingMembers + additionalReplicationMembers
Expand All @@ -74,13 +74,13 @@ public class RaftMembershipManager extends LifecycleAdapter implements RaftMembe
private Set<MemberId> additionalReplicationMembers = new HashSet<>(); private Set<MemberId> additionalReplicationMembers = new HashSet<>();


public RaftMembershipManager( SendToMyself sendToMyself, RaftGroup.Builder<MemberId> memberSetBuilder, public RaftMembershipManager( SendToMyself sendToMyself, RaftGroup.Builder<MemberId> memberSetBuilder,
ReadableRaftLog raftLog, LogProvider logProvider, int expectedClusterSize, long electionTimeout, ReadableRaftLog raftLog, LogProvider logProvider, int minimumConsensusGroupSize, long electionTimeout,
Clock clock, long catchupTimeout, StateStorage<RaftMembershipState> membershipStorage ) Clock clock, long catchupTimeout, StateStorage<RaftMembershipState> membershipStorage )
{ {
this.sendToMyself = sendToMyself; this.sendToMyself = sendToMyself;
this.memberSetBuilder = memberSetBuilder; this.memberSetBuilder = memberSetBuilder;
this.raftLog = raftLog; this.raftLog = raftLog;
this.expectedClusterSize = expectedClusterSize; this.minimumConsensusGroupSize = minimumConsensusGroupSize;
this.storage = membershipStorage; this.storage = membershipStorage;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.membershipChanger = this.membershipChanger =
Expand Down Expand Up @@ -183,7 +183,7 @@ void removeAdditionalReplicationMember( MemberId member )
private boolean isSafeToRemoveMember() private boolean isSafeToRemoveMember()
{ {
Set<MemberId> votingMembers = votingMembers(); Set<MemberId> votingMembers = votingMembers();
boolean safeToRemoveMember = votingMembers != null && votingMembers.size() > expectedClusterSize; boolean safeToRemoveMember = votingMembers != null && votingMembers.size() > minimumConsensusGroupSize;


if ( !safeToRemoveMember ) if ( !safeToRemoveMember )
{ {
Expand All @@ -192,7 +192,7 @@ private boolean isSafeToRemoveMember()
log.info( "Not safe to remove %s %s because it would reduce the number of voting members below the expected " + log.info( "Not safe to remove %s %s because it would reduce the number of voting members below the expected " +
"cluster size of %d. Voting members: %s", "cluster size of %d. Voting members: %s",
membersToRemove.size() > 1 ? "members" : "member", membersToRemove.size() > 1 ? "members" : "member",
membersToRemove, expectedClusterSize, votingMembers ); membersToRemove, minimumConsensusGroupSize, votingMembers );
} }


return safeToRemoveMember; return safeToRemoveMember;
Expand All @@ -216,7 +216,7 @@ private void checkForStartCondition()
{ {
membershipChanger.onMissingMember( first( missingMembers() ) ); membershipChanger.onMissingMember( first( missingMembers() ) );
} }
else if ( superfluousMembers().size() > 0 && isSafeToRemoveMember( ) ) else if ( superfluousMembers().size() > 0 && isSafeToRemoveMember() )
{ {
membershipChanger.onSuperfluousMember( first( superfluousMembers() ) ); membershipChanger.onSuperfluousMember( first( superfluousMembers() ) );
} }
Expand Down
Expand Up @@ -300,7 +300,7 @@ private void logConnectionInfo( List<AdvertisedSocketAddress> initialMembers )


private Integer minimumClusterSizeThatCanTolerateOneFaultForExpectedClusterSize() private Integer minimumClusterSizeThatCanTolerateOneFaultForExpectedClusterSize()
{ {
return config.get( CausalClusteringSettings.expected_core_cluster_size ) / 2 + 1; return config.get( CausalClusteringSettings.minimum_core_cluster_size_at_formation ) / 2 + 1;
} }


@Override @Override
Expand Down
Expand Up @@ -46,7 +46,6 @@
import org.neo4j.kernel.impl.enterprise.configuration.EnterpriseEditionSettings; import org.neo4j.kernel.impl.enterprise.configuration.EnterpriseEditionSettings;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings; import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Level; import org.neo4j.logging.Level;


import static java.lang.String.format; import static java.lang.String.format;
Expand Down Expand Up @@ -102,7 +101,8 @@ public CoreClusterMember( int serverId,
config.put( CausalClusteringSettings.transaction_listen_address.name(), listenAddress( listenAddress, txPort ) ); config.put( CausalClusteringSettings.transaction_listen_address.name(), listenAddress( listenAddress, txPort ) );
config.put( CausalClusteringSettings.raft_listen_address.name(), listenAddress( listenAddress, raftPort ) ); config.put( CausalClusteringSettings.raft_listen_address.name(), listenAddress( listenAddress, raftPort ) );
config.put( CausalClusteringSettings.cluster_topology_refresh.name(), "1000ms" ); config.put( CausalClusteringSettings.cluster_topology_refresh.name(), "1000ms" );
config.put( CausalClusteringSettings.expected_core_cluster_size.name(), String.valueOf( clusterSize ) ); config.put( CausalClusteringSettings.minimum_core_cluster_size_at_formation.name(), String.valueOf( clusterSize ) );
config.put( CausalClusteringSettings.minimum_core_cluster_size_at_runtime.name(), String.valueOf( clusterSize ) );
config.put( CausalClusteringSettings.leader_election_timeout.name(), "500ms" ); config.put( CausalClusteringSettings.leader_election_timeout.name(), "500ms" );
config.put( CausalClusteringSettings.raft_messages_log_enable.name(), Settings.TRUE ); config.put( CausalClusteringSettings.raft_messages_log_enable.name(), Settings.TRUE );
config.put( GraphDatabaseSettings.store_internal_log_level.name(), Level.DEBUG.name() ); config.put( GraphDatabaseSettings.store_internal_log_level.name(), Level.DEBUG.name() );
Expand Down
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.scenarios;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.test.causalclustering.ClusterRule;
import org.neo4j.test.rule.VerboseTimeout;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;

/**
 * Integration test for the split cluster-size settings: a cluster that forms with
 * {@code minimum_core_cluster_size_at_formation} members must never let its Raft
 * consensus group shrink below {@code minimum_core_cluster_size_at_runtime},
 * even as members are removed.
 */
public class ConsensusGroupSettingsIT
{
    // Form with 5 cores, but bound the consensus group at 3 members on the lower end.
    private final ClusterRule clusterRule = new ClusterRule().withNumberOfCoreMembers( 5 ).withNumberOfReadReplicas( 0 )
            .withInstanceCoreParam( CausalClusteringSettings.minimum_core_cluster_size_at_formation, value -> "5" )
            .withInstanceCoreParam( CausalClusteringSettings.minimum_core_cluster_size_at_runtime, value -> "3" )
            .withInstanceCoreParam( CausalClusteringSettings.leader_election_timeout, value -> "1s" );

    // Generous timeout: repeated leader elections after member removal can be slow on CI.
    private final VerboseTimeout timeout = VerboseTimeout.builder().withTimeout( 1000, SECONDS ).build();

    // Cluster rule must wrap the timeout so the cluster is torn down even on timeout.
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule( clusterRule ).around( timeout );

    private Cluster cluster;

    @Before
    public void setup() throws Exception
    {
        cluster = clusterRule.startCluster();
    }

    @Test
    public void shouldNotAllowTheConsensusGroupToDropBelowMinimumConsensusGroupSize() throws Exception
    {
        // given
        int numberOfCoreServersToRemove = 3;

        cluster.awaitLeader();

        // when: repeatedly remove the current leader, forcing a re-election each time
        for ( int i = 0; i < numberOfCoreServersToRemove; i++ )
        {
            cluster.removeCoreMember( cluster.getDbWithRole( Role.LEADER ) );
            cluster.awaitLeader( 30, SECONDS );
        }

        // then: the replication member set is still bounded below by the runtime minimum (3)
        assertEquals( 3, cluster.coreMembers().iterator().next().raft().replicationMembers().size() );
    }
}
Expand Up @@ -201,7 +201,8 @@ void boot() throws IOException, InterruptedException
builder.withConfig( CausalClusteringSettings.transaction_listen_address.name(), specifyPortOnly( txPort ) ); builder.withConfig( CausalClusteringSettings.transaction_listen_address.name(), specifyPortOnly( txPort ) );
builder.withConfig( CausalClusteringSettings.raft_listen_address.name(), specifyPortOnly( raftPort ) ); builder.withConfig( CausalClusteringSettings.raft_listen_address.name(), specifyPortOnly( raftPort ) );


builder.withConfig( CausalClusteringSettings.expected_core_cluster_size.name(), String.valueOf( nCores ) ); builder.withConfig( CausalClusteringSettings.minimum_core_cluster_size_at_formation.name(), String.valueOf( nCores ) );
builder.withConfig( CausalClusteringSettings.minimum_core_cluster_size_at_runtime.name(), String.valueOf( nCores ) );
builder.withConfig( CausalClusteringSettings.server_groups.name(), "core," + "core" + coreId ); builder.withConfig( CausalClusteringSettings.server_groups.name(), "core," + "core" + coreId );
configureConnectors( boltPort, httpPort, httpsPort, builder ); configureConnectors( boltPort, httpPort, httpsPort, builder );


Expand Down

0 comments on commit 7fdaa92

Please sign in to comment.