Skip to content

Commit

Permalink
Introducing Resolution Resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
phughk committed Jul 12, 2017
1 parent 898b571 commit aa9bc1b
Show file tree
Hide file tree
Showing 17 changed files with 198 additions and 111 deletions.
Expand Up @@ -40,7 +40,8 @@ public class CausalClusterConfigurationValidator implements ConfigurationValidat
@Override
@Nonnull
public Map<String,String> validate( @Nonnull Collection<SettingValidator> settingValidators,
@Nonnull Map<String,String> rawConfig, @Nonnull Log log, boolean parsingFile ) throws InvalidSettingException
@Nonnull Map<String,String> rawConfig, @Nonnull Log log, boolean parsingFile )
throws InvalidSettingException
{
// Make sure mode is CC
Mode mode = ClusterSettings.mode.apply( rawConfig::get );
Expand Down
Expand Up @@ -27,6 +27,8 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.neo4j.causalclustering.discovery.NoOpResolutionResolver;
import org.neo4j.causalclustering.discovery.ResolutionResolver;
import org.neo4j.configuration.Description;
import org.neo4j.configuration.Internal;
import org.neo4j.configuration.LoadableConfig;
Expand All @@ -35,6 +37,7 @@
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;

import static org.neo4j.kernel.configuration.Settings.ADVERTISED_SOCKET_ADDRESS;
Expand All @@ -51,6 +54,7 @@
import static org.neo4j.kernel.configuration.Settings.list;
import static org.neo4j.kernel.configuration.Settings.listenAddress;
import static org.neo4j.kernel.configuration.Settings.min;
import static org.neo4j.kernel.configuration.Settings.options;
import static org.neo4j.kernel.configuration.Settings.setting;

@Description( "Settings for Causal Clustering" )
Expand Down Expand Up @@ -121,14 +125,36 @@ public class CausalClusteringSettings implements LoadableConfig
setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ),
NO_DEFAULT );

public enum DiscoveryType
{
DNS,
LIST
}

@Description( "Configure the discovery type" )
public static final Setting<DiscoveryType> discovery_type =
setting( "causal_clustering.discovery_type", options( DiscoveryType.class ), DiscoveryType.LIST.name() );

public static ResolutionResolver chooseResolver( Config config )
{
CausalClusteringSettings.DiscoveryType discoveryType = config.get( CausalClusteringSettings.discovery_type );
if ( discoveryType == CausalClusteringSettings.DiscoveryType.DNS )
{
return new NoOpResolutionResolver();
}
else
{
return new NoOpResolutionResolver();
}
}

@Description( "Prevents the network middleware from dumping its own logs. Defaults to true." )
public static final Setting<Boolean> disable_middleware_logging =
setting( "causal_clustering.disable_middleware_logging", BOOLEAN, TRUE );

@Internal // not supported yet
@Description( "Hazelcast license key" )
public static final Setting<String> hazelcast_license_key =
setting( "hazelcast.license_key", STRING, NO_DEFAULT );
public static final Setting<String> hazelcast_license_key = setting( "hazelcast.license_key", STRING, NO_DEFAULT );

@Description( "The maximum file size before the storage file is rotated (in unit of entries)" )
public static final Setting<Integer> last_flushed_state_size =
Expand Down Expand Up @@ -192,7 +218,7 @@ public class CausalClusteringSettings implements LoadableConfig

@Description( "Enable or disable the dump of all network messages pertaining to the RAFT protocol" )
public static final Setting<Boolean> raft_messages_log_enable =
setting( "causal_clustering.raft_messages_log_enable", BOOLEAN, FALSE);
setting( "causal_clustering.raft_messages_log_enable", BOOLEAN, FALSE );

@Description( "Interval of pulling updates from cores." )
public static final Setting<Duration> pull_interval = setting( "causal_clustering.pull_interval", DURATION, "1s" );
Expand Down Expand Up @@ -341,18 +367,18 @@ public class CausalClusteringSettings implements LoadableConfig
public static final Setting<String> load_balancing_plugin =
setting( "causal_clustering.load_balancing.plugin", STRING, "server_policies" );

static BaseSetting<String> prefixSetting( final String name, final Function<String, String> parser,
final String defaultValue )
static BaseSetting<String> prefixSetting( final String name, final Function<String,String> parser,
final String defaultValue )
{
BiFunction<String, Function<String, String>, String> valueLookup = ( n, settings ) -> settings.apply( n );
BiFunction<String, Function<String, String>, String> defaultLookup = determineDefaultLookup( defaultValue,
valueLookup );
BiFunction<String,Function<String,String>,String> valueLookup = ( n, settings ) -> settings.apply( n );
BiFunction<String,Function<String,String>,String> defaultLookup =
determineDefaultLookup( defaultValue, valueLookup );

return new Settings.DefaultSetting<String>( name, parser, valueLookup, defaultLookup )
{

@Override
public Map<String, String> validate( Map<String, String> rawConfig, Consumer<String> warningConsumer )
public Map<String,String> validate( Map<String,String> rawConfig, Consumer<String> warningConsumer )
throws InvalidSettingException
{
// Validate setting, if present or default value otherwise
Expand All @@ -361,10 +387,9 @@ public Map<String, String> validate( Map<String, String> rawConfig, Consumer<Str
apply( rawConfig::get );
// only return if it was present though

Map<String, String> validConfig = new HashMap<>();
Map<String,String> validConfig = new HashMap<>();

rawConfig.keySet().stream()
.filter( key -> key.startsWith( name() ) )
rawConfig.keySet().stream().filter( key -> key.startsWith( name() ) )
.forEach( key -> validConfig.put( key, rawConfig.get( key ) ) );

return validConfig;
Expand All @@ -377,9 +402,9 @@ public Map<String, String> validate( Map<String, String> rawConfig, Consumer<Str
};
}

@Description("The configuration must be valid for the configured plugin and usually exists" +
@Description( "The configuration must be valid for the configured plugin and usually exists" +
"under matching subkeys, e.g. ..config.server_policies.*" +
"This is just a top-level placeholder for the plugin-specific configuration.")
"This is just a top-level placeholder for the plugin-specific configuration." )
public static final Setting<String> load_balancing_config =
prefixSetting( "causal_clustering.load_balancing.config", STRING, "" );

Expand All @@ -395,8 +420,9 @@ public Map<String, String> validate( Map<String, String> rawConfig, Consumer<Str
public static final Setting<Boolean> multi_dc_license =
setting( "causal_clustering.multi_dc_license", BOOLEAN, FALSE );

@Description( "Name of the SSL policy to be used by the clustering, as defined under the dbms.ssl.policy.* settings." +
" If no policy is configured then the communication will not be secured." )
@Description(
"Name of the SSL policy to be used by the clustering, as defined under the dbms.ssl.policy.* settings." +
" If no policy is configured then the communication will not be secured." )
public static final Setting<String> ssl_policy =
prefixSetting( "causal_clustering.ssl_policy", STRING, NO_DEFAULT );
}
Expand Up @@ -27,8 +27,9 @@
import org.neo4j.causalclustering.core.state.storage.SimpleStorage;
import org.neo4j.causalclustering.discovery.CoreTopologyService;
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.discovery.ResolutionResolver;
import org.neo4j.causalclustering.identity.ClusterBinder;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
Expand All @@ -40,6 +41,7 @@
import org.neo4j.time.Clocks;

import static java.lang.Thread.sleep;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.chooseResolver;
import static org.neo4j.causalclustering.core.server.CoreServerModule.CLUSTER_ID_NAME;

public class ClusteringModule
Expand All @@ -56,9 +58,11 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member
LogProvider userLogProvider = platformModule.logging.getUserLogProvider();
Dependencies dependencies = platformModule.dependencies;
FileSystemAbstraction fileSystem = platformModule.fileSystem;
ResolutionResolver resolutionResolver = chooseResolver( config );

topologyService = discoveryServiceFactory.coreTopologyService( config, sslPolicy,
myself, platformModule.jobScheduler, logProvider, userLogProvider );
topologyService = discoveryServiceFactory
.coreTopologyService( config, sslPolicy, myself, platformModule.jobScheduler, logProvider,
userLogProvider, resolutionResolver );

life.add( topologyService );

Expand All @@ -69,7 +73,8 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member
logProvider );

CoreBootstrapper coreBootstrapper =
new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache, fileSystem, config, logProvider );
new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache, fileSystem, config,
logProvider );

clusterBinder = new ClusterBinder( clusterIdStorage, topologyService, logProvider, Clocks.systemClock(),
() -> sleep( 100 ), 300_000, coreBootstrapper );
Expand Down
Expand Up @@ -27,9 +27,10 @@

public interface DiscoveryServiceFactory
{
CoreTopologyService coreTopologyService( Config config, SslPolicy sslPolicy, MemberId myself, JobScheduler jobScheduler,
LogProvider logProvider, LogProvider userLogProvider );
CoreTopologyService coreTopologyService( Config config, SslPolicy sslPolicy, MemberId myself,
JobScheduler jobScheduler, LogProvider logProvider, LogProvider userLogProvider,
ResolutionResolver resolutionResolver );

TopologyService topologyService( Config config, SslPolicy sslPolicy, LogProvider logProvider,
JobScheduler jobScheduler, MemberId myself );
JobScheduler jobScheduler, MemberId myself, ResolutionResolver resolutionResolver );
}
Expand Up @@ -37,12 +37,15 @@ public class HazelcastClientConnector implements HazelcastConnector
private final Config config;
private final LogProvider logProvider;
private final SslPolicy sslPolicy;
private final ResolutionResolver resolutionResolver;

HazelcastClientConnector( Config config, LogProvider logProvider, SslPolicy sslPolicy )
HazelcastClientConnector( Config config, LogProvider logProvider, SslPolicy sslPolicy,
ResolutionResolver resolutionResolver )
{
this.config = config;
this.logProvider = logProvider;
this.sslPolicy = sslPolicy;
this.resolutionResolver = resolutionResolver;
}

@Override
Expand All @@ -52,7 +55,8 @@ public HazelcastInstance connectToHazelcast()

ClientNetworkConfig networkConfig = clientConfig.getNetworkConfig();

for ( AdvertisedSocketAddress address : config.get( CausalClusteringSettings.initial_discovery_members ) )
for ( AdvertisedSocketAddress address : resolutionResolver
.resolve( config.get( CausalClusteringSettings.initial_discovery_members ) ) )
{
networkConfig.addAddress( address.toString() );
}
Expand Down
Expand Up @@ -19,23 +19,8 @@
*/
package org.neo4j.causalclustering.discovery;

import com.hazelcast.config.InterfacesConfig;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import com.hazelcast.config.*;
import com.hazelcast.core.*;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.helper.RobustJobSchedulerWrapper;
Expand Down Expand Up @@ -64,6 +49,15 @@
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshGroups;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.spi.properties.GroupProperty.*;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.*;
import static org.neo4j.causalclustering.discovery.HazelcastSslConfiguration.configureSsl;

class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService
Expand All @@ -78,6 +72,7 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
private final RobustJobSchedulerWrapper scheduler;
private final long refreshPeriod;
private final LogProvider logProvider;
private final ResolutionResolver resolutionResolver;

private String membershipRegistrationId;
private JobScheduler.JobHandle refreshJob;
Expand All @@ -90,8 +85,8 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
private Thread startingThread;
private volatile boolean stopped;

HazelcastCoreTopologyService( Config config, SslPolicy sslPolicy, MemberId myself, JobScheduler jobScheduler, LogProvider logProvider,
LogProvider userLogProvider )
HazelcastCoreTopologyService( Config config, SslPolicy sslPolicy, MemberId myself, JobScheduler jobScheduler,
LogProvider logProvider, LogProvider userLogProvider, ResolutionResolver resolutionResolver )
{
this.config = config;
this.sslPolicy = sslPolicy;
Expand All @@ -102,6 +97,7 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
this.scheduler = new RobustJobSchedulerWrapper( jobScheduler, log );
this.userLog = userLogProvider.getLog( getClass() );
this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis();
this.resolutionResolver = resolutionResolver;
}

@Override
Expand Down Expand Up @@ -139,8 +135,10 @@ public void start() throws Throwable
{
return;
}
membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( new OurMembershipListener() );
refreshJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, HazelcastCoreTopologyService.this::refreshTopology );
membershipRegistrationId =
hazelcastInstance.getCluster().addMembershipListener( new OurMembershipListener() );
refreshJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod,
HazelcastCoreTopologyService.this::refreshTopology );
log.info( "Cluster discovery service started" );
} );
startingThread.setDaemon( true );
Expand Down Expand Up @@ -188,9 +186,9 @@ private HazelcastInstance createHazelcastInstance()
tcpIpConfig.setEnabled( true );

List<AdvertisedSocketAddress> initialMembers = config.get( initial_discovery_members );
for ( AdvertisedSocketAddress address : initialMembers )
for ( AdvertisedSocketAddress advertisedAddress : resolutionResolver.resolve( initialMembers ) )
{
tcpIpConfig.addMember( address.toString() );
tcpIpConfig.addMember( advertisedAddress.toString() );
}

ListenSocketAddress hazelcastAddress = config.get( discovery_listen_address );
Expand Down Expand Up @@ -240,15 +238,16 @@ private HazelcastInstance createHazelcastInstance()

c.setNetworkConfig( networkConfig );

MemberAttributeConfig memberAttributeConfig = HazelcastClusterTopology.buildMemberAttributesForCore( myself, config );
MemberAttributeConfig memberAttributeConfig =
HazelcastClusterTopology.buildMemberAttributesForCore( myself, config );

c.setMemberAttributeConfig( memberAttributeConfig );
logConnectionInfo( initialMembers );

JobScheduler.JobHandle logJob = scheduler.schedule( "HazelcastHealth", HAZELCAST_IS_HEALTHY_TIMEOUT_MS,
() -> log.warn( "The server has not been able to connect in a timely fashion to the " +
"cluster. Please consult the logs for more details. Rebooting the server may " +
"solve the problem." ) );
"cluster. Please consult the logs for more details. Rebooting the server may " +
"solve the problem." ) );

try
{
Expand All @@ -272,13 +271,9 @@ private HazelcastInstance createHazelcastInstance()

private void logConnectionInfo( List<AdvertisedSocketAddress> initialMembers )
{
userLog.info( "My connection info: " +
"[\n\tDiscovery: listen=%s, advertised=%s," +
"\n\tTransaction: listen=%s, advertised=%s, " +
"\n\tRaft: listen=%s, advertised=%s, " +
"\n\tClient Connector Addresses: %s" +
"\n]",
config.get( discovery_listen_address ),
userLog.info( "My connection info: " + "[\n\tDiscovery: listen=%s, advertised=%s," +
"\n\tTransaction: listen=%s, advertised=%s, " + "\n\tRaft: listen=%s, advertised=%s, " +
"\n\tClient Connector Addresses: %s" + "\n]", config.get( discovery_listen_address ),
config.get( CausalClusteringSettings.discovery_advertised_address ),
config.get( CausalClusteringSettings.transaction_listen_address ),
config.get( CausalClusteringSettings.transaction_advertised_address ),
Expand Down

0 comments on commit aa9bc1b

Please sign in to comment.