Skip to content

Commit

Permalink
Fixed MultiRetryStrategyTest
Browse files Browse the repository at this point in the history
  • Loading branch information
phughk committed Feb 15, 2018
1 parent 4eb0933 commit e4c2e96
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 64 deletions.
Expand Up @@ -65,7 +65,7 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member


topologyService = discoveryServiceFactory topologyService = discoveryServiceFactory
.coreTopologyService( config, sslPolicy, myself, platformModule.jobScheduler, logProvider, .coreTopologyService( config, sslPolicy, myself, platformModule.jobScheduler, logProvider,
userLogProvider, hostnameResolver, resolveStrategy( config ) ); userLogProvider, hostnameResolver, resolveStrategy( config, logProvider ) );


life.add( topologyService ); life.add( topologyService );


Expand All @@ -82,13 +82,13 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member
() -> sleep( 100 ), 300_000, coreBootstrapper ); () -> sleep( 100 ), 300_000, coreBootstrapper );
} }


private static TopologyServiceRetryStrategy resolveStrategy( Config config ) private static TopologyServiceRetryStrategy resolveStrategy( Config config, LogProvider logProvider )
{ {
long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis();
int pollingFrequencyWithinRefreshWindow = 2; int pollingFrequencyWithinRefreshWindow = 2;
int numberOfRetries = int numberOfRetries =
pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period
return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries, logProvider );
} }


public CoreTopologyService topologyService() public CoreTopologyService topologyService()
Expand Down
Expand Up @@ -83,15 +83,15 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService
this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis();
this.myself = myself; this.myself = myself;
this.groups = config.get( CausalClusteringSettings.server_groups ); this.groups = config.get( CausalClusteringSettings.server_groups );
this.topologyServiceRetryStrategy = resolveStrategy( refreshPeriod ); this.topologyServiceRetryStrategy = resolveStrategy( refreshPeriod, logProvider );
} }


private static TopologyServiceRetryStrategy resolveStrategy( long refreshPeriodMillis ) private static TopologyServiceRetryStrategy resolveStrategy( long refreshPeriodMillis, LogProvider logProvider )
{ {
int pollingFrequencyWithinRefreshWindow = 2; int pollingFrequencyWithinRefreshWindow = 2;
int numberOfRetries = int numberOfRetries =
pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period
return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries, logProvider );
} }


@Override @Override
Expand Down
Expand Up @@ -19,69 +19,55 @@
*/ */
package org.neo4j.causalclustering.discovery; package org.neo4j.causalclustering.discovery;


import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;


import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;


/** /**
* Implementation of the RetryStrategy that repeats the retriable function until the correct result has been retrieved or the limit of retries has been * Implementation of the RetryStrategy that repeats the retriable function until the correct result has been retrieved or the limit of retries has been
* encountered. * encountered.
* There is a fixed delay between each retry. * There is a fixed delay between each retry.
* *
* @param <I> the type of input of the retriable function * @param <INPUT> the type of input of the retriable function
* @param <E> the type of output of the retriable function * @param <OUTPUT> the type of output of the retriable function
*/ */
public class MultiRetryStrategy<I, E> implements RetryStrategy<I,E> public class MultiRetryStrategy<INPUT, OUTPUT> implements RetryStrategy<INPUT,OUTPUT>
{ {
private final long delayInMillis; private final long delayInMillis;
private final long retries; private final long retries;
private final LogProvider logProvider; private final LogProvider logProvider;

private final Consumer<Long> sleeper;
/**
* @param delayInMillis number of milliseconds between each attempt at getting the desired result
* @param retries the number of attempts to perform before giving up
*/
public MultiRetryStrategy( long delayInMillis, long retries )
{
this( delayInMillis, retries, NullLogProvider.getInstance() );
}


/** /**
* @param delayInMillis number of milliseconds between each attempt at getting the desired result * @param delayInMillis number of milliseconds between each attempt at getting the desired result
* @param retries the number of attempts to perform before giving up * @param retries the number of attempts to perform before giving up
* @param logProvider {@see LogProvider} * @param logProvider {@see LogProvider}
*/ */
public MultiRetryStrategy( long delayInMillis, long retries, LogProvider logProvider ) public MultiRetryStrategy( long delayInMillis, long retries, LogProvider logProvider, Consumer<Long> sleeper )
{ {
this.delayInMillis = delayInMillis; this.delayInMillis = delayInMillis;
this.retries = retries; this.retries = retries;
this.logProvider = logProvider; this.logProvider = logProvider;
this.sleeper = sleeper;
} }


/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public E apply( I input, Function<I,E> retriable, Predicate<E> wasRetrySuccessful ) public OUTPUT apply( INPUT retriableInput, Function<INPUT,OUTPUT> retriable, Predicate<OUTPUT> wasRetrySuccessful )
{ {
Log log = logProvider.getLog( MultiRetryStrategy.class ); Log log = logProvider.getLog( MultiRetryStrategy.class );
E result = retriable.apply( input ); OUTPUT result = retriable.apply( retriableInput );
int currentIteration = 0; int currentIteration = 0;
while ( !wasRetrySuccessful.test( result ) && currentIteration++ < retries ) while ( !wasRetrySuccessful.test( result ) && currentIteration++ < retries )
{ {
log.debug( "Try attempt was unsuccessful for input: %s\n", input ); log.debug( "Try attempt was unsuccessful for input: %s\n", retriableInput );
try sleeper.accept( delayInMillis );
{ result = retriable.apply( retriableInput );
Thread.sleep( delayInMillis );
}
catch ( InterruptedException e )
{
throw new RuntimeException( e );
}
result = retriable.apply( input );
} }
return result; return result;
} }
Expand Down
Expand Up @@ -23,11 +23,22 @@


import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.logging.LogProvider;


public class TopologyServiceMultiRetryStrategy extends MultiRetryStrategy<MemberId,Optional<AdvertisedSocketAddress>> implements TopologyServiceRetryStrategy public class TopologyServiceMultiRetryStrategy extends MultiRetryStrategy<MemberId,Optional<AdvertisedSocketAddress>> implements TopologyServiceRetryStrategy
{ {
public TopologyServiceMultiRetryStrategy( long delayInMillis, long retries ) public TopologyServiceMultiRetryStrategy( long delayInMillis, long retries, LogProvider logProvider )
{ {
super( delayInMillis, retries ); super( delayInMillis, retries, logProvider, millis ->
{
try
{
Thread.sleep( millis );
}
catch ( InterruptedException e )
{
throw new RuntimeException( e );
}
} );
} }
} }
Expand Up @@ -192,7 +192,7 @@ public class EnterpriseReadReplicaEditionModule extends EditionModule


TopologyService topologyService = discoveryServiceFactory.topologyService( config, clusterSslPolicy, TopologyService topologyService = discoveryServiceFactory.topologyService( config, clusterSslPolicy,
logProvider, platformModule.jobScheduler, myself, logProvider, platformModule.jobScheduler, myself,
hostnameResolver, resolveStrategy( config ) ); hostnameResolver, resolveStrategy( config, logProvider ) );


life.add( dependencies.satisfyDependency( topologyService ) ); life.add( dependencies.satisfyDependency( topologyService ) );


Expand Down Expand Up @@ -383,12 +383,12 @@ public UpstreamDatabaseSelectionStrategy next()
} }
} }


private static TopologyServiceRetryStrategy resolveStrategy( Config config ) private static TopologyServiceRetryStrategy resolveStrategy( Config config, LogProvider logProvider )
{ {
long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); long refreshPeriodMillis = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis();
int pollingFrequencyWithinRefreshWindow = 2; int pollingFrequencyWithinRefreshWindow = 2;
int numberOfRetries = int numberOfRetries =
pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period pollingFrequencyWithinRefreshWindow + 1; // we want to have more retries at the given frequency than there is time in a refresh period
return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries ); return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries, logProvider );
} }
} }
Expand Up @@ -19,16 +19,17 @@
*/ */
package org.neo4j.causalclustering.discovery; package org.neo4j.causalclustering.discovery;


import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;


import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;


import org.neo4j.logging.NullLogProvider;

import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;


@Ignore
public class MultiRetryStrategyTest public class MultiRetryStrategyTest
{ {
private static final Predicate<Integer> ALWAYS_VALID = i -> true; private static final Predicate<Integer> ALWAYS_VALID = i -> true;
Expand All @@ -52,9 +53,9 @@ public boolean test( Integer integer )
public void successOnRetryCausesNoDelay() public void successOnRetryCausesNoDelay()
{ {
// given // given
int delay = 1000; CountingSleeper countingSleeper = new CountingSleeper();
int retries = 10; int retries = 10;
MultiRetryStrategy<Integer,Integer> subject = new MultiRetryStrategy<>( delay, retries ); MultiRetryStrategy<Integer,Integer> subject = new MultiRetryStrategy<>( 0, retries, NullLogProvider.getInstance(), countingSleeper );


// when // when
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
Expand All @@ -63,49 +64,52 @@ public void successOnRetryCausesNoDelay()


// then // then
long duration = endTime - startTime; long duration = endTime - startTime;
assertTrue( "First execution should not be called after delay", duration < delay ); assertEquals( 0, countingSleeper.invocationCount() );
assertEquals( "Function identity should be used to retrieve the expected value", 3, result.intValue() ); assertEquals( "Function identity should be used to retrieve the expected value", 3, result.intValue() );
} }


@Test @Test
public void numberOfIterationsDoesNotExceedMaximum() public void numberOfIterationsDoesNotExceedMaximum()
{ {
// given // given
int delay = 200; CountingSleeper countingSleeper = new CountingSleeper();
int retries = 5; int retries = 5;
MultiRetryStrategy<Integer,Integer> subject = new MultiRetryStrategy<>( delay, retries ); MultiRetryStrategy<Integer,Integer> subject = new MultiRetryStrategy<>( 0, retries, NullLogProvider.getInstance(), countingSleeper );


// when // when
long startTime = System.currentTimeMillis(); subject.apply( 3, Function.identity(), NEVER_VALID );
Integer result = subject.apply( 3, Function.identity(), NEVER_VALID );
long endTime = System.currentTimeMillis();


// then // then
long duration = endTime - startTime; assertEquals( retries, countingSleeper.invocationCount() );
double durationInSeconds = duration / 1000.0;
double expectedDurationInSeconds = (delay * retries) / 1000.0;
double marginOfErrorInSeconds = (delay / 1000.0) / 2;
assertEquals( expectedDurationInSeconds, durationInSeconds, marginOfErrorInSeconds );
} }


@Test @Test
public void successfulRetriesBreakTheRetryLoop() public void successfulRetriesBreakTheRetryLoop()
{ {
// given CountingSleeper countingSleeper = new CountingSleeper();
int delay = 200; int retries = 5;
int retries = 10; MultiRetryStrategy<Integer,Integer> subject = new MultiRetryStrategy<>( 0, retries, NullLogProvider.getInstance(), countingSleeper );
MultiRetryStrategy<Integer,Integer> subject = new MultiRetryStrategy<>( delay, retries );


// when // when
long startTime = System.currentTimeMillis(); subject.apply( 3, Function.identity(), VALID_ON_SECOND_TIME );
Integer result = subject.apply( 3, Function.identity(), VALID_ON_SECOND_TIME );
long endTime = System.currentTimeMillis();


// then // then
long duration = endTime - startTime; assertEquals( 1, countingSleeper.invocationCount() );
double durationInSeconds = duration / 1000.0; }
double expectedDurationInSeconds = delay / 1000.0;
double marginOfErrorInSeconds = (delay / 1000.0) / 4; private class CountingSleeper implements Consumer<Long>
assertEquals( expectedDurationInSeconds, durationInSeconds, marginOfErrorInSeconds ); {
private int counter;

@Override
public void accept( Long duration )
{
counter++;
}

public int invocationCount()
{
return counter;
}
} }
} }

0 comments on commit e4c2e96

Please sign in to comment.