Skip to content

Commit

Permalink
Revert "Save discovered members and use during restart to connect aga…
Browse files Browse the repository at this point in the history
…in."

This reverts commit 9d3151b.
  • Loading branch information
Mark Needham committed Sep 2, 2016
1 parent bc76366 commit b9124b4
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 196 deletions.
Expand Up @@ -23,7 +23,6 @@

import org.neo4j.coreedge.core.state.storage.SimpleStorage;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.DiscoveredMemberRepository;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.identity.MemberId;
Expand Down Expand Up @@ -53,11 +52,7 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member
SimpleStorage<ClusterId> clusterIdStorage = new SimpleStorage<>( fileSystem, clusterStateDirectory,
CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider );

DiscoveredMemberRepository discoveredMemberRepository =
new DiscoveredMemberRepository( clusterStateDirectory, fileSystem, logProvider );

topologyService = discoveryServiceFactory.coreTopologyService( config, myself, discoveredMemberRepository,
logProvider );
topologyService = discoveryServiceFactory.coreTopologyService( config, myself, logProvider );
BindingService bindingService = new BindingService( clusterIdStorage, topologyService, logProvider,
Clocks.systemClock(), () -> sleep( 100 ), 300_000 );

Expand Down

This file was deleted.

Expand Up @@ -27,8 +27,7 @@

public interface DiscoveryServiceFactory
{
CoreTopologyService coreTopologyService( Config config, MemberId myself,
DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider );
CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider );

TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate );
}
Expand Up @@ -31,13 +31,11 @@
import com.hazelcast.instance.GroupProperties;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.address.ListenSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -48,19 +46,16 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
{
private final Config config;
private final MemberId myself;
private final DiscoveredMemberRepository discoveredMemberRepository;
private final Log log;
private final CoreTopologyListenerService listenerService;
private String membershipRegistrationId;

private HazelcastInstance hazelcastInstance;

HazelcastCoreTopologyService( Config config, MemberId myself, DiscoveredMemberRepository discoveredMemberRepository,
LogProvider logProvider )
HazelcastCoreTopologyService( Config config, MemberId myself, LogProvider logProvider )
{
this.config = config;
this.myself = myself;
this.discoveredMemberRepository = discoveredMemberRepository;
this.listenerService = new CoreTopologyListenerService();
this.log = logProvider.getLog( getClass() );
}
Expand All @@ -82,30 +77,16 @@ public boolean publishClusterId( ClusterId clusterId )
public void memberAdded( MembershipEvent membershipEvent )
{
log.info( "Core member added %s", membershipEvent );

ClusterTopology clusterTopology = currentTopology();
log.info( "Current topology is %s", clusterTopology );
notifyMembershipChange( clusterTopology );
log.info( "Current topology is %s", currentTopology() );
listenerService.notifyListeners(currentTopology());
}

@Override
public void memberRemoved( MembershipEvent membershipEvent )
{
log.info( "Core member removed %s", membershipEvent );

ClusterTopology clusterTopology = currentTopology();
log.info( "Current topology is %s", clusterTopology );
notifyMembershipChange( clusterTopology );
}

private void notifyMembershipChange( ClusterTopology clusterTopology )
{
Set<AdvertisedSocketAddress> members = hazelcastInstance.getCluster().getMembers().stream()
.map( member -> new AdvertisedSocketAddress(
String.format( "%s:%d", member.getSocketAddress().getHostName(),
member.getSocketAddress().getPort() ) ) ).collect( Collectors.toSet() );
discoveredMemberRepository.store( members );
listenerService.notifyListeners( clusterTopology );
log.info( "Current topology is %s", currentTopology() );
listenerService.notifyListeners(currentTopology());
}

@Override
Expand All @@ -119,7 +100,7 @@ public void start()
hazelcastInstance = createHazelcastInstance();
log.info( "Cluster discovery service started" );
membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( this );
notifyMembershipChange( currentTopology() );
listenerService.notifyListeners(currentTopology());
}

@Override
Expand Down Expand Up @@ -151,13 +132,7 @@ private HazelcastInstance createHazelcastInstance()
{
tcpIpConfig.addMember( address.toString() );
}
Set<AdvertisedSocketAddress> previouslySeenMembers = discoveredMemberRepository.previouslyDiscoveredMembers();
for ( AdvertisedSocketAddress seenAddress : previouslySeenMembers )
{
tcpIpConfig.addMember( seenAddress.toString() );
}
log.info( String.format( "Discovering cluster with initial members: %s and previously seen members: %s",
initialMembers, previouslySeenMembers ) );
log.info( "Discovering cluster with initial members: " + initialMembers );

NetworkConfig networkConfig = new NetworkConfig();
ListenSocketAddress hazelcastAddress = config.get( CoreEdgeClusterSettings.discovery_listen_address );
Expand Down
Expand Up @@ -29,12 +29,11 @@
public class HazelcastDiscoveryServiceFactory implements DiscoveryServiceFactory
{
@Override
public CoreTopologyService coreTopologyService( Config config, MemberId myself,
DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider )
public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider )
{
makeHazelcastSilent( config );
hazelcastShouldNotPhoneHome();
return new HazelcastCoreTopologyService( config, myself, discoveredMemberRepository, logProvider );
return new HazelcastCoreTopologyService( config, myself, logProvider );
}

@Override
Expand Down

This file was deleted.

Expand Up @@ -51,8 +51,7 @@ public class SharedDiscoveryService implements DiscoveryServiceFactory
private ClusterId clusterId;

@Override
public CoreTopologyService coreTopologyService( Config config, MemberId myself,
DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider )
public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider )
{
SharedDiscoveryCoreClient sharedDiscoveryCoreClient = new SharedDiscoveryCoreClient( this, myself, logProvider, config );
sharedDiscoveryCoreClient.onTopologyChange( currentTopology( sharedDiscoveryCoreClient ) );
Expand Down
Expand Up @@ -89,7 +89,7 @@ public void shouldDiscoverCompleteTargetSetWithoutDeadlocks() throws Exception

private Callable<Void> createDiscoveryJob( MemberId member, DiscoveryServiceFactory disoveryServiceFactory, Set<MemberId> expectedTargetSet ) throws ExecutionException, InterruptedException
{
CoreTopologyService topologyService = disoveryServiceFactory.coreTopologyService( config(), member, mock( DiscoveredMemberRepository.class ), logProvider );
CoreTopologyService topologyService = disoveryServiceFactory.coreTopologyService( config(), member, logProvider );
return sharedClientStarter( topologyService, expectedTargetSet );
}

Expand Down
Expand Up @@ -19,8 +19,8 @@
*/
package org.neo4j.coreedge.scenarios;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

import java.util.List;

Expand Down Expand Up @@ -49,7 +49,7 @@ public class ClusterMembershipChangeIT
public final ClusterRule clusterRule = new ClusterRule( getClass() ).withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() )
.withNumberOfCoreMembers( 3 );

@Test
@Ignore( "Incomplete, HC will hang waiting for others to join." )
public void newMemberNotInInitialMembersConfig() throws Throwable
{
// when
Expand Down

0 comments on commit b9124b4

Please sign in to comment.