diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/ClusteringModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/ClusteringModule.java index 43dafedc3ae24..571287479f437 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/ClusteringModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/ClusteringModule.java @@ -65,7 +65,7 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member topologyService = discoveryServiceFactory .coreTopologyService( config, sslPolicy, myself, platformModule.jobScheduler, logProvider, - userLogProvider, hostnameResolver, resolveStrategy( config ) ); + userLogProvider, hostnameResolver, resolveStrategy( config, logProvider ) ); life.add( topologyService ); @@ -82,13 +82,13 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member () -> sleep( 100 ), 300_000, coreBootstrapper ); } - private static TopologyServiceRetryStrategy resolveStrategy( Config config ) + private static TopologyServiceRetryStrategy resolveStrategy( Config config, LogProvider logProvider ) { long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); int pollingFrequencyWithinRefreshWindow = 2; int numberOfRetries = pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period - return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); + return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries, logProvider ); } public CoreTopologyService topologyService() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java index bcfcd2902c12d..ae1ec3d82e5ed 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java @@ -83,15 +83,15 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); this.myself = myself; this.groups = config.get( CausalClusteringSettings.server_groups ); - this.topologyServiceRetryStrategy = resolveStrategy( refreshPeriod ); + this.topologyServiceRetryStrategy = resolveStrategy( refreshPeriod, logProvider ); } - private static TopologyServiceRetryStrategy resolveStrategy( long refreshPeriodMillis ) + private static TopologyServiceRetryStrategy resolveStrategy( long refreshPeriodMillis, LogProvider logProvider ) { int pollingFrequencyWithinRefreshWindow = 2; int numberOfRetries = pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period - return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); + return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries, logProvider ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/MultiRetryStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/MultiRetryStrategy.java index 47803a6774bfc..00f93ed4c668d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/MultiRetryStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/MultiRetryStrategy.java @@ -19,69 +19,55 @@ */ package org.neo4j.causalclustering.discovery; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import org.neo4j.logging.NullLogProvider; /** * Implementation of the RetryStrategy that repeats the retriable function until the correct result has been retrieved or the limit of retries has been * encountered. * There is a fixed delay between each retry. * - * @param the type of input of the retriable function - * @param the type of output of the retriable function + * @param the type of input of the retriable function + * @param the type of output of the retriable function */ -public class MultiRetryStrategy implements RetryStrategy +public class MultiRetryStrategy implements RetryStrategy { private final long delayInMillis; private final long retries; private final LogProvider logProvider; - - /** - * @param delayInMillis number of milliseconds between each attempt at getting the desired result - * @param retries the number of attempts to perform before giving up - */ - public MultiRetryStrategy( long delayInMillis, long retries ) - { - this( delayInMillis, retries, NullLogProvider.getInstance() ); - } + private final Consumer sleeper; /** * @param delayInMillis number of milliseconds between each attempt at getting the desired result * @param retries the number of attempts to perform before giving up * @param logProvider {@see LogProvider} */ - public MultiRetryStrategy( long delayInMillis, long retries, LogProvider logProvider ) + public MultiRetryStrategy( long delayInMillis, long retries, LogProvider logProvider, Consumer sleeper ) { this.delayInMillis = delayInMillis; this.retries = retries; this.logProvider = logProvider; + this.sleeper = sleeper; } /** * {@inheritDoc} */ @Override - public E apply( I input, Function retriable, Predicate wasRetrySuccessful ) + public OUTPUT apply( INPUT retriableInput, Function retriable, Predicate wasRetrySuccessful ) { Log log = logProvider.getLog( MultiRetryStrategy.class ); - E result = retriable.apply( input ); + OUTPUT result = retriable.apply( retriableInput ); int currentIteration = 0; while ( !wasRetrySuccessful.test( result ) && currentIteration++ < retries ) { - log.debug( "Try attempt was unsuccessful for input: %s\n", input ); - try - { - Thread.sleep( delayInMillis ); - } - catch ( InterruptedException e ) - { - throw new RuntimeException( e ); - } - result = retriable.apply( input ); + log.debug( "Try attempt was unsuccessful for input: %s\n", retriableInput ); + sleeper.accept( delayInMillis ); + result = retriable.apply( retriableInput ); } return result; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceMultiRetryStrategy.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceMultiRetryStrategy.java index d4601e8a47d83..86d98e32171cb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceMultiRetryStrategy.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/TopologyServiceMultiRetryStrategy.java @@ -23,11 +23,22 @@ import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.logging.LogProvider; public class TopologyServiceMultiRetryStrategy extends MultiRetryStrategy> implements TopologyServiceRetryStrategy { - public TopologyServiceMultiRetryStrategy( long delayInMillis, long retries ) + public TopologyServiceMultiRetryStrategy( long delayInMillis, long retries, LogProvider logProvider ) { - super( delayInMillis, retries ); + super( delayInMillis, retries, logProvider, millis -> + { + try + { + Thread.sleep( millis ); + } + catch ( InterruptedException e ) + { + throw new RuntimeException( e ); + } + } ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 2450551f375ea..5e852b4c503c2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -192,7 +192,7 @@ public class EnterpriseReadReplicaEditionModule extends EditionModule TopologyService topologyService = discoveryServiceFactory.topologyService( config, clusterSslPolicy, logProvider, platformModule.jobScheduler, myself, - hostnameResolver, resolveStrategy( config ) ); + hostnameResolver, resolveStrategy( config, logProvider ) ); life.add( dependencies.satisfyDependency( topologyService ) ); @@ -383,12 +383,12 @@ public UpstreamDatabaseSelectionStrategy next() } } - private static TopologyServiceRetryStrategy resolveStrategy( Config config ) + private static TopologyServiceRetryStrategy resolveStrategy( Config config, LogProvider logProvider ) { long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); int pollingFrequencyWithinRefreshWindow = 2; int numberOfRetries = pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period - return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); + return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries, logProvider ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/MultiRetryStrategyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/MultiRetryStrategyTest.java index ed6e3a8f0ecd1..ca2946ef1128b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/MultiRetryStrategyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/MultiRetryStrategyTest.java @@ -19,16 +19,17 @@ */ package org.neo4j.causalclustering.discovery; -import org.junit.Ignore; import org.junit.Test; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import org.neo4j.logging.NullLogProvider; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -@Ignore public class MultiRetryStrategyTest { private static final Predicate ALWAYS_VALID = i -> true; @@ -52,9 +53,9 @@ public boolean test( Integer integer ) public void successOnRetryCausesNoDelay() { // given - int delay = 1000; + CountingSleeper countingSleeper = new CountingSleeper(); int retries = 10; - MultiRetryStrategy subject = new MultiRetryStrategy<>( delay, retries ); + MultiRetryStrategy subject = new MultiRetryStrategy<>( 0, retries, NullLogProvider.getInstance(), countingSleeper ); // when long startTime = System.currentTimeMillis(); @@ -63,7 +64,7 @@ public void successOnRetryCausesNoDelay() // then long duration = endTime - startTime; - assertTrue( "First execution should not be called after delay", duration < delay ); + assertEquals( 0, countingSleeper.invocationCount() ); assertEquals( "Function identity should be used to retrieve the expected value", 3, result.intValue() ); } @@ -71,41 +72,44 @@ public void successOnRetryCausesNoDelay() public void numberOfIterationsDoesNotExceedMaximum() { // given - int delay = 200; + CountingSleeper countingSleeper = new CountingSleeper(); int retries = 5; - MultiRetryStrategy subject = new MultiRetryStrategy<>( delay, retries ); + MultiRetryStrategy subject = new MultiRetryStrategy<>( 0, retries, NullLogProvider.getInstance(), countingSleeper ); // when - long startTime = System.currentTimeMillis(); - Integer result = subject.apply( 3, Function.identity(), NEVER_VALID ); - long endTime = System.currentTimeMillis(); + subject.apply( 3, Function.identity(), NEVER_VALID ); // then - long duration = endTime - startTime; - double durationInSeconds = duration / 1000.0; - double expectedDurationInSeconds = (delay * retries) / 1000.0; - double marginOfErrorInSeconds = (delay / 1000.0) / 2; - assertEquals( expectedDurationInSeconds, durationInSeconds, marginOfErrorInSeconds ); + assertEquals( retries, countingSleeper.invocationCount() ); } @Test public void successfulRetriesBreakTheRetryLoop() { - // given - int delay = 200; - int retries = 10; - MultiRetryStrategy subject = new MultiRetryStrategy<>( delay, retries ); + CountingSleeper countingSleeper = new CountingSleeper(); + int retries = 5; + MultiRetryStrategy subject = new MultiRetryStrategy<>( 0, retries, NullLogProvider.getInstance(), countingSleeper ); // when - long startTime = System.currentTimeMillis(); - Integer result = subject.apply( 3, Function.identity(), VALID_ON_SECOND_TIME ); - long endTime = System.currentTimeMillis(); + subject.apply( 3, Function.identity(), VALID_ON_SECOND_TIME ); // then - long duration = endTime - startTime; - double durationInSeconds = duration / 1000.0; - double expectedDurationInSeconds = delay / 1000.0; - double marginOfErrorInSeconds = (delay / 1000.0) / 4; - assertEquals( expectedDurationInSeconds, durationInSeconds, marginOfErrorInSeconds ); + assertEquals( 1, countingSleeper.invocationCount() ); + } + + private class CountingSleeper implements Consumer + { + private int counter; + + @Override + public void accept( Long duration ) + { + counter++; + } + + public int invocationCount() + { + return counter; + } } }