From bd944012568f49c0f7a029697b7dfe541216a4d9 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Tue, 7 Mar 2017 14:42:24 +0100 Subject: [PATCH] make hazelcast usage more robust and cache results * Use standard job scheduler instead of custom timeout service. * Use robust wrapper for the job scheduler. * Use robust wrapper for hazelcast. * Refresh and cache all the topology information periodically. * Optimize for the hot paths (e.g. address lookup). * Streamline core and replica implementations. --- .../core/CausalClusteringSettings.java | 4 - .../discovery/CoreTopology.java | 11 +- .../discovery/CoreTopologyService.java | 2 - .../discovery/DiscoveryServiceFactory.java | 4 +- .../discovery/HazelcastClient.java | 185 +++++++----------- .../discovery/HazelcastClusterTopology.java | 19 +- .../HazelcastCoreTopologyService.java | 167 ++++++---------- .../HazelcastDiscoveryServiceFactory.java | 7 +- .../discovery/RaftCoreTopologyConnector.java | 2 +- .../discovery/ReadReplicaTopology.java | 2 +- .../discovery/TopologyService.java | 6 +- .../procedures/ClusterOverviewProcedure.java | 4 +- .../identity/ClusterBinder.java | 3 +- .../server_policies/ServerPoliciesPlugin.java | 4 +- .../procedure/GetServersProcedureV1.java | 6 +- .../ConnectToRandomCoreServer.java | 2 +- .../EnterpriseReadReplicaEditionModule.java | 12 +- .../discovery/HazelcastClientTest.java | 124 +++--------- .../discovery/SharedDiscoveryCoreClient.java | 16 +- .../SharedDiscoveryReadReplicaClient.java | 8 +- .../discovery/SharedDiscoveryService.java | 9 +- .../ReadReplicaStartupProcessTest.java | 13 +- ...picallyConnectToRandomReadReplicaTest.java | 10 +- 23 files changed, 231 insertions(+), 389 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index 65801aa2d5d95..174e33bb92b32 100644 --- 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -208,10 +208,6 @@ public class CausalClusteringSettings implements LoadableConfig public static final Setting read_replica_time_to_live = setting( "causal_clustering.read_replica_time_to_live", DURATION, "1m", min( 60_000L ) ); - @Description( "Read replica 'call home' frequency" ) - public static final Setting read_replica_refresh_rate = - setting( "causal_clustering.read_replica_refresh_rate", DURATION, "5s", min( 5_000L ) ); - @Description( "How long drivers should cache the data from the `dbms.cluster.routing.getServers()` procedure." ) public static final Setting cluster_routing_ttl = setting( "causal_clustering.cluster_routing_ttl", DURATION, "5m", min( 1_000L ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java index cf2e11c1d1f1d..4d21ef0ddec8a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopology.java @@ -19,8 +19,6 @@ */ package org.neo4j.causalclustering.discovery; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -48,9 +46,9 @@ public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map( coreMembers ); } - public Set members() + public Map members() { - return coreMembers.keySet(); + return coreMembers; } public ClusterId clusterId() @@ -58,11 +56,6 @@ public ClusterId clusterId() return clusterId; } - public Collection allMemberInfo() - { - return coreMembers.values(); - } - public boolean canBeBootstrapped() { return 
canBeBootstrapped; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java index 942dc60834d60..392fed30f893a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java @@ -39,8 +39,6 @@ public interface CoreTopologyService extends TopologyService */ boolean setClusterId( ClusterId clusterId ); - void refreshCoreTopology(); - interface Listener { void onCoreTopologyChange( CoreTopology coreTopology ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java index 28fca0f4a2a27..8f881ed322d5a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/DiscoveryServiceFactory.java @@ -19,7 +19,6 @@ */ package org.neo4j.causalclustering.discovery; -import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.util.JobScheduler; @@ -31,6 +30,5 @@ CoreTopologyService coreTopologyService( Config config, MemberId myself, JobSche LogProvider logProvider, LogProvider userLogProvider ); TopologyService topologyService( Config config, LogProvider logProvider, - DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, - long readReplicaRefreshRate, MemberId myself ); + JobScheduler jobScheduler, MemberId myself ); } diff --git 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java index 14874ef531acf..6be58d8fd1e34 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java @@ -19,17 +19,17 @@ */ package org.neo4j.causalclustering.discovery; -import com.hazelcast.core.HazelcastInstance; -import com.hazelcast.core.HazelcastInstanceNotActiveException; - +import java.util.HashMap; import java.util.List; -import java.util.function.Function; +import java.util.Map; +import java.util.Optional; import org.neo4j.causalclustering.core.CausalClusteringSettings; -import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; +import org.neo4j.causalclustering.helper.RobustJobSchedulerWrapper; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -38,159 +38,124 @@ import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP_NAME; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology; import static 
org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshTags; class HazelcastClient extends LifecycleAdapter implements TopologyService { - static final RenewableTimeoutService.TimeoutName REFRESH_READ_REPLICA = () -> "Refresh Read Replica"; private final Log log; private final ClientConnectorAddresses connectorAddresses; - private final HazelcastConnector connector; + private final RobustHazelcastWrapper hzInstance; + private final RobustJobSchedulerWrapper scheduler; private final Config config; - private final RenewableTimeoutService renewableTimeoutService; private final AdvertisedSocketAddress transactionSource; private final List tags; - private HazelcastInstance hazelcastInstance; - private RenewableTimeoutService.RenewableTimeout readReplicaRefreshTimer; - private final long readReplicaTimeToLiveTimeout; - private final long readReplicaRefreshRate; - private MemberId myself; - - HazelcastClient( HazelcastConnector connector, LogProvider logProvider, Config config, - RenewableTimeoutService renewableTimeoutService, long readReplicaTimeToLiveTimeout, - long readReplicaRefreshRate, MemberId myself ) + private final long timeToLive; + private final long refreshPeriod; + private final MemberId myself; + + private JobScheduler.JobHandle keepAliveJob; + private JobScheduler.JobHandle refreshTopologyJob; + + private volatile Map catchupAddressMap = new HashMap<>(); + private volatile CoreTopology coreTopology = CoreTopology.EMPTY; + private volatile ReadReplicaTopology rrTopology = ReadReplicaTopology.EMPTY; + + HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider, Config config, MemberId myself ) { - this.connector = connector; + this.hzInstance = new RobustHazelcastWrapper( connector ); this.config = config; - this.renewableTimeoutService = renewableTimeoutService; - this.readReplicaRefreshRate = readReplicaRefreshRate; this.log = logProvider.getLog( getClass() ); + this.scheduler = new 
RobustJobSchedulerWrapper( scheduler, log ); this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config ); this.transactionSource = config.get( CausalClusteringSettings.transaction_advertised_address ); this.tags = config.get( CausalClusteringSettings.server_tags ); - this.readReplicaTimeToLiveTimeout = readReplicaTimeToLiveTimeout; + this.timeToLive = config.get( CausalClusteringSettings.read_replica_time_to_live ); + this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ); this.myself = myself; } @Override public CoreTopology coreServers() { - try - { - return retry( ( hazelcastInstance ) -> getCoreTopology( hazelcastInstance, config, log ) ); - } - catch ( Exception e ) - { - log.info( - "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. " + - "Connection will be reattempted on next polling attempt.", e ); - return CoreTopology.EMPTY; - } + return coreTopology; } @Override public ReadReplicaTopology readReplicas() { - try - { - return retry( ( hazelcastInstance ) -> getReadReplicaTopology( hazelcastInstance, log ) ); - } - catch ( Exception e ) - { - log.info( - "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. " + - "Connection will be reattempted on next polling attempt.", e ); - return ReadReplicaTopology.EMPTY; - } + return rrTopology; } @Override - public ClusterTopology allServers() + public Optional findCatchupAddress( MemberId memberId ) { - return new ClusterTopology( coreServers(), readReplicas() ); + return Optional.ofNullable( catchupAddressMap.get( memberId ) ); + } + + /** + * Caches the topology so that the lookups are fast. 
+ */ + private void refreshTopology() throws HazelcastInstanceNotActiveException + { + coreTopology = hzInstance.apply( ( hz ) -> getCoreTopology( hz, config, log ) ); + rrTopology = hzInstance.apply( ( hz ) -> getReadReplicaTopology( hz, log ) ); + catchupAddressMap = extractCatchupAddressesMap( coreTopology, rrTopology ); } @Override public void start() throws Throwable { - readReplicaRefreshTimer = - renewableTimeoutService.create( REFRESH_READ_REPLICA, readReplicaRefreshRate, 0, timeout -> - { - timeout.renew(); - retry( this::addReadReplica ); - } ); + keepAliveJob = scheduler.scheduleRecurring( "KeepAlive", timeToLive / 3, this::keepReadReplicaAlive ); + refreshTopologyJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, this::refreshTopology ); } - private Void addReadReplica( HazelcastInstance hazelcastInstance ) + @Override + public void stop() throws Throwable { - String uuid = hazelcastInstance.getLocalEndpoint().getUuid(); - String addresses = connectorAddresses.toString(); - - log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses ); - - hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME ) - .put( uuid, transactionSource.toString(), readReplicaTimeToLiveTimeout, MILLISECONDS ); - - hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ) - .put( uuid, myself.getUuid().toString(), readReplicaTimeToLiveTimeout, MILLISECONDS ); - - refreshTags( hazelcastInstance, uuid, tags ); - - // this needs to be last as when we read from it in HazelcastClusterTopology.readReplicas - // we assume that all the other maps have been populated if an entry exists in this one - hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ) - .put( uuid, addresses, readReplicaTimeToLiveTimeout, MILLISECONDS ); - - return null; // return value not used. 
+ scheduler.cancelAndWaitTermination( keepAliveJob ); + scheduler.cancelAndWaitTermination( refreshTopologyJob ); + disconnectFromCore(); } - @Override - public synchronized void stop() throws Throwable + private void disconnectFromCore() { - if ( hazelcastInstance != null ) + try { - try - { - String uuid = hazelcastInstance.getLocalEndpoint().getUuid(); - hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ).remove( uuid ); - hazelcastInstance.shutdown(); - } - catch ( Throwable t ) - { - // Hazelcast is not able to stop correctly sometimes and throws a bunch of different exceptions - // let's simply log the current problem but go on with our shutdown - log.warn( "Unable to shutdown Hazelcast", t ); - } + String uuid = hzInstance.apply( hzInstance -> hzInstance.getLocalEndpoint().getUuid() ); + hzInstance.apply( hz -> hz.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ).remove( uuid ) ); + hzInstance.shutdown(); + } + catch ( Throwable e ) + { + // Hazelcast is not able to stop correctly sometimes and throws a bunch of different exceptions + // let's simply log the current problem but go on with our shutdown + log.warn( "Unable to shutdown hazelcast cleanly", e ); } - - readReplicaRefreshTimer.cancel(); } - private synchronized T retry( Function hazelcastOperation ) + private void keepReadReplicaAlive() throws HazelcastInstanceNotActiveException { - boolean attemptedConnection = false; - HazelcastInstanceNotActiveException exception = null; - - while ( !attemptedConnection ) + hzInstance.perform( hazelcastInstance -> { - if ( hazelcastInstance == null ) - { - attemptedConnection = true; - hazelcastInstance = connector.connectToHazelcast(); - } - - try - { - return hazelcastOperation.apply( hazelcastInstance ); - } - catch ( HazelcastInstanceNotActiveException e ) - { - hazelcastInstance = null; - exception = e; - } - } - throw exception; + String uuid = hazelcastInstance.getLocalEndpoint().getUuid(); + String addresses = connectorAddresses.toString(); + 
log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses ); + + hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME ) + .put( uuid, transactionSource.toString(), timeToLive, MILLISECONDS ); + + hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME ) + .put( uuid, myself.getUuid().toString(), timeToLive, MILLISECONDS ); + + refreshTags( hazelcastInstance, uuid, tags ); + + // this needs to be last as when we read from it in HazelcastClusterTopology.readReplicas + // we assume that all the other maps have been populated if an entry exists in this one + hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ) + .put( uuid, addresses, timeToLive, MILLISECONDS ); + } ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java index a65ec87e129ce..c2618716f38b2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java @@ -45,7 +45,7 @@ import static org.neo4j.helpers.SocketAddressFormat.socketAddress; import static org.neo4j.helpers.collection.Iterables.asSet; -class HazelcastClusterTopology +public class HazelcastClusterTopology { // per server attributes private static final String DISCOVERY_SERVER = "discovery_server"; // not currently used @@ -102,6 +102,23 @@ static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Config return new CoreTopology( clusterId, canBeBootstrapped, coreMembers ); } + public static Map extractCatchupAddressesMap( CoreTopology coreTopology, ReadReplicaTopology rrTopology ) + { + Map catchupAddressMap = new HashMap<>(); + + for ( Map.Entry entry : coreTopology.members().entrySet() ) + { + catchupAddressMap.put( 
entry.getKey(), entry.getValue().getCatchupServer() ); + } + + for ( Map.Entry entry : rrTopology.members().entrySet() ) + { + catchupAddressMap.put( entry.getKey(), entry.getValue().getCatchupServer() ); + } + + return catchupAddressMap; + } + private static ClusterId getClusterId( HazelcastInstance hazelcastInstance ) { IAtomicReference uuidReference = hazelcastInstance.getAtomicReference( CLUSTER_UUID ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index e490701312333..119e6e43d701d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -30,12 +30,15 @@ import com.hazelcast.core.MemberAttributeEvent; import com.hazelcast.core.MembershipEvent; import com.hazelcast.core.MembershipListener; -import com.hazelcast.core.MultiMap; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.helper.RobustJobSchedulerWrapper; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.graphdb.config.Setting; @@ -53,10 +56,10 @@ import static com.hazelcast.spi.properties.GroupProperty.MERGE_NEXT_RUN_DELAY_SECONDS; import static com.hazelcast.spi.properties.GroupProperty.OPERATION_CALL_TIMEOUT_MILLIS; import static com.hazelcast.spi.properties.GroupProperty.WAIT_SECONDS_BEFORE_JOIN; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.SERVER_TAGS_MULTIMAP_NAME; +import 
static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshTags; -import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService { @@ -66,31 +69,34 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol private final Log log; private final Log userLog; private final CoreTopologyListenerService listenerService; - private final JobScheduler scheduler; - private String membershipRegistrationId; + private final RobustJobSchedulerWrapper scheduler; + private final long refreshPeriod; - private JobScheduler.JobHandle jobHandle; + private String membershipRegistrationId; + private JobScheduler.JobHandle refreshJob; private HazelcastInstance hazelcastInstance; - private volatile ReadReplicaTopology latestReadReplicaTopology; - private volatile CoreTopology latestCoreTopology; + private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY; + private volatile CoreTopology coreTopology = CoreTopology.EMPTY; + private volatile Map catchupAddressMap = new HashMap<>(); HazelcastCoreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler, LogProvider logProvider, LogProvider userLogProvider ) { this.config = config; this.myself = myself; - this.scheduler = jobScheduler; this.listenerService = new CoreTopologyListenerService(); this.log = logProvider.getLog( getClass() ); + this.scheduler = new RobustJobSchedulerWrapper( jobScheduler, log ); this.userLog = userLogProvider.getLog( getClass() ); + this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ); } @Override 
public void addCoreTopologyListener( Listener listener ) { listenerService.addCoreTopologyListener( listener ); - listener.onCoreTopologyChange( coreServers() ); + listener.onCoreTopologyChange( coreTopology ); } @Override @@ -102,27 +108,10 @@ public boolean setClusterId( ClusterId clusterId ) @Override public void start() throws Throwable { - hazelcastInstance = createHazelcastInstance(); log.info( "Cluster discovery service started" ); + hazelcastInstance = createHazelcastInstance(); membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( new OurMembershipListener() ); - refreshCoreTopology(); - refreshReadReplicaTopology(); - listenerService.notifyListeners( coreServers() ); - - scheduler.start(); - JobScheduler.Group group = new JobScheduler.Group( "TopologyRefresh", POOLED ); - jobHandle = scheduler.scheduleRecurring( group, () -> - { - try - { - refreshCoreTopology(); - refreshReadReplicaTopology(); - } - catch ( Throwable e ) - { - log.info( "Failed to refresh topology", e ); - } - }, config.get( CausalClusteringSettings.cluster_topology_refresh ), MILLISECONDS ); + refreshJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, this::refreshTopology ); } @Override @@ -130,6 +119,9 @@ public void stop() { log.info( String.format( "HazelcastCoreTopologyService stopping and unbinding from %s", config.get( CausalClusteringSettings.discovery_listen_address ) ) ); + + scheduler.cancelAndWaitTermination( refreshJob ); + try { hazelcastInstance.getCluster().removeMembershipListener( membershipRegistrationId ); @@ -139,10 +131,6 @@ public void stop() { log.warn( "Failed to stop Hazelcast", e ); } - finally - { - jobHandle.cancel( true ); - } } private HazelcastInstance createHazelcastInstance() @@ -187,18 +175,15 @@ private HazelcastInstance createHazelcastInstance() c.setMemberAttributeConfig( memberAttributeConfig ); logConnectionInfo( initialMembers ); - DelayedLog delayedLog = new DelayedLog( "The server has not been able to 
connect in a timely fashion to the " + - "cluster. Please consult the logs for more details. Rebooting the server may solve the problem", log ); - JobScheduler.JobHandle jobHandle = scheduler - .schedule( new JobScheduler.Group( getClass().toString(), JobScheduler.SchedulingStrategy.POOLED ), - delayedLog, HAZELCAST_IS_HEALTHY_TIMEOUT_MS, MILLISECONDS ); - - delayedLog.setJobHandle( jobHandle ); + JobScheduler.JobHandle logJob = scheduler.schedule( "HazelcastHealth", HAZELCAST_IS_HEALTHY_TIMEOUT_MS, + () -> log.warn( "The server has not been able to connect in a timely fashion to the " + + "cluster. Please consult the logs for more details. Rebooting the server may " + + "solve the problem." ) ); try { hazelcastInstance = Hazelcast.newHazelcastInstance( c ); - delayedLog.stop(); + logJob.cancel( true ); } catch ( HazelcastException e ) { @@ -217,12 +202,12 @@ private HazelcastInstance createHazelcastInstance() private void logConnectionInfo( List initialMembers ) { - userLog.info( "My connection info: " + - "[\n\tDiscovery: listen=%s, advertised=%s," + - "\n\tTransaction: listen=%s, advertised=%s, " + - "\n\tRaft: listen=%s, advertised=%s, " + - "\n\tClient Connector Addresses: %s" + - "\n]", + userLog.info( "My connection info: " + + "[\n\tDiscovery: listen=%s, advertised=%s," + + "\n\tTransaction: listen=%s, advertised=%s, " + + "\n\tRaft: listen=%s, advertised=%s, " + + "\n\tClient Connector Addresses: %s" + + "\n]", config.get( CausalClusteringSettings.discovery_listen_address ), config.get( CausalClusteringSettings.discovery_advertised_address ), config.get( CausalClusteringSettings.transaction_listen_address ), @@ -240,50 +225,48 @@ private Integer minimumClusterSizeThatCanTolerateOneFaultForExpectedClusterSize( } @Override - public ReadReplicaTopology readReplicas() + public CoreTopology coreServers() { - return latestReadReplicaTopology; + return coreTopology; } @Override - public ClusterTopology allServers() + public ReadReplicaTopology readReplicas() { - 
return new ClusterTopology( coreServers(), readReplicas() ); + return readReplicaTopology; } @Override - public CoreTopology coreServers() + public Optional findCatchupAddress( MemberId memberId ) { - return latestCoreTopology; + return Optional.ofNullable( catchupAddressMap.get( memberId ) ); } - @Override - public void refreshCoreTopology() + private synchronized void refreshTopology() { - CoreTopology newCoreTopology = HazelcastClusterTopology.getCoreTopology( hazelcastInstance, config, log ); + refreshCoreTopology(); + refreshReadReplicaTopology(); + catchupAddressMap = extractCatchupAddressesMap( coreTopology, readReplicaTopology ); + } - if ( coreServers() != null ) - { - CoreTopology.TopologyDifference difference = coreServers().difference( newCoreTopology ); - if ( difference.hasChanges() ) - { - log.info( "Core topology changed %s", difference ); - } - } - else + private void refreshCoreTopology() + { + CoreTopology newCoreTopology = getCoreTopology( hazelcastInstance, config, log ); + + CoreTopology.TopologyDifference difference = coreTopology.difference( newCoreTopology ); + if ( difference.hasChanges() ) { - log.info( "Initial Core topology %s", newCoreTopology ); + log.info( "Core topology changed %s", difference ); } - latestCoreTopology = newCoreTopology; - - listenerService.notifyListeners( coreServers() ); + coreTopology = newCoreTopology; + listenerService.notifyListeners( coreTopology ); } private void refreshReadReplicaTopology() { - latestReadReplicaTopology = HazelcastClusterTopology.getReadReplicaTopology( hazelcastInstance, log ); - log.info( "Current read replica topology is %s", latestReadReplicaTopology ); + readReplicaTopology = getReadReplicaTopology( hazelcastInstance, log ); + log.info( "Current read replica topology is %s", readReplicaTopology ); } private class OurMembershipListener implements MembershipListener @@ -292,14 +275,14 @@ private class OurMembershipListener implements MembershipListener public void memberAdded( 
MembershipEvent membershipEvent ) { log.info( "Core member added %s", membershipEvent ); - refreshCoreTopology(); + refreshTopology(); } @Override public void memberRemoved( MembershipEvent membershipEvent ) { log.info( "Core member removed %s", membershipEvent ); - refreshCoreTopology(); + refreshTopology(); } @Override @@ -307,40 +290,4 @@ public void memberAttributeChanged( MemberAttributeEvent memberAttributeEvent ) { } } - - private class DelayedLog implements Runnable - { - private final String message; - private final Log log; - private boolean performLogging = true; - private JobScheduler.JobHandle jobHandle; - - DelayedLog( String message, Log log ) - { - this.message = message; - this.log = log; - } - - @Override - public void run() - { - if ( performLogging ) - { - log.warn( message ); - stop(); - } - - jobHandle.cancel( true ); - } - - public void stop() - { - this.performLogging = false; - } - - void setJobHandle( JobScheduler.JobHandle jobHandle ) - { - this.jobHandle = jobHandle; - } - } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java index 02f3918d85736..4e7541b68b04c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastDiscoveryServiceFactory.java @@ -20,7 +20,6 @@ package org.neo4j.causalclustering.discovery; import org.neo4j.causalclustering.core.CausalClusteringSettings; -import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.util.JobScheduler; @@ -38,13 +37,11 @@ public CoreTopologyService coreTopologyService( 
Config config, MemberId myself, @Override public TopologyService topologyService( Config config, LogProvider logProvider, - DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, - long readReplicaRefreshRate, MemberId myself ) + JobScheduler jobScheduler, MemberId myself ) { configureHazelcast( config ); - return new HazelcastClient( new HazelcastClientConnector( config ), logProvider, config, timeoutService, - readReplicaTimeToLiveTimeout, readReplicaRefreshRate, myself ); + return new HazelcastClient( new HazelcastClientConnector( config ), jobScheduler, logProvider, config, myself ); } private static void configureHazelcast( Config config ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftCoreTopologyConnector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftCoreTopologyConnector.java index c8e3d919c1b8a..3c303a0d16c88 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftCoreTopologyConnector.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftCoreTopologyConnector.java @@ -49,7 +49,7 @@ public void start() throws BootstrapException @Override public synchronized void onCoreTopologyChange( CoreTopology coreTopology ) { - Set targetMembers = coreTopology.members(); + Set targetMembers = coreTopology.members().keySet(); raftMachine.setTargetMembershipSet( targetMembers ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java index 602e1bc83af3f..14e568bf6f712 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/ReadReplicaTopology.java @@ -44,7 
+44,7 @@ public Collection allMemberInfo() return readReplicaMembers.values(); } - public Map replicaMembers() + public Map members() { return readReplicaMembers; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyService.java index 97857aa640a03..5ab4d6f3c911f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyService.java @@ -19,6 +19,10 @@ */ package org.neo4j.causalclustering.discovery; +import java.util.Optional; + +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.lifecycle.Lifecycle; /** @@ -30,5 +34,5 @@ public interface TopologyService extends Lifecycle ReadReplicaTopology readReplicas(); - ClusterTopology allServers(); + Optional findCatchupAddress( MemberId upstream ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java index b60d72135db8b..6041feda3d44c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/procedures/ClusterOverviewProcedure.java @@ -81,7 +81,7 @@ public RawIterator apply( Context ctx, Object[] inp { List endpoints = new ArrayList<>(); CoreTopology coreTopology = topologyService.coreServers(); - Set coreMembers = coreTopology.members(); + Set coreMembers = coreTopology.members().keySet(); MemberId leader = null; try @@ -107,7 +107,7 @@ public RawIterator apply( Context ctx, Object[] inp } 
} - for ( Map.Entry readReplica : topologyService.readReplicas().replicaMembers().entrySet() ) + for ( Map.Entry readReplica : topologyService.readReplicas().members().entrySet() ) { ReadReplicaInfo readReplicaInfo = readReplica.getValue(); endpoints.add( new ReadWriteEndPoint( readReplicaInfo.connectors(), Role.READ_REPLICA, readReplica.getKey().getUuid(), asList( readReplicaInfo.tags() ) ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/ClusterBinder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/ClusterBinder.java index e1a5da08e8f52..afd176e475c3e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/ClusterBinder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/ClusterBinder.java @@ -84,7 +84,7 @@ public void bindToCluster( ThrowingConsumer snapshotIns if ( topology.canBeBootstrapped() ) { commonClusterId = new ClusterId( UUID.randomUUID() ); - CoreSnapshot snapshot = coreBootstrapper.bootstrap( topology.members() ); + CoreSnapshot snapshot = coreBootstrapper.bootstrap( topology.members().keySet() ); log.info( String.format( "Bootstrapped with snapshot: %s and clusterId: %s", snapshot, commonClusterId ) ); snapshotInstaller.accept( snapshot ); @@ -100,7 +100,6 @@ public void bindToCluster( ThrowingConsumer snapshotIns if ( clock.millis() < endTime ) { retryWaiter.apply(); - topologyService.refreshCoreTopology(); topology = topologyService.coreServers(); } else diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerPoliciesPlugin.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerPoliciesPlugin.java index 11245ac76f765..4fd2e48ad72aa 100644 --- 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerPoliciesPlugin.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/plugins/server_policies/ServerPoliciesPlugin.java @@ -105,7 +105,7 @@ public Result run( Map context ) private List routeEndpoints( CoreTopology cores ) { - return cores.allMemberInfo().stream().map( extractBoltAddress() ) + return cores.members().values().stream().map( extractBoltAddress() ) .map( Endpoint::route ).collect( Collectors.toList() ); } @@ -136,7 +136,7 @@ private List readEndpoints( CoreTopology coreTopology, ReadReplicaTopo if ( allowReadsOnFollowers || possibleReaders.size() == 0 ) { - Set validCores = coreTopology.members(); + Set validCores = coreTopology.members().keySet(); try { MemberId leader = leaderLocator.getLeader(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java index 11fb771cf4524..cee1046bc4518 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/load_balancing/procedure/GetServersProcedureV1.java @@ -118,7 +118,7 @@ private Optional leaderBoltAddress() private List routeEndpoints() { Stream routers = topologyService.coreServers() - .allMemberInfo().stream().map( extractBoltAddress() ); + .members().values().stream().map( extractBoltAddress() ); List routeEndpoints = routers.map( Endpoint::route ).collect( toList() ); Collections.shuffle( routeEndpoints ); return routeEndpoints; @@ -144,9 +144,9 @@ private List readEndpoints() private Stream coreReadEndPoints() { Optional leader = leaderBoltAddress(); - Collection coreServerInfo = 
topologyService.coreServers().allMemberInfo(); + Collection coreServerInfo = topologyService.coreServers().members().values(); Stream boltAddresses = topologyService.coreServers() - .allMemberInfo().stream().map( extractBoltAddress() ); + .members().values().stream().map( extractBoltAddress() ); // if the leader is present and it is not alone filter it out from the read end points if ( leader.isPresent() && coreServerInfo.size() > 1 ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java index 5f94413df9381..4b17495f3344f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ConnectToRandomCoreServer.java @@ -49,7 +49,7 @@ public Optional upstreamDatabase() throws UpstreamDatabaseSelectionExc int skippedServers = random.nextInt( coreTopology.members().size() ); - final Iterator iterator = coreTopology.members().iterator(); + final Iterator iterator = coreTopology.members().keySet().iterator(); MemberId member; do diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 7f7ae2d48ca47..2de4e74fc1b3f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -172,17 +172,11 @@ public class EnterpriseReadReplicaEditionModule extends EditionModule LogProvider logProvider = platformModule.logging.getInternalLogProvider(); - 
DelayedRenewableTimeoutService refreshReadReplicaTimeoutService = - life.add( new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider ) ); - - long readReplicaTimeToLiveTimeout = config.get( CausalClusteringSettings.read_replica_time_to_live ); - long readReplicaRefreshRate = config.get( CausalClusteringSettings.read_replica_refresh_rate ); - logProvider.getLog( getClass() ).info( String.format( "Generated new id: %s", myself ) ); - TopologyService topologyService = discoveryServiceFactory - .topologyService( config, logProvider, refreshReadReplicaTimeoutService, - readReplicaTimeToLiveTimeout, readReplicaRefreshRate, myself ); + TopologyService topologyService = discoveryServiceFactory.topologyService( config, + logProvider, platformModule.jobScheduler, myself ); + life.add( dependencies.satisfyDependency( topologyService ) ); long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java index 0e6712262b12b..9decf9db7d367 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClientTest.java @@ -27,7 +27,6 @@ import com.hazelcast.core.EntryView; import com.hazelcast.core.ExecutionCallback; import com.hazelcast.core.HazelcastInstance; -import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.core.IAtomicReference; import com.hazelcast.core.ICompletableFuture; import com.hazelcast.core.IExecutorService; @@ -72,25 +71,23 @@ import java.util.function.BiFunction; import java.util.function.Function; -import org.neo4j.causalclustering.core.consensus.schedule.ControlledRenewableTimeoutService; import 
org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.BoltConnector; import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; +import org.neo4j.test.OnDemandJobScheduler; import static java.lang.String.format; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.startsWith; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.neo4j.causalclustering.discovery.HazelcastClient.REFRESH_READ_REPLICA; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.CLIENT_CONNECTOR_ADDRESSES; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.MEMBER_UUID; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.RAFT_SERVER; @@ -118,12 +115,12 @@ private Config config() } @Test - public void shouldReturnTopologyUsingHazelcastMembers() throws Exception + public void shouldReturnTopologyUsingHazelcastMembers() throws Throwable { // given HazelcastConnector connector = mock( HazelcastConnector.class ); - HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); + OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); + HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); @@ -140,6 +137,8 @@ public void shouldReturnTopologyUsingHazelcastMembers() throws Exception when( cluster.getMembers() 
).thenReturn( members ); // when + client.start(); + jobScheduler.runJob(); CoreTopology topology = client.coreServers(); // then @@ -147,12 +146,12 @@ public void shouldReturnTopologyUsingHazelcastMembers() throws Exception } @Test - public void shouldNotReconnectWhileHazelcastRemainsAvailable() throws Exception + public void shouldNotReconnectWhileHazelcastRemainsAvailable() throws Throwable { // given HazelcastConnector connector = mock( HazelcastConnector.class ); - HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); + OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); + HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); @@ -169,6 +168,9 @@ public void shouldNotReconnectWhileHazelcastRemainsAvailable() throws Exception when( cluster.getMembers() ).thenReturn( members ); // when + client.start(); + jobScheduler.runJob(); + CoreTopology topology; for ( int i = 0; i < 5; i++ ) { @@ -181,7 +183,7 @@ public void shouldNotReconnectWhileHazelcastRemainsAvailable() throws Exception } @Test - public void shouldReturnEmptyTopologyIfUnableToConnectToHazelcast() throws Exception + public void shouldReturnEmptyTopologyIfUnableToConnectToHazelcast() throws Throwable { // given HazelcastConnector connector = mock( HazelcastConnector.class ); @@ -195,8 +197,8 @@ public void shouldReturnEmptyTopologyIfUnableToConnectToHazelcast() throws Excep when( hazelcastInstance.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); when( hazelcastInstance.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); - HazelcastClient client = new HazelcastClient( connector, logProvider, config(), new - 
ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); + OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); + HazelcastClient client = new HazelcastClient( connector, jobScheduler, logProvider, config(), myself ); com.hazelcast.core.Cluster cluster = mock( Cluster.class ); when( hazelcastInstance.getCluster() ).thenReturn( cluster ); @@ -205,78 +207,11 @@ public void shouldReturnEmptyTopologyIfUnableToConnectToHazelcast() throws Excep when( cluster.getMembers() ).thenReturn( members ); // when + client.start(); + jobScheduler.runJob(); CoreTopology topology = client.coreServers(); assertEquals( 0, topology.members().size() ); - verify( log ).info( startsWith( "Failed to read cluster topology from Hazelcast." ), - any( IllegalStateException.class ) ); - } - - @Test - public void shouldReturnEmptyTopologyIfInitiallyConnectedToHazelcastButItsNowUnavailable() throws Exception - { - // given - HazelcastConnector connector = mock( HazelcastConnector.class ); - HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); - - HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class ); - when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); - - when( hazelcastInstance.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); - - when( hazelcastInstance.getCluster() ).thenThrow( new HazelcastInstanceNotActiveException() ); - - // when - CoreTopology topology = client.coreServers(); - - // then - assertEquals( 0, topology.members().size() ); - } - - @Test - public void shouldReconnectIfHazelcastUnavailable() throws Exception - { - // given - HazelcastConnector connector = mock( HazelcastConnector.class ); - HazelcastClient client = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), new - ControlledRenewableTimeoutService(), 60_000, 5_000, myself ); - - HazelcastInstance hazelcastInstance1 = mock( 
HazelcastInstance.class ); - HazelcastInstance hazelcastInstance2 = mock( HazelcastInstance.class ); - when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance1 ) - .thenReturn( hazelcastInstance2 ); - - com.hazelcast.core.Cluster cluster = mock( Cluster.class ); - when( hazelcastInstance1.getCluster() ).thenReturn( cluster ).thenReturn( cluster ) - .thenThrow( new HazelcastInstanceNotActiveException() ); - when( hazelcastInstance2.getCluster() ).thenReturn( cluster ); - - when( hazelcastInstance1.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); - when( hazelcastInstance1.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); - when( hazelcastInstance1.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() ); - when( hazelcastInstance2.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) ); - when( hazelcastInstance2.getSet( anyString() ) ).thenReturn( new HazelcastSet() ); - when( hazelcastInstance2.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() ); - - when( hazelcastInstance1.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() ); - when( hazelcastInstance2.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() ); - - Set members = asSet( makeMember( 1 ), makeMember( 2 ) ); - when( cluster.getMembers() ).thenReturn( members ); - - // when - CoreTopology topology1 = client.coreServers(); - - // then - assertEquals( members.size(), topology1.members().size() ); - - // when - CoreTopology topology2 = client.coreServers(); - - // then - assertEquals( members.size(), topology2.members().size() ); - verify( connector, times( 2 ) ).connectToHazelcast(); } @Test @@ -312,14 +247,12 @@ public void shouldRegisterReadReplicaInTopology() throws Throwable HazelcastConnector connector = mock( HazelcastConnector.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); - ControlledRenewableTimeoutService 
renewableTimeoutService = new ControlledRenewableTimeoutService(); - HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), - renewableTimeoutService, 60_000, 5_000, myself ); - - hazelcastClient.start(); + OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); + HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); // when - renewableTimeoutService.invokeTimeout( REFRESH_READ_REPLICA ); + hazelcastClient.start(); + jobScheduler.runJob(); // then assertEquals( 1, hazelcastMap.size() ); @@ -357,13 +290,12 @@ public void shouldRemoveReadReplicasOnGracefulShutdown() throws Throwable HazelcastConnector connector = mock( HazelcastConnector.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); - ControlledRenewableTimeoutService renewableTimeoutService = new ControlledRenewableTimeoutService(); - HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), - renewableTimeoutService, 60_000, 5_000, myself ); + OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); + HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); hazelcastClient.start(); - renewableTimeoutService.invokeTimeout( REFRESH_READ_REPLICA ); + jobScheduler.runJob(); // when hazelcastClient.stop(); @@ -388,14 +320,12 @@ public void shouldSwallowNPEFromHazelcast() throws Throwable HazelcastConnector connector = mock( HazelcastConnector.class ); when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance ); - ControlledRenewableTimeoutService renewableTimeoutService = new ControlledRenewableTimeoutService(); - - HazelcastClient hazelcastClient = new HazelcastClient( connector, NullLogProvider.getInstance(), config(), - renewableTimeoutService, 60_000, 5_000, myself ); + OnDemandJobScheduler jobScheduler = new 
OnDemandJobScheduler(); + HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself ); hazelcastClient.start(); - renewableTimeoutService.invokeTimeout( REFRESH_READ_REPLICA ); + jobScheduler.runJob(); // when hazelcastClient.stop(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java index 14fc786ba2b77..2c0dc12b2bd59 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java @@ -20,12 +20,13 @@ package org.neo4j.causalclustering.discovery; import java.util.LinkedHashSet; +import java.util.Optional; import java.util.Set; -import org.neo4j.causalclustering.identity.ClusterId; -import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; @@ -63,12 +64,6 @@ public boolean setClusterId( ClusterId clusterId ) return sharedDiscoveryService.casClusterId( clusterId ); } - @Override - public void refreshCoreTopology() - { - // do nothing - } - @Override public void start() throws InterruptedException { @@ -92,9 +87,10 @@ public ReadReplicaTopology readReplicas() } @Override - public ClusterTopology allServers() + public Optional findCatchupAddress( MemberId upstream ) { - return new ClusterTopology( coreServers(), readReplicas() ); + return coreTopology.find( upstream ).map( info -> Optional.of( 
info.getCatchupServer() ) ) + .orElseGet( () -> readReplicaTopology.find( upstream ).map( ReadReplicaInfo::getCatchupServer ) ); } @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java index 7487741f416cb..fc89f1d96c90e 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryReadReplicaClient.java @@ -19,6 +19,8 @@ */ package org.neo4j.causalclustering.discovery; +import java.util.Optional; + import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; @@ -76,8 +78,10 @@ public ReadReplicaTopology readReplicas() } @Override - public ClusterTopology allServers() + public Optional findCatchupAddress( MemberId upstream ) { - return new ClusterTopology( coreServers(), readReplicas() ); + return sharedDiscoveryService.coreTopology( null ).find( upstream ).map( info -> Optional.of( info.getCatchupServer() ) ) + .orElseGet( () -> sharedDiscoveryService.readReplicaTopology().find( upstream ).map( ReadReplicaInfo::getCatchupServer ) ); + } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java index 2d3f0882cd602..e4ef092911f1a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java @@ -28,7 +28,6 @@ import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; -import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; @@ -59,9 +58,7 @@ public CoreTopologyService coreTopologyService( Config config, MemberId myself, } @Override - public TopologyService topologyService( Config config, LogProvider logProvider, - DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, - long readReplicaRefreshRate, MemberId myself ) + public TopologyService topologyService( Config config, LogProvider logProvider, JobScheduler jobScheduler, MemberId myself ) { return new SharedDiscoveryReadReplicaClient( this, config, myself, logProvider ); } @@ -154,7 +151,7 @@ void registerReadReplica( MemberId memberId, ReadReplicaInfo readReplicaInfo ) lock.lock(); try { - this.readReplicaInfoMap.put( memberId, readReplicaInfo ); + readReplicaInfoMap.put( memberId, readReplicaInfo ); notifyCoreClients(); } finally @@ -168,7 +165,7 @@ void unRegisterReadReplica( MemberId memberId ) lock.lock(); try { - ReadReplicaInfo removed = this.readReplicaInfoMap.remove( memberId ); + readReplicaInfoMap.remove( memberId ); notifyCoreClients(); } finally diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java index 8fdf785e6b5ec..27eee79d15e85 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/ReadReplicaStartupProcessTest.java @@ -23,6 +23,8 @@ import org.junit.Test; import java.io.File; +import java.util.HashMap; +import java.util.Map; import 
java.util.Optional; import java.io.IOException; import java.util.UUID; @@ -32,13 +34,13 @@ import org.neo4j.causalclustering.catchup.storecopy.RemoteStore; import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess; import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException; +import org.neo4j.causalclustering.discovery.CoreServerInfo; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.helper.ConstantTimeRetryStrategy; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.helpers.Service; -import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.NullLogProvider; @@ -52,14 +54,12 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.neo4j.helpers.collection.Iterators.asSet; public class ReadReplicaStartupProcessTest { private ConstantTimeRetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 1, MILLISECONDS ); private StoreCopyProcess storeCopyProcess = mock( StoreCopyProcess.class ); private RemoteStore remoteStore = mock( RemoteStore.class ); - private FileSystemAbstraction fs = mock( FileSystemAbstraction.class ); private final PageCache pageCache = mock( PageCache.class ); private LocalDatabase localDatabase = mock( LocalDatabase.class ); private TopologyService topologyService = mock( TopologyService.class ); @@ -74,11 +74,14 @@ public class ReadReplicaStartupProcessTest @Before public void commonMocking() throws StoreIdDownloadFailedException, IOException { + Map members = new HashMap<>(); + members.put( memberId, mock( CoreServerInfo.class ) ); + when( pageCache.streamFilesRecursive( any(File.class) ) ).thenAnswer( ( f ) -> Stream.empty() ); when( 
localDatabase.storeDir() ).thenReturn( storeDir ); when( localDatabase.storeId() ).thenReturn( localStoreId ); when( topologyService.coreServers() ).thenReturn( clusterTopology ); - when( clusterTopology.members() ).thenReturn( asSet( memberId ) ); + when( clusterTopology.members() ).thenReturn( members ); } @Test @@ -190,7 +193,7 @@ public AlwaysChooseFirstMember() public Optional upstreamDatabase() throws UpstreamDatabaseSelectionException { CoreTopology coreTopology = topologyService.coreServers(); - return Optional.ofNullable( coreTopology.members().iterator().next() ); + return Optional.ofNullable( coreTopology.members().keySet().iterator().next() ); } } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java index 69523b9e37408..0363683148ddb 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/readreplica/TypicallyConnectToRandomReadReplicaTest.java @@ -25,10 +25,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import org.neo4j.causalclustering.discovery.ClientConnectorAddresses; -import org.neo4j.causalclustering.discovery.ClusterTopology; import org.neo4j.causalclustering.discovery.CoreTopology; import org.neo4j.causalclustering.discovery.ReadReplicaInfo; import org.neo4j.causalclustering.discovery.ReadReplicaTopology; @@ -39,6 +39,7 @@ import static java.util.Collections.singletonList; import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertThat; +import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap; import static 
org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServerTest.fakeCoreTopology; public class TypicallyConnectToRandomReadReplicaTest { @@ -71,6 +72,9 @@ private TopologyService fakeTopologyService( CoreTopology coreTopology, { return new TopologyService() { + private Map<MemberId,AdvertisedSocketAddress> catchupAddresses = + extractCatchupAddressesMap( coreTopology, readReplicaTopology ); + @Override public CoreTopology coreServers() { @@ -84,9 +88,9 @@ public ReadReplicaTopology readReplicas() } @Override - public ClusterTopology allServers() + public Optional<AdvertisedSocketAddress> findCatchupAddress( MemberId upstream ) { - return new ClusterTopology( coreTopology, readReplicaTopology ); + return Optional.ofNullable( catchupAddresses.get( upstream ) ); } @Override