Skip to content

Commit

Permalink
Edge topology assembled asynchronously.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimwebber committed Sep 16, 2016
1 parent 2fda121 commit eb3a326
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 59 deletions.
Expand Up @@ -23,6 +23,8 @@

public interface CoreTopologyService extends TopologyService
{
EdgeTopology edgeServers();

void addCoreTopologyListener( Listener listener );

/**
Expand Down
Expand Up @@ -37,7 +37,7 @@

class HazelcastClient extends LifecycleAdapter implements TopologyService
{
public static final RenewableTimeoutService.TimeoutName REFRESH_EDGE = () -> "Refresh Edge";
static final RenewableTimeoutService.TimeoutName REFRESH_EDGE = () -> "Refresh Edge";
private final Log log;
private final AdvertisedSocketAddress boltAddress;
private final HazelcastConnector connector;
Expand All @@ -58,22 +58,6 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService
this.edgeTimeToLiveTimeout = edgeTimeToLiveTimeout;
}

@Override
public EdgeTopology edgeServers()
{
try
{
return retry( ( hazelcastInstance ) ->
HazelcastClusterTopology.getEdgeTopology( hazelcastInstance, log ) );
}
catch ( Exception e )
{
log.info( "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. "
+ "Connection will be reattempted on next polling attempt.", e );
return EdgeTopology.EMPTY;
}
}

@Override
public CoreTopology coreServers()
{
Expand All @@ -95,7 +79,7 @@ public void start() throws Throwable
{
edgeRefreshTimer = renewableTimeoutService.create( REFRESH_EDGE, edgeRefreshRate, 0, timeout -> {
timeout.renew();
retry( ( hazelcastInstance ) -> addEdgeServer( hazelcastInstance ) );
retry( this::addEdgeServer );
} );
}

Expand Down
Expand Up @@ -24,6 +24,8 @@
import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
Expand All @@ -32,6 +34,7 @@
import com.hazelcast.core.MembershipListener;
import com.hazelcast.instance.GroupProperties;
import com.hazelcast.instance.GroupProperty;
import com.hazelcast.map.impl.MapListenerAdapter;

import java.util.List;

Expand All @@ -46,16 +49,20 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService, MembershipListener
import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.EDGE_SERVER_BOLT_ADDRESS_MAP_NAME;

class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService
{
private final Config config;
private final MemberId myself;
private final Log log;
private final Log userLog;
private final CoreTopologyListenerService listenerService;
private String membershipRegistrationId;
private String mapRegistrationId;

private HazelcastInstance hazelcastInstance;
private EdgeTopology latestEdgeTopology;

HazelcastCoreTopologyService( Config config, MemberId myself, LogProvider logProvider, LogProvider userLogProvider )
{
Expand All @@ -79,34 +86,15 @@ public boolean casClusterId( ClusterId clusterId )
return HazelcastClusterTopology.casClusterId( hazelcastInstance, clusterId );
}

@Override
public void memberAdded( MembershipEvent membershipEvent )
{
log.info( "Core member added %s", membershipEvent );
log.info( "Current core topology is %s", coreServers() );
listenerService.notifyListeners( coreServers());
}

@Override
public void memberRemoved( MembershipEvent membershipEvent )
{
log.info( "Core member removed %s", membershipEvent );
log.info( "Current core topology is %s", coreServers() );
listenerService.notifyListeners( coreServers());
}

@Override
public void memberAttributeChanged( MemberAttributeEvent memberAttributeEvent )
{
}

@Override
public void start()
{
hazelcastInstance = createHazelcastInstance();
log.info( "Cluster discovery service started" );
membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( this );
membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( new OurMembershipListener() );
mapRegistrationId = hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME ).addEntryListener( new OurEntryListener(), true );
listenerService.notifyListeners( coreServers());
refreshEdgeTopology();
}

@Override
Expand All @@ -117,6 +105,7 @@ public void stop()
try
{
hazelcastInstance.getCluster().removeMembershipListener( membershipRegistrationId );
hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME ).removeEntryListener( mapRegistrationId );
hazelcastInstance.getLifecycleService().terminate();
}
catch ( Throwable e )
Expand Down Expand Up @@ -187,12 +176,50 @@ private Integer minimumClusterSizeThatCanTolerateOneFaultForExpectedClusterSize(
@Override
public EdgeTopology edgeServers()
{
return HazelcastClusterTopology.getEdgeTopology( hazelcastInstance, log );
return latestEdgeTopology;
}

@Override
public CoreTopology coreServers()
{
return HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log );
}

private void refreshEdgeTopology()
{
latestEdgeTopology = HazelcastClusterTopology.getEdgeTopology( hazelcastInstance, log );
}

private class OurEntryListener extends MapListenerAdapter
{
@Override
public void onEntryEvent( EntryEvent event )
{
refreshEdgeTopology();
}
}

private class OurMembershipListener implements MembershipListener
{
@Override
public void memberAdded( MembershipEvent membershipEvent )
{
log.info( "Core member added %s", membershipEvent );
log.info( "Current core topology is %s", coreServers() );
listenerService.notifyListeners( coreServers());
}

@Override
public void memberRemoved( MembershipEvent membershipEvent )
{
log.info( "Core member removed %s", membershipEvent );
log.info( "Current core topology is %s", coreServers() );
listenerService.notifyListeners( coreServers());
}

@Override
public void memberAttributeChanged( MemberAttributeEvent memberAttributeEvent )
{
}
}
}
Expand Up @@ -23,7 +23,5 @@

public interface TopologyService extends Lifecycle
{
EdgeTopology edgeServers();

CoreTopology coreServers();
}
Expand Up @@ -289,13 +289,12 @@ public void shouldRegisterEdgeServerInTopology() throws Throwable
renewableTimeoutService, 60_000, 5_000 );

hazelcastClient.start();
renewableTimeoutService.invokeTimeout( REFRESH_EDGE );

// when
EdgeTopology clusterTopology = hazelcastClient.edgeServers();
renewableTimeoutService.invokeTimeout( REFRESH_EDGE );

// then
assertEquals( 1, clusterTopology.members().size() );
assertEquals( 1, hazelcastMap.size() );
}

@Test
Expand Down Expand Up @@ -334,15 +333,14 @@ public void shouldRemoveEdgeServersOnGracefulShutdown() throws Throwable
renewableTimeoutService, 60_000, 5_000 );

hazelcastClient.start();
renewableTimeoutService.invokeTimeout( REFRESH_EDGE );

int numberOfStartedEdgeServers = hazelcastClient.edgeServers().members().size();
renewableTimeoutService.invokeTimeout( REFRESH_EDGE );

// when
hazelcastClient.stop();

// then
assertEquals( 0, numberOfStartedEdgeServers - 1 );
assertEquals( 0, hazelcastMap.size() );
}

private Member makeMember( int id ) throws UnknownHostException
Expand Down
Expand Up @@ -51,12 +51,6 @@ public void stop() throws Throwable
sharedDiscoveryService.unRegisterEdgeMember( addresses );
}

@Override
public EdgeTopology edgeServers()
{
return sharedDiscoveryService.edgeTopology();
}

@Override
public CoreTopology coreServers()
{
Expand Down
Expand Up @@ -199,9 +199,6 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreMemberWithDifferentStoreI
}
catch ( RuntimeException required )
{

required.printStackTrace();

// Lifecycle should throw exception, server should not start.
assertThat( required.getCause(), instanceOf( LifecycleException.class ) );
assertThat( required.getCause().getCause(), instanceOf( IllegalStateException.class ) );
Expand Down

0 comments on commit eb3a326

Please sign in to comment.