diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/DelayedRenewableTimeoutService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/DelayedRenewableTimeoutService.java index 8089b80e8878..3df7ba6ff6b5 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/DelayedRenewableTimeoutService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/DelayedRenewableTimeoutService.java @@ -38,6 +38,14 @@ import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; +/** + * A bare bones, wall clock based implementation of the {@link RenewableTimeoutService}. It uses a scheduled thread + * pool to check for timeouts and as such has a limited resolution of {@link #TIMER_RESOLUTION}, measured in + * {@link #TIMER_RESOLUTION_UNIT}. For the same reason, the timeouts are triggered at an approximate delay rather than + * exactly at the value requested. + * {@link org.neo4j.coreedge.raft.RenewableTimeoutService.TimeoutHandler} are all called from the same thread, + * so users should be aware to not perform time consuming tasks in them. + */ public class DelayedRenewableTimeoutService extends LifecycleAdapter implements Runnable, RenewableTimeoutService { public static final int TIMER_RESOLUTION = 1; @@ -46,8 +54,8 @@ public class DelayedRenewableTimeoutService extends LifecycleAdapter implements /** * Sorted by next-to-trigger. */ - private final SortedSet timeouts = new TreeSet<>(); - private final Queue pendingRenewals = new ConcurrentLinkedDeque<>(); + private final SortedSet timeouts = new TreeSet<>(); + private final Queue pendingRenewals = new ConcurrentLinkedDeque<>(); private final Clock clock; private final Random random; private final JobScheduler scheduler; @@ -75,9 +83,9 @@ public DelayedRenewableTimeoutService( Clock clock ) * If you don't want randomness, set randomRangeInMillis to 0. */ @Override - public Timeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler ) + public RenewableTimeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler ) { - ScheduledTimeout timeout = new ScheduledTimeout( + ScheduledRenewableTimeout timeout = new ScheduledRenewableTimeout( calcTimeoutTimestamp( delayInMillis, randomRangeInMillis ), delayInMillis, randomRangeInMillis, handler, this ); @@ -89,12 +97,12 @@ public Timeout create( TimeoutName name, long delayInMillis, long randomRangeInM return timeout; } - public void renew( ScheduledTimeout timeout ) + public void renew( ScheduledRenewableTimeout timeout ) { pendingRenewals.offer( timeout ); } - public void cancel( ScheduledTimeout timeout ) + public void cancel( ScheduledRenewableTimeout timeout ) { synchronized ( timeouts ) { @@ -114,12 +122,12 @@ public void run() try { long now = clock.currentTimeMillis(); - Collection triggered = new LinkedList<>(); + Collection triggered = new LinkedList<>(); synchronized ( timeouts ) { // Handle renewals - ScheduledTimeout renew; + ScheduledRenewableTimeout renew; while ( (renew = pendingRenewals.poll()) != null ) { timeouts.remove( renew ); @@ -128,7 +136,7 @@ public void run() } // Trigger timeouts - for ( ScheduledTimeout timeout : timeouts ) + for ( ScheduledRenewableTimeout timeout : timeouts ) { if ( timeout.shouldTrigger( now ) ) { @@ -173,7 +181,7 @@ public void stop() throws Throwable scheduler.shutdown(); } - public static class ScheduledTimeout implements Timeout, Comparable + public static class ScheduledRenewableTimeout implements RenewableTimeout, Comparable { private static final AtomicLong idGen = new AtomicLong(); private final long id = idGen.getAndIncrement(); @@ -183,7 +191,7 @@ public static class ScheduledTimeout implements Timeout, Comparable membershipManager; private final long electionTimeout; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RenewableTimeoutService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RenewableTimeoutService.java index 50e129313192..09b79fc00aae 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RenewableTimeoutService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RenewableTimeoutService.java @@ -20,26 +20,69 @@ package org.neo4j.coreedge.raft; /** - * A service for creating {@link RenewableTimeoutService.Timeout} instances. + * A service for creating {@link RenewableTimeout} instances. Implementations of this interface are expected + * but not required to also act as lifecycle managers for the timeouts they return. */ public interface RenewableTimeoutService { - Timeout create( TimeoutName timeoutName, long milliseconds, long randomRange, TimeoutHandler handler ); + /** + * The main factory method. Returns a {@link RenewableTimeout} handle for a {@link TimeoutHandler} that will trigger once, + * after {@param delayInMillis} plus or minus a random interval, unless renewed. Upon triggering, the supplied + * {@param handler} will be called. + *
+ * The time delay is a best effort service. In essence, the delay should not be expected to be accurate to the + * millisecond, as thread scheduling and other factors may influence when the handler is actually called. + * + * @param timeoutName The timeout name, for lookup purposes + * @param delayInMillis The amount of time after this timeout will be triggered + * @param randomRangeInMillis The upper limit of a range of longs (0 is the lower limit) from which a random value will be + * selected with uniform probability, and added to the delay. Setting this value to 0 means + * no randomness. + * @param handler The {@link TimeoutHandler} to call when this timeout triggers + * @return The timeout handle + */ + RenewableTimeout create( TimeoutName timeoutName, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler ); interface TimeoutName { String name(); } - interface Timeout + /** + * The handle for a timeout. + * This represents the timeout for calling a {@link TimeoutHandler}, as it is returned from a call to + * {@link RenewableTimeoutService#create(TimeoutName, long, long, TimeoutHandler)}. + */ + interface RenewableTimeout { + /** + * Renews the timeout for the handler represented by this instance. The effect of calling this method is that + * upon return, the handler will be called in an interval equal to that provided to the + * {@link RenewableTimeoutService#create(TimeoutName, long, long, TimeoutHandler)} on creation. + *
+ * This timeout renewal effect takes place regardless of whether the handler has been called or not or if it + * has been cancelled. + */ void renew(); + /** + * Cancels this timeout. If the handler has not been called, it will not be called when the delay specified on + * creation elapses. Calling {@link #renew()} will reset the delay and the handler will eventually be called. + */ void cancel(); } + /** + * Represents the action to take upon expiry of a timeout. + */ interface TimeoutHandler { - void onTimeout( Timeout timeout ); + /** + * The callback method. This method is expected to execute in non blocking fashion, since it's possible that + * it will be run from the timer thread that will call other handlers as well. + * @param timeout The timeout representing this handler. This is mainly provided so that the timeout can be + * renewed upon completion of the callback. + */ + void onTimeout( RenewableTimeout timeout ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java index b7f40def941d..73a50ba0f470 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java @@ -100,7 +100,7 @@ enum Mode private DelayedRenewableTimeoutService timeoutService; private final TimeoutName timeoutName = () -> "RESEND"; private final long retryTimeMillis; - private Timeout timeout; + private RenewableTimeout timeout; private long timeoutAbsoluteMillis; private long lastSentIndex; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/ControlledRenewableTimeoutService.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/ControlledRenewableTimeoutService.java index 8a0197928a65..61d9fc9c046e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/ControlledRenewableTimeoutService.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/ControlledRenewableTimeoutService.java @@ -28,19 +28,19 @@ public class ControlledRenewableTimeoutService implements RenewableTimeoutService { - private Map> handlers = new HashMap<>(); + private Map> handlers = new HashMap<>(); @Override - public Timeout create( TimeoutName name, long milliseconds, long randomRange, TimeoutHandler handler ) + public RenewableTimeout create( TimeoutName name, long delayInMillis, long randomRangeInMillis, TimeoutHandler handler ) { - Timeout timeout = mock( Timeout.class ); + RenewableTimeout timeout = mock( RenewableTimeout.class ); handlers.put( name, Pair.of( handler, timeout ) ); return timeout; } public void invokeTimeout( TimeoutName name ) { - Pair pair = handlers.get( name ); + Pair pair = handlers.get( name ); pair.first().onTimeout( pair.other() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/DelayedRenewableTimeoutServiceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/DelayedRenewableTimeoutServiceTest.java index 8b95aa62be53..ab4ac71cdad6 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/DelayedRenewableTimeoutServiceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/DelayedRenewableTimeoutServiceTest.java @@ -19,13 +19,11 @@ */ package org.neo4j.coreedge.raft; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.neo4j.helpers.FakeClock; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.test.ArtificialClock; @@ -33,8 +31,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; -import static org.neo4j.coreedge.raft.DelayedRenewableTimeoutServiceTest.Timeouts.FOOBAR; - public class DelayedRenewableTimeoutServiceTest { private final LifeSupport life = new LifeSupport(); @@ -66,10 +62,10 @@ public void shouldTimeOutAfterTimeoutPeriod() throws Throwable DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService(); - timeoutService.create( FOOBAR, 1000, 0, new RenewableTimeoutService.TimeoutHandler() + timeoutService.create( Timeouts.FOOBAR, 1000, 0, new RenewableTimeoutService.TimeoutHandler() { @Override - public void onTimeout( RenewableTimeoutService.Timeout timeout ) + public void onTimeout( RenewableTimeoutService.RenewableTimeout timeout ) { timeoutCount.incrementAndGet(); } @@ -95,10 +91,10 @@ public void shouldNotTimeOutWhenRenewedWithinTimeoutPeriod() throws Throwable DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService(); - RenewableTimeoutService.Timeout timeout = timeoutService.create( FOOBAR, 1000, 0, new RenewableTimeoutService.TimeoutHandler() + RenewableTimeoutService.RenewableTimeout timeout = timeoutService.create( Timeouts.FOOBAR, 1000, 0, new RenewableTimeoutService.TimeoutHandler() { @Override - public void onTimeout( RenewableTimeoutService.Timeout timeout ) + public void onTimeout( RenewableTimeoutService.RenewableTimeout timeout ) { timeoutCount.incrementAndGet(); } @@ -125,7 +121,7 @@ public void shouldNotTimeOutWhenStopped() throws Throwable DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService( clock ); - RenewableTimeoutService.Timeout timeout1 = timeoutService.create( FOOBAR, 1000, 0, timeout -> timeoutCount + RenewableTimeoutService.RenewableTimeout timeout = timeoutService.create( Timeouts.FOOBAR, 1000, 0, t -> timeoutCount .incrementAndGet() ); life.add( timeoutService ); @@ -138,7 +134,7 @@ public void shouldNotTimeOutWhenStopped() throws Throwable timeoutService.stop(); timeoutService.shutdown(); - timeout1.renew(); + timeout.renew(); Thread.sleep( 5 ); clock.progress( 1000, MILLISECONDS ); Thread.sleep( 5 );