Skip to content

Commit

Permalink
Renames ScheduledTimeout to ScheduledRenewableTimeout. Adds TimeoutSe…
Browse files Browse the repository at this point in the history
…rvice javadocs.
  • Loading branch information
digitalstain committed Dec 2, 2015
1 parent 5a2af79 commit 60291fa
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 33 deletions.
Expand Up @@ -38,6 +38,14 @@


import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;


/**
* A bare bones, wall clock based implementation of the {@link RenewableTimeoutService}. It uses a scheduled thread
* pool to check for timeouts and as such has a limited resolution of {@link #TIMER_RESOLUTION}, measured in
* {@link #TIMER_RESOLUTION_UNIT}. For the same reason, the timeouts are triggered at an approximate delay rather than
* exactly at the value requested.
* {@link org.neo4j.coreedge.raft.RenewableTimeoutService.TimeoutHandler} are all called from the same thread,
* so users should be aware to not perform time consuming tasks in them.
*/
public class DelayedRenewableTimeoutService extends LifecycleAdapter implements Runnable, RenewableTimeoutService public class DelayedRenewableTimeoutService extends LifecycleAdapter implements Runnable, RenewableTimeoutService
{ {
public static final int TIMER_RESOLUTION = 1; public static final int TIMER_RESOLUTION = 1;
Expand All @@ -46,8 +54,8 @@ public class DelayedRenewableTimeoutService extends LifecycleAdapter implements
/** /**
* Sorted by next-to-trigger. * Sorted by next-to-trigger.
*/ */
private final SortedSet<ScheduledTimeout> timeouts = new TreeSet<>(); private final SortedSet<ScheduledRenewableTimeout> timeouts = new TreeSet<>();
private final Queue<ScheduledTimeout> pendingRenewals = new ConcurrentLinkedDeque<>(); private final Queue<ScheduledRenewableTimeout> pendingRenewals = new ConcurrentLinkedDeque<>();
private final Clock clock; private final Clock clock;
private final Random random; private final Random random;
private final JobScheduler scheduler; private final JobScheduler scheduler;
Expand Down Expand Up @@ -75,9 +83,9 @@ public DelayedRenewableTimeoutService( Clock clock )
* If you don't want randomness, set randomRangeInMillis to 0. * If you don't want randomness, set randomRangeInMillis to 0.
*/ */
@Override @Override
public Timeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler ) public RenewableTimeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler )
{ {
ScheduledTimeout timeout = new ScheduledTimeout( ScheduledRenewableTimeout timeout = new ScheduledRenewableTimeout(
calcTimeoutTimestamp( delayInMillis, randomRangeInMillis ), calcTimeoutTimestamp( delayInMillis, randomRangeInMillis ),
delayInMillis, randomRangeInMillis, handler, this ); delayInMillis, randomRangeInMillis, handler, this );


Expand All @@ -89,12 +97,12 @@ public Timeout create( TimeoutName name, long delayInMillis, long randomRangeInM
return timeout; return timeout;
} }


public void renew( ScheduledTimeout timeout ) public void renew( ScheduledRenewableTimeout timeout )
{ {
pendingRenewals.offer( timeout ); pendingRenewals.offer( timeout );
} }


public void cancel( ScheduledTimeout timeout ) public void cancel( ScheduledRenewableTimeout timeout )
{ {
synchronized ( timeouts ) synchronized ( timeouts )
{ {
Expand All @@ -114,12 +122,12 @@ public void run()
try try
{ {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
Collection<ScheduledTimeout> triggered = new LinkedList<>(); Collection<ScheduledRenewableTimeout> triggered = new LinkedList<>();


synchronized ( timeouts ) synchronized ( timeouts )
{ {
// Handle renewals // Handle renewals
ScheduledTimeout renew; ScheduledRenewableTimeout renew;
while ( (renew = pendingRenewals.poll()) != null ) while ( (renew = pendingRenewals.poll()) != null )
{ {
timeouts.remove( renew ); timeouts.remove( renew );
Expand All @@ -128,7 +136,7 @@ public void run()
} }


// Trigger timeouts // Trigger timeouts
for ( ScheduledTimeout timeout : timeouts ) for ( ScheduledRenewableTimeout timeout : timeouts )
{ {
if ( timeout.shouldTrigger( now ) ) if ( timeout.shouldTrigger( now ) )
{ {
Expand Down Expand Up @@ -173,7 +181,7 @@ public void stop() throws Throwable
scheduler.shutdown(); scheduler.shutdown();
} }


public static class ScheduledTimeout implements Timeout, Comparable<ScheduledTimeout> public static class ScheduledRenewableTimeout implements RenewableTimeout, Comparable<ScheduledRenewableTimeout>
{ {
private static final AtomicLong idGen = new AtomicLong(); private static final AtomicLong idGen = new AtomicLong();
private final long id = idGen.getAndIncrement(); private final long id = idGen.getAndIncrement();
Expand All @@ -183,7 +191,7 @@ public static class ScheduledTimeout implements Timeout, Comparable<ScheduledTim
private final DelayedRenewableTimeoutService timeouts; private final DelayedRenewableTimeoutService timeouts;
private long timeoutTimestampMillis; private long timeoutTimestampMillis;


public ScheduledTimeout( long timeoutTimestampMillis, long timeoutLength, long randomRange, TimeoutHandler public ScheduledRenewableTimeout( long timeoutTimestampMillis, long timeoutLength, long randomRange, TimeoutHandler
handler, DelayedRenewableTimeoutService timeouts ) handler, DelayedRenewableTimeoutService timeouts )
{ {
this.timeoutTimestampMillis = timeoutTimestampMillis; this.timeoutTimestampMillis = timeoutTimestampMillis;
Expand Down Expand Up @@ -221,7 +229,7 @@ public void setTimeoutTimestamp( long newTimestamp )
} }


@Override @Override
public int compareTo( ScheduledTimeout o ) public int compareTo( ScheduledRenewableTimeout o )
{ {
if ( timeoutTimestampMillis == o.timeoutTimestampMillis ) if ( timeoutTimestampMillis == o.timeoutTimestampMillis )
{ {
Expand All @@ -245,7 +253,7 @@ public boolean equals( Object o )
return false; return false;
} }


ScheduledTimeout timeout = (ScheduledTimeout) o; ScheduledRenewableTimeout timeout = (ScheduledRenewableTimeout) o;


return id == timeout.id; return id == timeout.id;
} }
Expand Down
Expand Up @@ -80,7 +80,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName


private final RenewableTimeoutService renewableTimeoutService; private final RenewableTimeoutService renewableTimeoutService;
private final long heartbeatInterval; private final long heartbeatInterval;
private RenewableTimeoutService.Timeout electionTimer; private RenewableTimeoutService.RenewableTimeout electionTimer;
private RaftMembershipManager<MEMBER> membershipManager; private RaftMembershipManager<MEMBER> membershipManager;


private final long electionTimeout; private final long electionTimeout;
Expand Down
Expand Up @@ -20,26 +20,69 @@
package org.neo4j.coreedge.raft; package org.neo4j.coreedge.raft;


/** /**
* A service for creating {@link RenewableTimeoutService.Timeout} instances. * A service for creating {@link RenewableTimeout} instances. Implementations of this interface are expected
* but not required to also act as lifecycle managers for the timeouts they return.
*/ */
public interface RenewableTimeoutService public interface RenewableTimeoutService
{ {
Timeout create( TimeoutName timeoutName, long milliseconds, long randomRange, TimeoutHandler handler ); /**
* The main factory method. Returns a {@link RenewableTimeout} handle for a {@link TimeoutHandler} that will trigger once,
* after {@param delayInMillis} plus or minus a random interval, unless renewed. Upon triggering, the supplied
* {@param handler} will be called.
* <br/>
* The time delay is a best effort service. In essence, the delay should not be expected to be accurate to the
* millisecond, as thread scheduling and other factors may influence when the handler is actually called.
*
* @param timeoutName The timeout name, for lookup purposes
* @param delayInMillis The amount of time after this timeout will be triggered
* @param randomRangeInMillis The upper limit of a range of longs (0 is the lower limit) from which a random value will be
* selected with uniform probability, and added to the delay. Setting this value to 0 means
* no randomness.
* @param handler The {@link TimeoutHandler} to call when this timeout triggers
* @return The timeout handle
*/
RenewableTimeout create( TimeoutName timeoutName, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler );


interface TimeoutName interface TimeoutName
{ {
String name(); String name();
} }


interface Timeout /**
* The handle for a timeout.
* This represents the timeout for calling a {@link TimeoutHandler}, as it is returned from a call to
* {@link RenewableTimeoutService#create(TimeoutName, long, long, TimeoutHandler)}.
*/
interface RenewableTimeout
{ {
/**
* Renews the timeout for the handler represented by this instance. The effect of calling this method is that
* upon return, the handler will be called in an interval equal to that provided to the
* {@link RenewableTimeoutService#create(TimeoutName, long, long, TimeoutHandler)} on creation.
* <br/>
* This timeout renewal effect takes place regardless of whether the handler has been called or not or if it
* has been cancelled.
*/
void renew(); void renew();


/**
* Cancels this timeout. If the handler has not been called, it will not be called when the delay specified on
* creation elapses. Calling {@link #renew()} will reset the delay and the handler will eventually be called.
*/
void cancel(); void cancel();
} }


/**
* Represents the action to take upon expiry of a timeout.
*/
interface TimeoutHandler interface TimeoutHandler
{ {
void onTimeout( Timeout timeout ); /**
* The callback method. This method is expected to execute in non blocking fashion, since it's possible that
* it will be run from the timer thread that will call other handlers as well.
* @param timeout The timeout representing this handler. This is mainly provided so that the timeout can be
* renewed upon completion of the callback.
*/
void onTimeout( RenewableTimeout timeout );
} }
} }
Expand Up @@ -100,7 +100,7 @@ enum Mode
private DelayedRenewableTimeoutService timeoutService; private DelayedRenewableTimeoutService timeoutService;
private final TimeoutName timeoutName = () -> "RESEND"; private final TimeoutName timeoutName = () -> "RESEND";
private final long retryTimeMillis; private final long retryTimeMillis;
private Timeout timeout; private RenewableTimeout timeout;


private long timeoutAbsoluteMillis; private long timeoutAbsoluteMillis;
private long lastSentIndex; private long lastSentIndex;
Expand Down
Expand Up @@ -28,19 +28,19 @@


public class ControlledRenewableTimeoutService implements RenewableTimeoutService public class ControlledRenewableTimeoutService implements RenewableTimeoutService
{ {
private Map<TimeoutName, Pair<TimeoutHandler, Timeout>> handlers = new HashMap<>(); private Map<TimeoutName, Pair<TimeoutHandler, RenewableTimeout>> handlers = new HashMap<>();


@Override @Override
public Timeout create( TimeoutName name, long milliseconds, long randomRange, TimeoutHandler handler ) public RenewableTimeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler )
{ {
Timeout timeout = mock( Timeout.class ); RenewableTimeout timeout = mock( RenewableTimeout.class );
handlers.put( name, Pair.of( handler, timeout ) ); handlers.put( name, Pair.of( handler, timeout ) );
return timeout; return timeout;
} }


public void invokeTimeout( TimeoutName name ) public void invokeTimeout( TimeoutName name )
{ {
Pair<TimeoutHandler, Timeout> pair = handlers.get( name ); Pair<TimeoutHandler, RenewableTimeout> pair = handlers.get( name );
pair.first().onTimeout( pair.other() ); pair.first().onTimeout( pair.other() );
} }
} }
Expand Up @@ -19,22 +19,18 @@
*/ */
package org.neo4j.coreedge.raft; package org.neo4j.coreedge.raft;


import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;


import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.neo4j.helpers.FakeClock;
import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.test.ArtificialClock; import org.neo4j.test.ArtificialClock;


import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;


import static org.neo4j.coreedge.raft.DelayedRenewableTimeoutServiceTest.Timeouts.FOOBAR;

public class DelayedRenewableTimeoutServiceTest public class DelayedRenewableTimeoutServiceTest
{ {
private final LifeSupport life = new LifeSupport(); private final LifeSupport life = new LifeSupport();
Expand Down Expand Up @@ -66,10 +62,10 @@ public void shouldTimeOutAfterTimeoutPeriod() throws Throwable


DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService(); DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService();


timeoutService.create( FOOBAR, 1000, 0, new RenewableTimeoutService.TimeoutHandler() timeoutService.create( Timeouts.FOOBAR, 1000, 0, new RenewableTimeoutService.TimeoutHandler()
{ {
@Override @Override
public void onTimeout( RenewableTimeoutService.Timeout timeout ) public void onTimeout( RenewableTimeoutService.RenewableTimeout timeout )
{ {
timeoutCount.incrementAndGet(); timeoutCount.incrementAndGet();
} }
Expand All @@ -95,10 +91,10 @@ public void shouldNotTimeOutWhenRenewedWithinTimeoutPeriod() throws Throwable


DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService(); DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService();


RenewableTimeoutService.Timeout timeout = timeoutService.create( FOOBAR, 1000, 0, new RenewableTimeoutService.TimeoutHandler() RenewableTimeoutService.RenewableTimeout timeout = timeoutService.create( Timeouts.FOOBAR, 1000, 0, new RenewableTimeoutService.TimeoutHandler()
{ {
@Override @Override
public void onTimeout( RenewableTimeoutService.Timeout timeout ) public void onTimeout( RenewableTimeoutService.RenewableTimeout timeout )
{ {
timeoutCount.incrementAndGet(); timeoutCount.incrementAndGet();
} }
Expand All @@ -125,7 +121,7 @@ public void shouldNotTimeOutWhenStopped() throws Throwable


DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService( clock ); DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService( clock );


RenewableTimeoutService.Timeout timeout1 = timeoutService.create( FOOBAR, 1000, 0, timeout -> timeoutCount RenewableTimeoutService.RenewableTimeout timeout = timeoutService.create( Timeouts.FOOBAR, 1000, 0, t -> timeoutCount
.incrementAndGet() ); .incrementAndGet() );


life.add( timeoutService ); life.add( timeoutService );
Expand All @@ -138,7 +134,7 @@ public void shouldNotTimeOutWhenStopped() throws Throwable
timeoutService.stop(); timeoutService.stop();
timeoutService.shutdown(); timeoutService.shutdown();


timeout1.renew(); timeout.renew();
Thread.sleep( 5 ); Thread.sleep( 5 );
clock.progress( 1000, MILLISECONDS ); clock.progress( 1000, MILLISECONDS );
Thread.sleep( 5 ); Thread.sleep( 5 );
Expand Down

0 comments on commit 60291fa

Please sign in to comment.