Skip to content

Commit

Permalink
Graceful Shutdown of Edge Servers
Browse files Browse the repository at this point in the history
When edge servers shutdown gracefully, they remove themselves from the
cluster topology.
  • Loading branch information
jimwebber authored and Mark Needham committed Aug 12, 2016
1 parent 363cd98 commit fa72967
Show file tree
Hide file tree
Showing 14 changed files with 256 additions and 154 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
*/ */
package org.neo4j.coreedge.catchup; package org.neo4j.coreedge.catchup;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer; import java.util.function.Consumer;


import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotListener; import io.netty.channel.ChannelInitializer;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequest; import io.netty.channel.socket.SocketChannel;

import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequest; import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.coreedge.catchup.storecopy.GetStoreRequest; import org.neo4j.coreedge.catchup.storecopy.GetStoreRequest;
import org.neo4j.coreedge.catchup.storecopy.StoreFileReceiver; import org.neo4j.coreedge.catchup.storecopy.StoreFileReceiver;
Expand All @@ -38,15 +36,17 @@
import org.neo4j.coreedge.catchup.tx.TxPullResponse; import org.neo4j.coreedge.catchup.tx.TxPullResponse;
import org.neo4j.coreedge.catchup.tx.TxPullResponseListener; import org.neo4j.coreedge.catchup.tx.TxPullResponseListener;
import org.neo4j.coreedge.catchup.tx.TxStreamCompleteListener; import org.neo4j.coreedge.catchup.tx.TxStreamCompleteListener;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.messaging.Message;
import org.neo4j.coreedge.messaging.CoreOutbound;
import org.neo4j.coreedge.messaging.Outbound;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotListener;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequest;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.messaging.CoreOutbound;
import org.neo4j.coreedge.messaging.Message;
import org.neo4j.coreedge.messaging.NonBlockingChannels; import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.coreedge.messaging.Outbound;
import org.neo4j.coreedge.messaging.SenderService; import org.neo4j.coreedge.messaging.SenderService;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.helpers.Listeners; import org.neo4j.helpers.Listeners;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
Expand All @@ -55,9 +55,9 @@
import static java.util.Arrays.asList; import static java.util.Arrays.asList;


public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver, public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver,
StoreFileStreamingCompleteListener, StoreFileStreamingCompleteListener,
TxStreamCompleteListener, TxPullResponseListener, TxStreamCompleteListener, TxPullResponseListener,
CoreSnapshotListener CoreSnapshotListener
{ {
private final PullRequestMonitor pullRequestMonitor; private final PullRequestMonitor pullRequestMonitor;
private final SenderService senderService; private final SenderService senderService;
Expand All @@ -71,8 +71,8 @@ public abstract class CoreClient extends LifecycleAdapter implements StoreFileRe
private Outbound<MemberId, Message> outbound; private Outbound<MemberId, Message> outbound;


public CoreClient( LogProvider logProvider, ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors, public CoreClient( LogProvider logProvider, ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService, int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService,
long logThresholdMillis ) long logThresholdMillis )
{ {
senderService = senderService =
new SenderService( channelInitializer, logProvider, monitors, maxQueueSize, nonBlockingChannels ); new SenderService( channelInitializer, logProvider, monitors, maxQueueSize, nonBlockingChannels );
Expand Down Expand Up @@ -109,7 +109,7 @@ public void pollForTransactions( MemberId serverAddress, StoreId storeId, long l


private void send( MemberId to, RequestMessageType messageType, Message contentMessage ) private void send( MemberId to, RequestMessageType messageType, Message contentMessage )
{ {
outbound.send( to, asList( messageType, contentMessage ) ); outbound.send( to, asList( messageType, contentMessage ) );
} }


@Override @Override
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ public String toString()
setting( "core_edge.discovery_listen_address", LISTEN_SOCKET_ADDRESS, "0.0.0.0:5000" ); setting( "core_edge.discovery_listen_address", LISTEN_SOCKET_ADDRESS, "0.0.0.0:5000" );


@Description("A comma-separated list of other members of the cluster to join.") @Description("A comma-separated list of other members of the cluster to join.")
public static final Setting<List<AdvertisedSocketAddress>> initial_core_cluster_members = public static final Setting<List<AdvertisedSocketAddress>> initial_hazelcast_members =
setting( "core_edge.initial_core_cluster_members", list( ",", ADVERTISED_SOCKET_ADDRESS ), MANDATORY ); setting( "core_edge.initial_hazelcast_members", list( ",", ADVERTISED_SOCKET_ADDRESS ), MANDATORY );


@Description("Prevents the network middleware from dumping its own logs. Defaults to true.") @Description("Prevents the network middleware from dumping its own logs. Defaults to true.")
public static final Setting<Boolean> disable_middleware_logging = public static final Setting<Boolean> disable_middleware_logging =
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public CoreGraphDatabase( File storeDir, Map<String, String> params,


public MemberId id() public MemberId id()
{ {
return (MemberId) getDependencyResolver().resolveDependency( RaftMachine.class ).identity(); return getDependencyResolver().resolveDependency( RaftMachine.class ).identity();
} }


public Role getRole() public Role getRole()
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Collection<CoreAddresses> coreMemberAddresses()
return coreMembers.values(); return coreMembers.values();
} }


public Set<EdgeAddresses> edgeMemberAddresses() public Collection<EdgeAddresses> edgeMemberAddresses()
{ {
return edgeAddresses; return edgeAddresses;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ClusterTopology currentTopology()
try try
{ {
return retry( ( hazelcastInstance ) -> return retry( ( hazelcastInstance ) ->
HazelcastClusterTopology.fromHazelcastInstance( hazelcastInstance, log ) ); HazelcastClusterTopology.getClusterTopology( hazelcastInstance, log ) );
} }
catch ( Exception e ) catch ( Exception e )
{ {
Expand All @@ -76,6 +76,9 @@ public synchronized void stop() throws Throwable
{ {
if ( hazelcastInstance != null ) if ( hazelcastInstance != null )
{ {
hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME )
.remove( hazelcastInstance.getLocalEndpoint().getUuid() );

hazelcastInstance.shutdown(); hazelcastInstance.shutdown();
} }
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


import com.hazelcast.client.HazelcastClient; import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig; import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;


import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress; import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
Expand All @@ -43,7 +44,7 @@ public HazelcastInstance connectToHazelcast()


clientConfig.getGroupConfig().setName( config.get( CoreEdgeClusterSettings.cluster_name ) ); clientConfig.getGroupConfig().setName( config.get( CoreEdgeClusterSettings.cluster_name ) );


for ( AdvertisedSocketAddress address : config.get( CoreEdgeClusterSettings.initial_core_cluster_members ) ) for ( AdvertisedSocketAddress address : config.get( CoreEdgeClusterSettings.initial_hazelcast_members ) )
{ {
clientConfig.getNetworkConfig().addAddress( address.toString() ); clientConfig.getNetworkConfig().addAddress( address.toString() );
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@
*/ */
package org.neo4j.coreedge.discovery; package org.neo4j.coreedge.discovery;


import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.core.Client;
import com.hazelcast.core.ClientService;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;

import java.io.Serializable; import java.io.Serializable;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
Expand All @@ -40,6 +32,14 @@
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.core.Client;
import com.hazelcast.core.ClientService;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.edge.EnterpriseEdgeEditionModule; import org.neo4j.coreedge.edge.EnterpriseEdgeEditionModule;
import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.identity.MemberId;
Expand All @@ -53,13 +53,14 @@


class HazelcastClusterTopology class HazelcastClusterTopology
{ {
static final String EDGE_SERVER_BOLT_ADDRESS_MAP_NAME = "edge-servers"; // hz client uuid string -> boltAddress string // hz client uuid string -> boltAddress string
static final String EDGE_SERVER_BOLT_ADDRESS_MAP_NAME = "edge-servers";
static final String MEMBER_UUID = "member_uuid"; static final String MEMBER_UUID = "member_uuid";
static final String TRANSACTION_SERVER = "transaction_server"; static final String TRANSACTION_SERVER = "transaction_server";
static final String RAFT_SERVER = "raft_server"; static final String RAFT_SERVER = "raft_server";
static final String BOLT_SERVER = "bolt_server"; static final String BOLT_SERVER = "bolt_server";


static ClusterTopology fromHazelcastInstance( HazelcastInstance hazelcastInstance, Log log ) static ClusterTopology getClusterTopology( HazelcastInstance hazelcastInstance, Log log )
{ {
Set<Member> coreMembers = emptySet(); Set<Member> coreMembers = emptySet();
if ( hazelcastInstance != null ) if ( hazelcastInstance != null )
Expand All @@ -70,22 +71,29 @@ static ClusterTopology fromHazelcastInstance( HazelcastInstance hazelcastInstanc
edgeMembers( hazelcastInstance ) ); edgeMembers( hazelcastInstance ) );
} }


private static class GetConnectedClients implements Callable<Collection<String>>, HazelcastInstanceAware, Serializable static class GetConnectedClients implements Callable<Collection<String>>, Serializable, HazelcastInstanceAware
{ {
private transient HazelcastInstance instance; private transient HazelcastInstance instance;


GetConnectedClients( HazelcastInstance instance )
{
this.instance = instance;
}

@Override @Override
public Collection<String> call() throws Exception public Collection<String> call() throws Exception
{ {
final ClientService clientService = instance.getClientService(); final ClientService clientService = instance.getClientService();
final Collection<Client> connectedClients = clientService.getConnectedClients(); return clientService.getConnectedClients()
return connectedClients.stream().map( Client::getUuid ).collect( Collectors.toCollection( HashSet::new ) ); .stream()
.map( Client::getUuid )
.collect( Collectors.toCollection( HashSet::new ) );
} }


@Override @Override
public void setHazelcastInstance( HazelcastInstance hazelcastInstance ) public void setHazelcastInstance( HazelcastInstance hazelcastInstance )
{ {
instance = hazelcastInstance; this.instance = hazelcastInstance;
} }
} }


Expand All @@ -101,19 +109,17 @@ private static Set<EdgeAddresses> edgeMembers( HazelcastInstance hazelcastInstan
final IExecutorService executorService = hazelcastInstance.getExecutorService( "default" ); final IExecutorService executorService = hazelcastInstance.getExecutorService( "default" );
try try
{ {
GetConnectedClients getConnectedClients = new GetConnectedClients(); connectedUUIDs = executorService.submit( new GetConnectedClients( hazelcastInstance ) ).get();
getConnectedClients.setHazelcastInstance( hazelcastInstance );
Future<Collection<String>> collectionFuture = executorService.submit( getConnectedClients );
connectedUUIDs = collectionFuture.get();
} }
catch ( InterruptedException | ExecutionException e ) catch ( InterruptedException | ExecutionException e )
{ {
// todo log a warning // todo log a warning
return emptySet(); return emptySet();
} }


return hazelcastInstance.<String/*uuid*/,String/*boltAddress*/>getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME ).entrySet().stream(). return hazelcastInstance.<String/*uuid*/, String/*boltAddress*/>getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME )
filter( entry -> connectedUUIDs.contains( entry.getKey() ) ) .entrySet().stream().
filter( entry -> connectedUUIDs.contains( entry.getKey() ) )
.map( entry -> new EdgeAddresses( new AdvertisedSocketAddress( entry.getValue() /*boltAddress*/ ) ) ) .map( entry -> new EdgeAddresses( new AdvertisedSocketAddress( entry.getValue() /*boltAddress*/ ) ) )
.collect( toSet() ); .collect( toSet() );
} }
Expand All @@ -124,15 +130,15 @@ private static boolean canBeBootstrapped( Set<Member> coreMembers )
return iterator.hasNext() && iterator.next().localMember(); return iterator.hasNext() && iterator.next().localMember();
} }


static Map<MemberId,CoreAddresses> toCoreMemberMap( Set<Member> members, Log log ) static Map<MemberId, CoreAddresses> toCoreMemberMap( Set<Member> members, Log log )
{ {
Map<MemberId,CoreAddresses> coreMembers = new HashMap<>(); Map<MemberId, CoreAddresses> coreMembers = new HashMap<>();


for ( Member member : members ) for ( Member member : members )
{ {
try try
{ {
Pair<MemberId,CoreAddresses> pair = extractMemberAttributes( member ); Pair<MemberId, CoreAddresses> pair = extractMemberAttributes( member );
coreMembers.put( pair.first(), pair.other() ); coreMembers.put( pair.first(), pair.other() );
} }
catch ( IllegalArgumentException e ) catch ( IllegalArgumentException e )
Expand All @@ -149,7 +155,8 @@ static MemberAttributeConfig buildMemberAttributes( MemberId myself, Config conf
MemberAttributeConfig memberAttributeConfig = new MemberAttributeConfig(); MemberAttributeConfig memberAttributeConfig = new MemberAttributeConfig();
memberAttributeConfig.setStringAttribute( MEMBER_UUID, myself.getUuid().toString() ); memberAttributeConfig.setStringAttribute( MEMBER_UUID, myself.getUuid().toString() );


AdvertisedSocketAddress transactionSource = config.get( CoreEdgeClusterSettings.transaction_advertised_address ); AdvertisedSocketAddress transactionSource = config.get( CoreEdgeClusterSettings
.transaction_advertised_address );
memberAttributeConfig.setStringAttribute( TRANSACTION_SERVER, transactionSource.toString() ); memberAttributeConfig.setStringAttribute( TRANSACTION_SERVER, transactionSource.toString() );


AdvertisedSocketAddress raftAddress = config.get( CoreEdgeClusterSettings.raft_advertised_address ); AdvertisedSocketAddress raftAddress = config.get( CoreEdgeClusterSettings.raft_advertised_address );
Expand All @@ -160,7 +167,7 @@ static MemberAttributeConfig buildMemberAttributes( MemberId myself, Config conf
return memberAttributeConfig; return memberAttributeConfig;
} }


static Pair<MemberId,CoreAddresses> extractMemberAttributes( Member member ) static Pair<MemberId, CoreAddresses> extractMemberAttributes( Member member )
{ {
MemberId memberId = new MemberId( UUID.fromString( member.getStringAttribute( MEMBER_UUID ) ) ); MemberId memberId = new MemberId( UUID.fromString( member.getStringAttribute( MEMBER_UUID ) ) );


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private HazelcastInstance createHazelcastInstance()
tcpIpConfig.setEnabled( true ); tcpIpConfig.setEnabled( true );


List<AdvertisedSocketAddress> initialMembers = List<AdvertisedSocketAddress> initialMembers =
config.get( CoreEdgeClusterSettings.initial_core_cluster_members ); config.get( CoreEdgeClusterSettings.initial_hazelcast_members );
for ( AdvertisedSocketAddress address : initialMembers ) for ( AdvertisedSocketAddress address : initialMembers )
{ {
tcpIpConfig.addMember( address.toString() ); tcpIpConfig.addMember( address.toString() );
Expand Down Expand Up @@ -155,6 +155,6 @@ private Integer minimumClusterSizeThatCanTolerateOneFaultForExpectedClusterSize(
@Override @Override
public ClusterTopology currentTopology() public ClusterTopology currentTopology()
{ {
return HazelcastClusterTopology.fromHazelcastInstance( hazelcastInstance, log ); return HazelcastClusterTopology.getClusterTopology( hazelcastInstance, log );
} }
} }
Loading

0 comments on commit fa72967

Please sign in to comment.