Skip to content

Commit

Permalink
configuration, naming and cleanup for load balancing
Browse files Browse the repository at this point in the history
The strategies are now named plugins instead.
The configuration file now has defaults and examples.
The default policy can be overridden.
  • Loading branch information
martinfurmanski committed Feb 16, 2017
1 parent abd5176 commit 0618fc6
Show file tree
Hide file tree
Showing 25 changed files with 177 additions and 88 deletions.
Expand Up @@ -314,16 +314,21 @@ public class CausalClusteringSettings implements LoadableConfig
public static final Setting<List<String>> upstream_selection_strategy =
setting( "causal_clustering.upstream_selection_strategy", list( ",", STRING ), "default" );

@Description( "The load balancing plugin to use. This must be the same for all core servers participating in the cluster." )
@Description( "Tags for the server used when configuring load balancing and replication policies." )
public static Setting<List<String>> server_tags =
setting( "causal_clustering.server_tags", list( ",", STRING ), "" );

@Description( "The load balancing plugin to use." )
public static Setting<String> load_balancing_plugin =
setting( "causal_clustering.load_balancing.plugin", STRING, "server_policies" );

@Description( "The configuration must be valid for the configured plugin." )
@Description( "The configuration must be valid for the configured plugin and usually exists" +
"under matching subkeys, e.g. ..config.server_policies.*" +
"This is just a top-level placeholder for the plugin-specific configuration." )
public static Setting<String> load_balancing_config =
setting( "causal_clustering.load_balancing.config", STRING, "" );

@Description( "Tags for the server used when configuring load balancing and replication policies." +
" Multiple tags can be configured by separating with a comma." )
public static Setting<List<String>> server_tags =
setting( "causal_clustering.server_tags", list( ",", STRING ), "" );
@Description( "Enables shuffling of the returned load balancing result." )
public static final Setting<Boolean> load_balancing_shuffle =
setting( "causal_clustering.load_balancing.shuffle", BOOLEAN, "true" );
}
Expand Up @@ -42,11 +42,12 @@
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPlugin;
import org.neo4j.causalclustering.load_balancing.plugins.ServerShufflingPlugin;
import org.neo4j.causalclustering.load_balancing.plugins.server_policies.InvalidFilterSpecification;
import org.neo4j.causalclustering.load_balancing.plugins.server_policies.ServerPoliciesPlugin;
import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV1;
import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV2;
import org.neo4j.causalclustering.load_balancing.strategy.AllServersStrategy;
import org.neo4j.causalclustering.load_balancing.strategy.ServerShufflingStrategy;
import org.neo4j.causalclustering.logging.BetterMessageLogger;
import org.neo4j.causalclustering.logging.MessageLogger;
import org.neo4j.causalclustering.logging.NullMessageLogger;
Expand Down Expand Up @@ -110,17 +111,35 @@ public enum RaftLogImplementation
IN_MEMORY, SEGMENTED
}

private LoadBalancingPlugin getLoadBalancingPlugin()
{
LoadBalancingPlugin plugin;

try
{
// TODO: Select using services framework and plugin configuration.
plugin = new ServerPoliciesPlugin( topologyService, consensusModule.raftMachine(), logProvider, config );
}
catch ( InvalidFilterSpecification e )
{
throw new RuntimeException( e );
}

if ( config.get( CausalClusteringSettings.load_balancing_shuffle ) )
{
plugin = new ServerShufflingPlugin( plugin );
}
return plugin;
}

@Override
public void registerEditionSpecificProcedures( Procedures procedures ) throws KernelException
{
LoadBalancingStrategy loadBalancingStrategy = new ServerShufflingStrategy(
new AllServersStrategy( topologyService, consensusModule.raftMachine(), config ) );

procedures.registerProcedure( EnterpriseBuiltInDbmsProcedures.class, true );
procedures.register(
new GetServersProcedureV1( topologyService, consensusModule.raftMachine(), config, logProvider ) );
procedures.register(
new GetServersProcedureV2( loadBalancingStrategy ) );
new GetServersProcedureV2( getLoadBalancingPlugin() ) );
procedures.register(
new ClusterOverviewProcedure( topologyService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new CoreRoleProcedure( consensusModule.raftMachine() ) );
Expand Down
Expand Up @@ -26,7 +26,7 @@
* Defines the interface for an implementation of the GetServersV2
* cluster discovery and load balancing procedure.
*/
public interface LoadBalancingStrategy
public interface LoadBalancingPlugin
{
interface Result
{
Expand Down
Expand Up @@ -23,10 +23,10 @@
import java.util.Objects;

/**
* The outcome of applying a load balancing strategy, which will be used by client
* The outcome of applying a load balancing plugin, which will be used by client
* software for scheduling work at the endpoints.
*/
public class LoadBalancingResult implements LoadBalancingStrategy.Result
public class LoadBalancingResult implements LoadBalancingPlugin.Result
{
private final List<Endpoint> routeEndpoints;
private final List<Endpoint> writeEndpoints;
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy;
package org.neo4j.causalclustering.load_balancing.plugins;

import java.util.List;
import java.util.Map;
Expand All @@ -33,7 +33,7 @@
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.Endpoint;
import org.neo4j.causalclustering.load_balancing.LoadBalancingResult;
import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPlugin;
import org.neo4j.kernel.configuration.Config;

import static java.util.Collections.emptyList;
Expand All @@ -42,15 +42,15 @@
import static org.neo4j.causalclustering.load_balancing.Util.asList;

/**
* This is just a simple strategy and not intended for actual use. Will be replaced.
* This is just a simple plugin and not intended for actual use. Will be replaced.
*/
public class AllServersStrategy implements LoadBalancingStrategy
public class AllServersPlugin implements LoadBalancingPlugin
{
private final CoreTopologyService topologyService;
private final LeaderLocator leaderLocator;
private final Long timeToLive;

public AllServersStrategy( CoreTopologyService topologyService, LeaderLocator leaderLocator, Config config )
public AllServersPlugin( CoreTopologyService topologyService, LeaderLocator leaderLocator, Config config )
{
this.topologyService = topologyService;
this.leaderLocator = leaderLocator;
Expand Down
Expand Up @@ -17,24 +17,24 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy;
package org.neo4j.causalclustering.load_balancing.plugins;

import java.util.Collections;
import java.util.Map;

import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPlugin;

/**
* Shuffles the servers of the delegate around so that every client
* invocation gets a a little bit of that extra entropy spice.
*
* N.B: Lists are shuffled in place.
*/
public class ServerShufflingStrategy implements LoadBalancingStrategy
public class ServerShufflingPlugin implements LoadBalancingPlugin
{
private final LoadBalancingStrategy delegate;
private final LoadBalancingPlugin delegate;

public ServerShufflingStrategy( LoadBalancingStrategy delegate )
public ServerShufflingPlugin( LoadBalancingPlugin delegate )
{
this.delegate = delegate;
}
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

import java.util.Objects;
import java.util.Set;
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

import java.util.Objects;
import java.util.Set;
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

import java.util.Map;
import java.util.Set;
Expand All @@ -31,15 +31,15 @@
import static org.neo4j.causalclustering.core.CausalClusteringSettings.load_balancing_config;

/**
* Loads filters under the name space of [...]config.strategy.policy_name
* Loads filters under the name space of a particular plugin.
*/
class FilteringPolicyLoader
{
static Policies load( Config config, String strategyName, LogProvider logProvider ) throws InvalidFilterSpecification
static Policies load( Config config, String pluginName, LogProvider logProvider ) throws InvalidFilterSpecification
{
Policies policies = new Policies( logProvider );

String prefix = policyPrefix( strategyName );
String prefix = policyPrefix( pluginName );
Map<String,String> rawConfig = config.getRaw();

Set<String> configKeys = rawConfig.keySet().stream()
Expand All @@ -58,8 +58,8 @@ static Policies load( Config config, String strategyName, LogProvider logProvide
return policies;
}

private static String policyPrefix( String strategyName )
private static String policyPrefix( String pluginName )
{
return format( "%s.%s.", load_balancing_config.name(), strategyName );
return format( "%s.%s.", load_balancing_config.name(), pluginName );
}
}
Expand Up @@ -17,9 +17,9 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

class InvalidFilterSpecification extends Exception
public class InvalidFilterSpecification extends Exception
{
InvalidFilterSpecification( String message )
{
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -31,9 +31,11 @@
class Policies
{
static final String POLICY_KEY = "load_balancing.policy"; // TODO: move somewhere (driver support package?)
static final String DEFAULT_POLICY_NAME = "default";
static final Policy DEFAULT_POLICY = new FilteringPolicy( IdentityFilter.as() );

private final Map<String,Policy> policies = new HashMap<>();
private final Policy DEFAULT_POLICY = new FilteringPolicy( IdentityFilter.as() );

private final Log log;

Policies( LogProvider logProvider )
Expand All @@ -53,15 +55,13 @@ void addPolicy( String policyName, Policy policy )
Policy selectFor( Map<String,String> context )
{
String policyName = context.get( POLICY_KEY );
policyName = (policyName != null) ? policyName : DEFAULT_POLICY_NAME;

Policy selectedPolicy = policies.get( policyName );

if ( policyName == null )
{
return DEFAULT_POLICY;
}
else if ( selectedPolicy == null )
if ( selectedPolicy == null )
{
log.warn( format( "Policy '%s' could not be found. Will use default instead.", policyName ) );
log.warn( format( "Policy definition for '%s' could not be found. Will use built-in default instead.", policyName ) );
return DEFAULT_POLICY;
}
else
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

import java.util.Set;

Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

import java.util.Objects;
import java.util.Set;
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.strategy.server_policy;
package org.neo4j.causalclustering.load_balancing.plugins.server_policies;

import java.util.List;
import java.util.Map;
Expand All @@ -33,39 +33,40 @@
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.Endpoint;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPlugin;
import org.neo4j.causalclustering.load_balancing.LoadBalancingResult;
import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;

import static java.util.Collections.emptyList;
import static org.neo4j.causalclustering.load_balancing.Util.asList;
import static org.neo4j.causalclustering.load_balancing.Util.extractBoltAddress;
import static org.neo4j.causalclustering.load_balancing.plugins.server_policies.FilteringPolicyLoader.load;

/**
* The server policy strategy defines policies on the server-side which
* The server policies plugin defines policies on the server-side which
* can be bound to by a client by supplying a appropriately formed context.
*
* An example would be to define a policy for a particular region.
* An example would be to define different policies for different regions.
*/
public class ServerPolicyStrategy implements LoadBalancingStrategy
public class ServerPoliciesPlugin implements LoadBalancingPlugin
{
private static final String STRATEGY_NAME = "server_policy";
private static final String PLUGIN_NAME = "server_policies";

private final TopologyService topologyService;
private final LeaderLocator leaderLocator;
private final Long timeToLive;
private final boolean allowReadsOnFollowers;
private final Policies policies;

public ServerPolicyStrategy( TopologyService topologyService, LeaderLocator leaderLocator,
public ServerPoliciesPlugin( TopologyService topologyService, LeaderLocator leaderLocator,
LogProvider logProvider, Config config ) throws InvalidFilterSpecification
{
this.topologyService = topologyService;
this.leaderLocator = leaderLocator;
this.timeToLive = config.get( CausalClusteringSettings.cluster_routing_ttl );
this.allowReadsOnFollowers = config.get( CausalClusteringSettings.cluster_allow_reads_on_followers );
this.policies = FilteringPolicyLoader.load( config, STRATEGY_NAME, logProvider );
this.policies = load( config, PLUGIN_NAME, logProvider );
}

@Override
Expand Down

0 comments on commit 0618fc6

Please sign in to comment.