diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/CoreTopologyService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/CoreTopologyService.java index 784682bf292e7..6a70fd127f5b9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/CoreTopologyService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/CoreTopologyService.java @@ -23,6 +23,8 @@ public interface CoreTopologyService extends TopologyService { + EdgeTopology edgeServers(); + void addCoreTopologyListener( Listener listener ); /** diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClient.java index 03f23e8bbe9ed..35b48d59c43d6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastClient.java @@ -37,7 +37,7 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService { - public static final RenewableTimeoutService.TimeoutName REFRESH_EDGE = () -> "Refresh Edge"; + static final RenewableTimeoutService.TimeoutName REFRESH_EDGE = () -> "Refresh Edge"; private final Log log; private final AdvertisedSocketAddress boltAddress; private final HazelcastConnector connector; @@ -58,22 +58,6 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService this.edgeTimeToLiveTimeout = edgeTimeToLiveTimeout; } - @Override - public EdgeTopology edgeServers() - { - try - { - return retry( ( hazelcastInstance ) -> - HazelcastClusterTopology.getEdgeTopology( hazelcastInstance, log ) ); - } - catch ( Exception e ) - { - log.info( "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. " - + "Connection will be reattempted on next polling attempt.", e ); - return EdgeTopology.EMPTY; - } - } - @Override public CoreTopology coreServers() { @@ -95,7 +79,7 @@ public void start() throws Throwable { edgeRefreshTimer = renewableTimeoutService.create( REFRESH_EDGE, edgeRefreshRate, 0, timeout -> { timeout.renew(); - retry( ( hazelcastInstance ) -> addEdgeServer( hazelcastInstance ) ); + retry( this::addEdgeServer ); } ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastCoreTopologyService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastCoreTopologyService.java index b9f66faf3876c..0d8f9ac6f738f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastCoreTopologyService.java @@ -24,6 +24,8 @@ import com.hazelcast.config.MemberAttributeConfig; import com.hazelcast.config.NetworkConfig; import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.EntryAdapter; +import com.hazelcast.core.EntryEvent; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastException; import com.hazelcast.core.HazelcastInstance; @@ -32,6 +34,7 @@ import com.hazelcast.core.MembershipListener; import com.hazelcast.instance.GroupProperties; import com.hazelcast.instance.GroupProperty; +import com.hazelcast.map.impl.MapListenerAdapter; import java.util.List; @@ -46,7 +49,9 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService, MembershipListener +import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.EDGE_SERVER_BOLT_ADDRESS_MAP_NAME; + +class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService { private final Config config; private final MemberId myself; @@ -54,8 +59,10 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol private final Log userLog; private final CoreTopologyListenerService listenerService; private String membershipRegistrationId; + private String mapRegistrationId; private HazelcastInstance hazelcastInstance; + private EdgeTopology latestEdgeTopology; HazelcastCoreTopologyService( Config config, MemberId myself, LogProvider logProvider, LogProvider userLogProvider ) { @@ -79,34 +86,15 @@ public boolean casClusterId( ClusterId clusterId ) return HazelcastClusterTopology.casClusterId( hazelcastInstance, clusterId ); } - @Override - public void memberAdded( MembershipEvent membershipEvent ) - { - log.info( "Core member added %s", membershipEvent ); - log.info( "Current core topology is %s", coreServers() ); - listenerService.notifyListeners( coreServers()); - } - - @Override - public void memberRemoved( MembershipEvent membershipEvent ) - { - log.info( "Core member removed %s", membershipEvent ); - log.info( "Current core topology is %s", coreServers() ); - listenerService.notifyListeners( coreServers()); - } - - @Override - public void memberAttributeChanged( MemberAttributeEvent memberAttributeEvent ) - { - } - @Override public void start() { hazelcastInstance = createHazelcastInstance(); log.info( "Cluster discovery service started" ); - membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( this ); + membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( new OurMembershipListener() ); + mapRegistrationId = hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME ).addEntryListener( new OurEntryListener(), true ); listenerService.notifyListeners( coreServers()); + refreshEdgeTopology(); } @Override @@ -117,6 +105,7 @@ public void stop() try { hazelcastInstance.getCluster().removeMembershipListener( membershipRegistrationId ); + hazelcastInstance.getMap( EDGE_SERVER_BOLT_ADDRESS_MAP_NAME ).removeEntryListener( mapRegistrationId ); hazelcastInstance.getLifecycleService().terminate(); } catch ( Throwable e ) @@ -187,7 +176,7 @@ private Integer minimumClusterSizeThatCanTolerateOneFaultForExpectedClusterSize( @Override public EdgeTopology edgeServers() { - return HazelcastClusterTopology.getEdgeTopology( hazelcastInstance, log ); + return latestEdgeTopology; } @Override @@ -195,4 +184,42 @@ public CoreTopology coreServers() { return HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log ); } + + private void refreshEdgeTopology() + { + latestEdgeTopology = HazelcastClusterTopology.getEdgeTopology( hazelcastInstance, log ); + } + + private class OurEntryListener extends MapListenerAdapter + { + @Override + public void onEntryEvent( EntryEvent event ) + { + refreshEdgeTopology(); + } + } + + private class OurMembershipListener implements MembershipListener + { + @Override + public void memberAdded( MembershipEvent membershipEvent ) + { + log.info( "Core member added %s", membershipEvent ); + log.info( "Current core topology is %s", coreServers() ); + listenerService.notifyListeners( coreServers()); + } + + @Override + public void memberRemoved( MembershipEvent membershipEvent ) + { + log.info( "Core member removed %s", membershipEvent ); + log.info( "Current core topology is %s", coreServers() ); + listenerService.notifyListeners( coreServers()); + } + + @Override + public void memberAttributeChanged( MemberAttributeEvent memberAttributeEvent ) + { + } + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/TopologyService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/TopologyService.java index 43baa1d0f81ad..30ac995365f3a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/TopologyService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/TopologyService.java @@ -23,7 +23,5 @@ public interface TopologyService extends Lifecycle { - EdgeTopology edgeServers(); - CoreTopology coreServers(); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java index ffdd7e1c4fec3..b7c86c9d60508 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClientTest.java @@ -289,13 +289,12 @@ public void shouldRegisterEdgeServerInTopology() throws Throwable renewableTimeoutService, 60_000, 5_000 ); hazelcastClient.start(); - renewableTimeoutService.invokeTimeout( REFRESH_EDGE ); // when - EdgeTopology clusterTopology = hazelcastClient.edgeServers(); + renewableTimeoutService.invokeTimeout( REFRESH_EDGE ); // then - assertEquals( 1, clusterTopology.members().size() ); + assertEquals( 1, hazelcastMap.size() ); } @Test @@ -334,15 +333,14 @@ public void shouldRemoveEdgeServersOnGracefulShutdown() throws Throwable renewableTimeoutService, 60_000, 5_000 ); hazelcastClient.start(); - renewableTimeoutService.invokeTimeout( REFRESH_EDGE ); - int numberOfStartedEdgeServers = hazelcastClient.edgeServers().members().size(); + renewableTimeoutService.invokeTimeout( REFRESH_EDGE ); // when hazelcastClient.stop(); // then - assertEquals( 0, numberOfStartedEdgeServers - 1 ); + assertEquals( 0, hazelcastMap.size() ); } private Member makeMember( int id ) throws UnknownHostException diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryEdgeClient.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryEdgeClient.java index 4bacf61ae6226..37c4480e6e692 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryEdgeClient.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryEdgeClient.java @@ -51,12 +51,6 @@ public void stop() throws Throwable sharedDiscoveryService.unRegisterEdgeMember( addresses ); } - @Override - public EdgeTopology edgeServers() - { - return sharedDiscoveryService.edgeTopology(); - } - @Override public CoreTopology coreServers() { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java index 2debc9f4fcc7e..4b642f69fbacc 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/EdgeServerReplicationIT.java @@ -199,9 +199,6 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreMemberWithDifferentStoreI } catch ( RuntimeException required ) { - - required.printStackTrace(); - // Lifecycle should throw exception, server should not start. assertThat( required.getCause(), instanceOf( LifecycleException.class ) ); assertThat( required.getCause().getCause(), instanceOf( IllegalStateException.class ) );