Skip to content

Commit

Permalink
Reap Unavailable Edge Servers from Cluster Overview
Browse files Browse the repository at this point in the history
When an edge server disconnects, we remove it from the map of
servers that the cluster overview procedure consumes.
If the edge server becomes available again, it adds itself back into
that map.
  • Loading branch information
Mark Needham committed Aug 12, 2016
1 parent 05d0b78 commit fddbe65
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 71 deletions.
Expand Up @@ -142,8 +142,8 @@ public String toString()
setting( "core_edge.discovery_listen_address", LISTEN_SOCKET_ADDRESS, "0.0.0.0:5000" );

@Description("A comma-separated list of other members of the cluster to join.")
public static final Setting<List<AdvertisedSocketAddress>> initial_hazelcast_members =
setting( "core_edge.initial_hazelcast_members", list( ",", ADVERTISED_SOCKET_ADDRESS ), MANDATORY );
public static final Setting<List<AdvertisedSocketAddress>> initial_discovery_members =
setting( "core_edge.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ), MANDATORY );

@Description("Prevents the network middleware from dumping its own logs. Defaults to true.")
public static final Setting<Boolean> disable_middleware_logging =
Expand Down Expand Up @@ -225,5 +225,10 @@ public String toString()

@Description( "Maximum transaction batch size for edge servers when applying transactions pulled from core servers." )
@Internal
public static Setting<Integer> edge_transaction_applier_batch_size = setting( "core_edge.edge_transaction_applier_batch_size", INTEGER, "16" );
public static Setting<Integer> edge_transaction_applier_batch_size =
setting( "core_edge.edge_transaction_applier_batch_size", INTEGER, "16" );

@Description( "Time To Live before edge server is considered unavailable" )
public static final Setting<Long> edge_time_to_live =
setting( "core_edge.edge_time_to_live", DURATION, "1m", min(60_000L) );
}
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.coreedge.discovery;


import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
Expand All @@ -29,5 +30,5 @@ public interface DiscoveryServiceFactory
{
CoreTopologyService coreDiscoveryService( Config config, MemberId myself, LogProvider logProvider );

TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider );
TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout );
}
Expand Up @@ -25,13 +25,17 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;

import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.time.Clock.systemUTC;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.EDGE_SERVER_BOLT_ADDRESS_MAP_NAME;

Expand All @@ -40,13 +44,19 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService
private final Log log;
private final AdvertisedSocketAddress boltAddress;
private final HazelcastConnector connector;
private final RenewableTimeoutService renewableTimeoutService;
private HazelcastInstance hazelcastInstance;
private RenewableTimeoutService.RenewableTimeout edgeRefreshTimer;
private long edgeTimeToLiveTimeout;

HazelcastClient( HazelcastConnector connector, LogProvider logProvider, AdvertisedSocketAddress boltAddress )
HazelcastClient( HazelcastConnector connector, LogProvider logProvider, AdvertisedSocketAddress boltAddress,
RenewableTimeoutService renewableTimeoutService, long edgeTimeToLiveTimeout )
{
this.connector = connector;
this.renewableTimeoutService = renewableTimeoutService;
this.log = logProvider.getLog( getClass() );
this.boltAddress = boltAddress;
this.edgeTimeToLiveTimeout = edgeTimeToLiveTimeout;
}

@Override
Expand All @@ -68,8 +78,22 @@ public ClusterTopology currentTopology()
@Override
public void start() throws Throwable
{
retry( ( hazelcastInstance ) -> hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME )
.put( hazelcastInstance.getLocalEndpoint().getUuid(), boltAddress.toString() ) );
retry( ( hazelcastInstance ) -> addEdgeServer( hazelcastInstance ) );
edgeRefreshTimer = renewableTimeoutService.create( () -> "Refresh Edge", 5_000, 0, timeout -> {
retry( ( hazelcastInstance ) -> addEdgeServer( hazelcastInstance ) );
timeout.renew();
} );

}

private Object addEdgeServer( HazelcastInstance hazelcastInstance )
{
String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
String address = boltAddress.toString();

log.debug( "Adding edge server into cluster (%s -> %s)", uuid, address );

return hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME ).put( uuid, address, edgeTimeToLiveTimeout, MILLISECONDS );
}

@Override
Expand All @@ -88,6 +112,8 @@ public synchronized void stop() throws Throwable
log.info( "Unable to shutdown Hazelcast", e );
}
}

edgeRefreshTimer.cancel();
}

private synchronized <T> T retry( Function<HazelcastInstance, T> hazelcastOperation )
Expand Down
Expand Up @@ -21,7 +21,6 @@

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.core.HazelcastInstance;

import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
Expand All @@ -44,7 +43,7 @@ public HazelcastInstance connectToHazelcast()

clientConfig.getGroupConfig().setName( config.get( CoreEdgeClusterSettings.cluster_name ) );

for ( AdvertisedSocketAddress address : config.get( CoreEdgeClusterSettings.initial_hazelcast_members ) )
for ( AdvertisedSocketAddress address : config.get( CoreEdgeClusterSettings.initial_discovery_members ) )
{
clientConfig.getNetworkConfig().addAddress( address.toString() );
}
Expand Down
Expand Up @@ -29,7 +29,6 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import com.hazelcast.config.MemberAttributeConfig;
Expand All @@ -38,6 +37,7 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
Expand Down Expand Up @@ -68,7 +68,7 @@ static ClusterTopology getClusterTopology( HazelcastInstance hazelcastInstance,
coreMembers = hazelcastInstance.getCluster().getMembers();
}
return new ClusterTopology( canBeBootstrapped( coreMembers ), toCoreMemberMap( coreMembers, log ),
edgeMembers( hazelcastInstance ) );
edgeMembers( hazelcastInstance, log ) );
}

static class GetConnectedClients implements Callable<Collection<String>>, Serializable, HazelcastInstanceAware
Expand Down Expand Up @@ -97,11 +97,11 @@ public void setHazelcastInstance( HazelcastInstance hazelcastInstance )
}
}

private static Set<EdgeAddresses> edgeMembers( HazelcastInstance hazelcastInstance )
private static Set<EdgeAddresses> edgeMembers( HazelcastInstance hazelcastInstance, Log log )
{
if ( hazelcastInstance == null )
{
// todo log a warning
log.info( "Cannot currently bind to distributed discovery service." );
return emptySet();
}

Expand All @@ -110,10 +110,12 @@ private static Set<EdgeAddresses> edgeMembers( HazelcastInstance hazelcastInstan
try
{
connectedUUIDs = executorService.submit( new GetConnectedClients( hazelcastInstance ) ).get();
removeDisconnectedEdgeServers( hazelcastInstance, connectedUUIDs );

}
catch ( InterruptedException | ExecutionException e )
{
// todo log a warning
log.info( "Unable to complete operation with distributed discovery service.", e );
return emptySet();
}

Expand All @@ -124,6 +126,20 @@ private static Set<EdgeAddresses> edgeMembers( HazelcastInstance hazelcastInstan
.collect( toSet() );
}

private static void removeDisconnectedEdgeServers( HazelcastInstance hazelcastInstance,
Collection<String> connectedUUIDs )
{
IMap<String, String> edgeServers =
hazelcastInstance.<String, String>getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME );

Set<String> toRemove = edgeServers.keySet()
.stream()
.filter( key -> !connectedUUIDs.contains( key ) )
.collect( Collectors.toSet() );

toRemove.forEach( edgeServers::remove );
}

private static boolean canBeBootstrapped( Set<Member> coreMembers )
{
Iterator<Member> iterator = coreMembers.iterator();
Expand Down
Expand Up @@ -120,7 +120,7 @@ private HazelcastInstance createHazelcastInstance()
tcpIpConfig.setEnabled( true );

List<AdvertisedSocketAddress> initialMembers =
config.get( CoreEdgeClusterSettings.initial_hazelcast_members );
config.get( CoreEdgeClusterSettings.initial_discovery_members );
for ( AdvertisedSocketAddress address : initialMembers )
{
tcpIpConfig.addMember( address.toString() );
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.coreedge.discovery;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
Expand All @@ -35,10 +36,11 @@ public CoreTopologyService coreDiscoveryService( Config config, MemberId myself,
}

@Override
public TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider )
public TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress,
LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout )
{
makeHazelcastSilent( config );
return new HazelcastClient( new HazelcastClientConnector( config ), logProvider, boltAddress );
return new HazelcastClient( new HazelcastClientConnector( config ), logProvider, boltAddress, timeoutService, edgeTimeToLiveTimeout );
}

private static void makeHazelcastSilent( Config config )
Expand Down
Expand Up @@ -91,6 +91,7 @@
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.udc.UsageData;

import static java.time.Clock.systemUTC;
import static java.util.Collections.singletonMap;

import static org.neo4j.kernel.impl.factory.CommunityEditionModule.createLockManager;
Expand All @@ -116,15 +117,16 @@ public void registerProcedures( Procedures procedures )
}

EnterpriseEdgeEditionModule( final PlatformModule platformModule,
final DiscoveryServiceFactory discoveryServiceFactory )
final DiscoveryServiceFactory discoveryServiceFactory )
{
LogService logging = platformModule.logging;
Log userLog = logging.getUserLog( EnterpriseEdgeEditionModule.class );
if ( platformModule.config.get( OnlineBackupSettings.online_backup_enabled ) )
{
userLog.warn( "Backup is not supported on edge servers. Ignoring the configuration setting: "
+ OnlineBackupSettings.online_backup_enabled );
platformModule.config.augment( singletonMap( OnlineBackupSettings.online_backup_enabled.name(), Settings.FALSE ) );
+ OnlineBackupSettings.online_backup_enabled );
platformModule.config.augment( singletonMap( OnlineBackupSettings.online_backup_enabled.name(), Settings
.FALSE ) );
}

ioLimiter = new ConfigurableIOLimiter( platformModule.config );
Expand All @@ -143,7 +145,8 @@ public void registerProcedures( Procedures procedures )
statementLocksFactory = new StatementLocksFactorySelector( lockManager, config, logging ).select();

idTypeConfigurationProvider = new EnterpriseIdTypeConfigurationProvider( config );
idGeneratorFactory = dependencies.satisfyDependency( new DefaultIdGeneratorFactory( fileSystem, idTypeConfigurationProvider ) );
idGeneratorFactory = dependencies.satisfyDependency( new DefaultIdGeneratorFactory( fileSystem,
idTypeConfigurationProvider ) );
dependencies.satisfyDependency( new IdBasedStoreEntityCounters( this.idGeneratorFactory ) );

propertyKeyTokenHolder = life.add( dependencies.satisfyDependency(
Expand All @@ -160,13 +163,15 @@ public void registerProcedures( Procedures procedures )

headerInformationFactory = TransactionHeaderInformationFactory.DEFAULT;

schemaWriteGuard = () -> {};
schemaWriteGuard = () -> {
};

transactionStartTimeout = config.get( GraphDatabaseSettings.transaction_start_timeout );

constraintSemantics = new EnterpriseConstraintSemantics();

coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout );
coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard,
transactionStartTimeout );

registerRecovery( platformModule.databaseInfo, life, dependencies );

Expand All @@ -175,11 +180,18 @@ public void registerProcedures( Procedures procedures )

LogProvider logProvider = platformModule.logging.getInternalLogProvider();

TopologyService discoveryService = discoveryServiceFactory.edgeDiscoveryService( config, extractBoltAddress( config ), logProvider );
DelayedRenewableTimeoutService refreshEdgeTimeoutService = life.add( new DelayedRenewableTimeoutService(
systemUTC(), logProvider ) );

long edgeTimeToLiveTimeout = config.get( CoreEdgeClusterSettings.edge_time_to_live );

TopologyService discoveryService = discoveryServiceFactory.edgeDiscoveryService( config,
extractBoltAddress( config ), logProvider, refreshEdgeTimeoutService, edgeTimeToLiveTimeout );
life.add( dependencies.satisfyDependency( discoveryService ) );

NonBlockingChannels nonBlockingChannels = new NonBlockingChannels();
EdgeToCoreClient.ChannelInitializer channelInitializer = new EdgeToCoreClient.ChannelInitializer( logProvider, nonBlockingChannels );
EdgeToCoreClient.ChannelInitializer channelInitializer = new EdgeToCoreClient.ChannelInitializer(
logProvider, nonBlockingChannels );
int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size );
long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle );
EdgeToCoreClient edgeToCoreClient = life.add( new EdgeToCoreClient( logProvider,
Expand All @@ -195,22 +207,25 @@ public void registerProcedures( Procedures procedures )

LifeSupport txPulling = new LifeSupport();
int maxBatchSize = config.get( CoreEdgeClusterSettings.edge_transaction_applier_batch_size );
BatchingTxApplier batchingTxApplier = new BatchingTxApplier( maxBatchSize, dependencies.provideDependency( TransactionIdStore.class ),
BatchingTxApplier batchingTxApplier = new BatchingTxApplier( maxBatchSize, dependencies.provideDependency(
TransactionIdStore.class ),
writableCommitProcess, databaseHealthSupplier, platformModule.monitors, logProvider );
ContinuousJob txApplyJob = new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "tx-applier", NEW_THREAD ), batchingTxApplier );
ContinuousJob txApplyJob = new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group(
"tx-applier", NEW_THREAD ), batchingTxApplier );

DelayedRenewableTimeoutService txPullerTimeoutService = new DelayedRenewableTimeoutService( Clock.systemUTC(), logProvider );
DelayedRenewableTimeoutService txPullerTimeoutService = new DelayedRenewableTimeoutService( Clock.systemUTC()
, logProvider );

LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir,
new CopiedStoreRecovery( config, platformModule.kernelExtensions.listFactories(), platformModule
.pageCache ),
new CopiedStoreRecovery( config, platformModule.kernelExtensions.listFactories(),
platformModule.pageCache ),
new StoreFiles( new DefaultFileSystemAbstraction() ),
platformModule.dataSourceManager,
dependencies.provideDependency( TransactionIdStore.class ),
databaseHealthSupplier, logProvider );

TxPollingClient txPuller = new TxPollingClient( logProvider, localDatabase,
edgeToCoreClient, new ConnectToRandomCoreMember( discoveryService ),
TxPollingClient txPuller = new TxPollingClient( logProvider,
localDatabase, edgeToCoreClient, new ConnectToRandomCoreMember( discoveryService ),
txPullerTimeoutService, config.get( CoreEdgeClusterSettings.pull_interval ), batchingTxApplier );

txPulling.add( batchingTxApplier );
Expand All @@ -226,7 +241,7 @@ edgeToCoreClient, new ConnectToRandomCoreMember( discoveryService ),
life.add( new EdgeStartupProcess( storeFetcher,
localDatabase,
txPulling, new ConnectToRandomCoreMember( discoveryService ),
new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider) );
new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider ) );

dependencies.satisfyDependency( createSessionTracker() );
}
Expand Down
Expand Up @@ -135,7 +135,7 @@ public EdgeClusterMember addEdgeMemberWithIdAndRecordFormat( int memberId, Strin
Config config = coreClusterMember.database().getDependencyResolver().resolveDependency( Config.class );

List<AdvertisedSocketAddress> hazelcastAddresses =
config.get( CoreEdgeClusterSettings.initial_hazelcast_members );
config.get( CoreEdgeClusterSettings.initial_discovery_members );
EdgeClusterMember member = new EdgeClusterMember( parentDir, memberId, discoveryServiceFactory,
hazelcastAddresses, stringMap(), emptyMap(), recordFormat );
edgeMembers.put( memberId, member );
Expand Down Expand Up @@ -298,7 +298,7 @@ private CoreClusterMember addCoreMemberWithId( int memberId, int intendedCluster
Config config = firstOrNull( coreMembers.values() ).database().getDependencyResolver().resolveDependency(
Config.class );
List<AdvertisedSocketAddress> advertisedAddress = config.get( CoreEdgeClusterSettings
.initial_hazelcast_members );
.initial_discovery_members );

CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, intendedClusterSize, advertisedAddress,
discoveryServiceFactory, recordFormat, parentDir,
Expand Down

0 comments on commit fddbe65

Please sign in to comment.