Skip to content

Commit

Permalink
Improve timeout service efficiency
Browse files Browse the repository at this point in the history
The timeout service is based on a polling mechanism and
was polling way to frequently for its intended usage.
It was also unnecessarily creating some garbage.
  • Loading branch information
martinfurmanski committed Dec 1, 2017
1 parent 0e8bad0 commit 57a640f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 27 deletions.
Expand Up @@ -20,12 +20,10 @@
package org.neo4j.causalclustering.core.consensus.schedule;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -49,14 +47,15 @@
*/
public class DelayedRenewableTimeoutService extends LifecycleAdapter implements Runnable, RenewableTimeoutService
{
private static final int TIMER_RESOLUTION = 1;
private static final int TIMER_RESOLUTION = 100;
private static final TimeUnit TIMER_RESOLUTION_UNIT = TimeUnit.MILLISECONDS;

/**
* Sorted by next-to-trigger.
*/
private final SortedSet<ScheduledRenewableTimeout> timeouts = new TreeSet<>();
private final Collection<ScheduledRenewableTimeout> timeouts = new ArrayList<>();
private final Queue<ScheduledRenewableTimeout> pendingRenewals = new ConcurrentLinkedDeque<>();
private final Collection<ScheduledRenewableTimeout> triggered = new ArrayList<>();
private final Clock clock;
private final Log log;
private final Random random;
Expand Down Expand Up @@ -104,6 +103,7 @@ private void cancel( ScheduledRenewableTimeout timeout )
synchronized ( timeouts )
{
timeouts.remove( timeout );
pendingRenewals.remove( timeout );
}
}

Expand All @@ -116,9 +116,7 @@ private long calcTimeoutTimestamp( long milliseconds, long randomRange )
@Override
public synchronized void run()
{

long now = clock.millis();
Collection<ScheduledRenewableTimeout> triggered = new LinkedList<>();

synchronized ( timeouts )
{
Expand All @@ -138,12 +136,6 @@ public synchronized void run()
{
triggered.add( timeout );
}
else
{
// Since the timeouts are sorted, the first timeout we hit that should not be triggered means
// there are no others that should either, so we bail.
break;
}
}
}

Expand All @@ -163,6 +155,8 @@ public synchronized void run()
{
timeouts.removeAll( triggered );
}

triggered.clear();
}

@Override
Expand All @@ -186,7 +180,7 @@ public void stop()
scheduler.shutdown();
}

static class ScheduledRenewableTimeout implements RenewableTimeout, Comparable<ScheduledRenewableTimeout>
static class ScheduledRenewableTimeout implements RenewableTimeout
{
private static final AtomicLong idGen = new AtomicLong();
private final long id = idGen.getAndIncrement();
Expand Down Expand Up @@ -233,19 +227,6 @@ void setTimeoutTimestamp( long newTimestamp )
this.timeoutTimestampMillis = newTimestamp;
}

@Override
public int compareTo( ScheduledRenewableTimeout renewableTimeout )
{
if ( timeoutTimestampMillis == renewableTimeout.timeoutTimestampMillis )
{
// Timeouts are set to trigger at the same time.
// Order them by id instead.
return (int) (id - renewableTimeout.id);
}

return (int) (timeoutTimestampMillis - renewableTimeout.timeoutTimestampMillis);
}

@Override
public boolean equals( Object o )
{
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.causalclustering.core.consensus.election;

import org.junit.Ignore;
import org.junit.Test;

import java.util.Set;
Expand Down Expand Up @@ -94,6 +95,7 @@ public void electionPerformance_NormalConditions() throws Throwable
}

@Test
@Ignore( "This should be moved to a benchmarking suite" )
public void electionPerformance_RapidConditions() throws Throwable
{
// given parameters
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.time.FakeClock;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -205,4 +206,47 @@ public void run()
latch.countDown();
cancelThread.interrupt();
}

@Test
public void shouldCancelPendingRenewals() throws Throwable
{
// given
final AtomicLong timeoutCount = new AtomicLong();

FakeClock clock = Clocks.fakeClock();

DelayedRenewableTimeoutService timeoutService = new DelayedRenewableTimeoutService( clock,
NullLogProvider.getInstance() );

CountDownLatch latch = new CountDownLatch( 1 );
RenewableTimeoutService.RenewableTimeout timeout = timeoutService.create( Timeouts.FOOBAR, TIMEOUT_MS, 0, t -> {
t.renew();
timeoutCount.incrementAndGet();
try
{
latch.await();
}
catch ( InterruptedException e )
{
e.printStackTrace();
}
} );

life.add( timeoutService );

clock.forward( TIMEOUT_MS, MILLISECONDS );
Predicates.await( timeoutCount::get, count -> count == 1, LONG_TIME_MS, MILLISECONDS, 1, MILLISECONDS );
timeout.cancel();
latch.countDown();

// when
for ( int i = 0; i < 10; i++ )
{
clock.forward( TIMEOUT_MS, MILLISECONDS );
timeoutService.run();
}

// then
assertEquals( 1, timeoutCount.get() );
}
}

0 comments on commit 57a640f

Please sign in to comment.