Skip to content

Commit

Permalink
Added logs and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
phughk committed Jul 12, 2017
1 parent aa9bc1b commit 6c3e3ab
Show file tree
Hide file tree
Showing 21 changed files with 448 additions and 103 deletions.
Expand Up @@ -40,8 +40,7 @@ public class CausalClusterConfigurationValidator implements ConfigurationValidat
@Override
@Nonnull
public Map<String,String> validate( @Nonnull Collection<SettingValidator> settingValidators,
@Nonnull Map<String,String> rawConfig, @Nonnull Log log, boolean parsingFile )
throws InvalidSettingException
@Nonnull Map<String,String> rawConfig, @Nonnull Log log, boolean parsingFile ) throws InvalidSettingException
{
// Make sure mode is CC
Mode mode = ClusterSettings.mode.apply( rawConfig::get );
Expand Down
Expand Up @@ -27,8 +27,6 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.neo4j.causalclustering.discovery.NoOpResolutionResolver;
import org.neo4j.causalclustering.discovery.ResolutionResolver;
import org.neo4j.configuration.Description;
import org.neo4j.configuration.Internal;
import org.neo4j.configuration.LoadableConfig;
Expand All @@ -37,7 +35,6 @@
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;

import static org.neo4j.kernel.configuration.Settings.ADVERTISED_SOCKET_ADDRESS;
Expand Down Expand Up @@ -135,26 +132,14 @@ public enum DiscoveryType
public static final Setting<DiscoveryType> discovery_type =
setting( "causal_clustering.discovery_type", options( DiscoveryType.class ), DiscoveryType.LIST.name() );

public static ResolutionResolver chooseResolver( Config config )
{
CausalClusteringSettings.DiscoveryType discoveryType = config.get( CausalClusteringSettings.discovery_type );
if ( discoveryType == CausalClusteringSettings.DiscoveryType.DNS )
{
return new NoOpResolutionResolver();
}
else
{
return new NoOpResolutionResolver();
}
}

@Description( "Prevents the network middleware from dumping its own logs. Defaults to true." )
public static final Setting<Boolean> disable_middleware_logging =
setting( "causal_clustering.disable_middleware_logging", BOOLEAN, TRUE );

@Internal // not supported yet
@Description( "Hazelcast license key" )
public static final Setting<String> hazelcast_license_key = setting( "hazelcast.license_key", STRING, NO_DEFAULT );
public static final Setting<String> hazelcast_license_key =
setting( "hazelcast.license_key", STRING, NO_DEFAULT );

@Description( "The maximum file size before the storage file is rotated (in unit of entries)" )
public static final Setting<Integer> last_flushed_state_size =
Expand Down Expand Up @@ -367,18 +352,18 @@ public static ResolutionResolver chooseResolver( Config config )
public static final Setting<String> load_balancing_plugin =
setting( "causal_clustering.load_balancing.plugin", STRING, "server_policies" );

static BaseSetting<String> prefixSetting( final String name, final Function<String,String> parser,
final String defaultValue )
static BaseSetting<String> prefixSetting( final String name, final Function<String, String> parser,
final String defaultValue )
{
BiFunction<String,Function<String,String>,String> valueLookup = ( n, settings ) -> settings.apply( n );
BiFunction<String,Function<String,String>,String> defaultLookup =
determineDefaultLookup( defaultValue, valueLookup );
BiFunction<String, Function<String, String>, String> valueLookup = ( n, settings ) -> settings.apply( n );
BiFunction<String, Function<String, String>, String> defaultLookup = determineDefaultLookup( defaultValue,
valueLookup );

return new Settings.DefaultSetting<String>( name, parser, valueLookup, defaultLookup )
{

@Override
public Map<String,String> validate( Map<String,String> rawConfig, Consumer<String> warningConsumer )
public Map<String, String> validate( Map<String, String> rawConfig, Consumer<String> warningConsumer )
throws InvalidSettingException
{
// Validate setting, if present or default value otherwise
Expand All @@ -387,9 +372,10 @@ public Map<String,String> validate( Map<String,String> rawConfig, Consumer<Strin
apply( rawConfig::get );
// only return if it was present though

Map<String,String> validConfig = new HashMap<>();
Map<String, String> validConfig = new HashMap<>();

rawConfig.keySet().stream().filter( key -> key.startsWith( name() ) )
rawConfig.keySet().stream()
.filter( key -> key.startsWith( name() ) )
.forEach( key -> validConfig.put( key, rawConfig.get( key ) ) );

return validConfig;
Expand All @@ -402,9 +388,9 @@ public Map<String,String> validate( Map<String,String> rawConfig, Consumer<Strin
};
}

@Description( "The configuration must be valid for the configured plugin and usually exists" +
@Description("The configuration must be valid for the configured plugin and usually exists" +
"under matching subkeys, e.g. ..config.server_policies.*" +
"This is just a top-level placeholder for the plugin-specific configuration." )
"This is just a top-level placeholder for the plugin-specific configuration.")
public static final Setting<String> load_balancing_config =
prefixSetting( "causal_clustering.load_balancing.config", STRING, "" );

Expand All @@ -420,9 +406,8 @@ public Map<String,String> validate( Map<String,String> rawConfig, Consumer<Strin
public static final Setting<Boolean> multi_dc_license =
setting( "causal_clustering.multi_dc_license", BOOLEAN, FALSE );

@Description(
"Name of the SSL policy to be used by the clustering, as defined under the dbms.ssl.policy.* settings." +
" If no policy is configured then the communication will not be secured." )
@Description( "Name of the SSL policy to be used by the clustering, as defined under the dbms.ssl.policy.* settings." +
" If no policy is configured then the communication will not be secured." )
public static final Setting<String> ssl_policy =
prefixSetting( "causal_clustering.ssl_policy", STRING, NO_DEFAULT );
}
Expand Up @@ -41,8 +41,8 @@
import org.neo4j.time.Clocks;

import static java.lang.Thread.sleep;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.chooseResolver;
import static org.neo4j.causalclustering.core.server.CoreServerModule.CLUSTER_ID_NAME;
import static org.neo4j.causalclustering.discovery.ResolutionResolverFactory.chooseResolver;

public class ClusteringModule
{
Expand All @@ -58,7 +58,7 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member
LogProvider userLogProvider = platformModule.logging.getUserLogProvider();
Dependencies dependencies = platformModule.dependencies;
FileSystemAbstraction fileSystem = platformModule.fileSystem;
ResolutionResolver resolutionResolver = chooseResolver( config );
ResolutionResolver resolutionResolver = chooseResolver( config, logProvider, userLogProvider );

topologyService = discoveryServiceFactory
.coreTopologyService( config, sslPolicy, myself, platformModule.jobScheduler, logProvider,
Expand All @@ -73,8 +73,7 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member
logProvider );

CoreBootstrapper coreBootstrapper =
new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache, fileSystem, config,
logProvider );
new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache, fileSystem, config, logProvider );

clusterBinder = new ClusterBinder( clusterIdStorage, topologyService, logProvider, Clocks.systemClock(),
() -> sleep( 100 ), 300_000, coreBootstrapper );
Expand Down
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

public class DnsResolutionResolver implements ResolutionResolver
{
private final Log userLog;
private final Log log;
private final DomainNameResolver domainNameResolver;

public DnsResolutionResolver( LogProvider logProvider, LogProvider userLogProvider,
DomainNameResolver domainNameResolver )
{
log = logProvider.getLog( getClass() );
userLog = userLogProvider.getLog( getClass() );
this.domainNameResolver = domainNameResolver;
}

@Override
public Collection<AdvertisedSocketAddress> resolve( AdvertisedSocketAddress initialAddress )
{
Set<AdvertisedSocketAddress> addresses = new HashSet<>();
try
{
InetAddress[] ipAddresses = domainNameResolver.resolveDomainName( initialAddress.getHostname() );

for ( InetAddress ipAddress : ipAddresses )
{
addresses.add( new AdvertisedSocketAddress( ipAddress.getHostAddress(), initialAddress.getPort() ) );
}

userLog.info( "Resolved initial host '%s' to %s", initialAddress, addresses );
return addresses;
}
catch ( UnknownHostException e )
{
log.error( format( "Failed to resolve host `%s` to IPs due to error: %s", initialAddress, e.getMessage() ), e );

addresses.add( initialAddress );
return addresses;
}
}
}
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import java.net.InetAddress;
import java.net.UnknownHostException;

public interface DomainNameResolver
{
InetAddress[] resolveDomainName(String hostname) throws UnknownHostException;
}
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class DomainNameResolverImpl implements DomainNameResolver
{
public InetAddress[] resolveDomainName(String hostname) throws UnknownHostException
{
return InetAddress.getAllByName( hostname );
}
}
Expand Up @@ -55,10 +55,12 @@ public HazelcastInstance connectToHazelcast()

ClientNetworkConfig networkConfig = clientConfig.getNetworkConfig();

for ( AdvertisedSocketAddress address : resolutionResolver
.resolve( config.get( CausalClusteringSettings.initial_discovery_members ) ) )
for ( AdvertisedSocketAddress address : config.get( CausalClusteringSettings.initial_discovery_members ) )
{
networkConfig.addAddress( address.toString() );
for ( AdvertisedSocketAddress advertisedSocketAddress : resolutionResolver.resolve( address ) )
{
networkConfig.addAddress( advertisedSocketAddress.toString() );
}
}

configureSsl( networkConfig, sslPolicy, logProvider );
Expand Down
Expand Up @@ -19,8 +19,23 @@
*/
package org.neo4j.causalclustering.discovery;

import com.hazelcast.config.*;
import com.hazelcast.core.*;
import com.hazelcast.config.InterfacesConfig;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.helper.RobustJobSchedulerWrapper;
Expand Down Expand Up @@ -49,15 +64,6 @@
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshGroups;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.spi.properties.GroupProperty.*;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.*;
import static org.neo4j.causalclustering.discovery.HazelcastSslConfiguration.configureSsl;

class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService
Expand Down Expand Up @@ -135,10 +141,8 @@ public void start() throws Throwable
{
return;
}
membershipRegistrationId =
hazelcastInstance.getCluster().addMembershipListener( new OurMembershipListener() );
refreshJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod,
HazelcastCoreTopologyService.this::refreshTopology );
membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( new OurMembershipListener() );
refreshJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, HazelcastCoreTopologyService.this::refreshTopology );
log.info( "Cluster discovery service started" );
} );
startingThread.setDaemon( true );
Expand Down Expand Up @@ -186,9 +190,12 @@ private HazelcastInstance createHazelcastInstance()
tcpIpConfig.setEnabled( true );

List<AdvertisedSocketAddress> initialMembers = config.get( initial_discovery_members );
for ( AdvertisedSocketAddress advertisedAddress : resolutionResolver.resolve( initialMembers ) )
for ( AdvertisedSocketAddress address : initialMembers )
{
tcpIpConfig.addMember( advertisedAddress.toString() );
for ( AdvertisedSocketAddress advertisedSocketAddress : resolutionResolver.resolve( address ) )
{
tcpIpConfig.addMember( advertisedSocketAddress.toString() );
}
}

ListenSocketAddress hazelcastAddress = config.get( discovery_listen_address );
Expand Down Expand Up @@ -238,8 +245,7 @@ private HazelcastInstance createHazelcastInstance()

c.setNetworkConfig( networkConfig );

MemberAttributeConfig memberAttributeConfig =
HazelcastClusterTopology.buildMemberAttributesForCore( myself, config );
MemberAttributeConfig memberAttributeConfig = HazelcastClusterTopology.buildMemberAttributesForCore( myself, config );

c.setMemberAttributeConfig( memberAttributeConfig );
logConnectionInfo( initialMembers );
Expand Down

0 comments on commit 6c3e3ab

Please sign in to comment.