Skip to content

Commit

Permalink
Make the TimeBasedTaskScheduler use a PriorityQueue for scheduling, i…
Browse files Browse the repository at this point in the history
…nstead of a sorted linked list.

This simplifies the code a bit, and should also have better performance charactaristics when there is a large number of scheduled tasks.
  • Loading branch information
chrisvest committed Mar 15, 2018
1 parent 3916a9b commit 9b92488
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.scheduler;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
Expand All @@ -31,20 +33,23 @@ final class TimeBasedTaskScheduler implements Runnable
{
private static final ScheduledJobHandle END_SENTINEL = new ScheduledJobHandle( null, null, 0, 0 );
private static final long NO_TASKS_PARK = TimeUnit.MINUTES.toNanos( 10 );
private static final Comparator<ScheduledJobHandle> DEADLINE_COMPARATOR =
Comparator.comparingLong( handle -> handle.nextDeadlineNanos );

private final SystemNanoClock clock;
private final ThreadPoolManager pools;
private final AtomicReference<ScheduledJobHandle> inbox;
private volatile Thread timeKeeper;
private volatile boolean stopped;
// This field is only access by the time keeper thread:
private ScheduledJobHandle delayedTasks;
private final PriorityQueue<ScheduledJobHandle> delayedTasks;

TimeBasedTaskScheduler( SystemNanoClock clock, ThreadPoolManager pools )
{
this.clock = clock;
this.pools = pools;
inbox = new AtomicReference<>( END_SENTINEL );
delayedTasks = new PriorityQueue<>( DEADLINE_COMPARATOR );
}

public JobHandle submit( Group group, Runnable job, long initialDelayNanos, long reschedulingDelayNanos )
Expand Down Expand Up @@ -87,54 +92,39 @@ private void sortInbox()

private void enqueueTask( ScheduledJobHandle newTasks )
{
if ( delayedTasks == null || newTasks.nextDeadlineNanos <= delayedTasks.nextDeadlineNanos )
{
newTasks.next = delayedTasks;
delayedTasks = newTasks;
}
else
{
ScheduledJobHandle head = delayedTasks;
while ( head.next != null && head.next.nextDeadlineNanos < newTasks.nextDeadlineNanos )
{
head = head.next;
}
newTasks.next = head.next;
head.next = newTasks;
}
newTasks.next = null; // Assigning null helps prevent GC nepotism.
delayedTasks.offer( newTasks );
}

private long scheduleDueTasks( long now )
{
if ( delayedTasks == null )
if ( delayedTasks.isEmpty() )
{
// We have no tasks to run. Park until we're woken up by a submit().
return NO_TASKS_PARK;
}
ScheduledJobHandle due = spliceOutDueTasks( now );
submitAndEnqueueTasks( due, now );
return delayedTasks == null ? NO_TASKS_PARK : delayedTasks.nextDeadlineNanos - now;
return delayedTasks.isEmpty() ? NO_TASKS_PARK : delayedTasks.peek().nextDeadlineNanos - now;
}

private ScheduledJobHandle spliceOutDueTasks( long now )
{
ScheduledJobHandle due = null;
ScheduledJobHandle delayed = delayedTasks;
while ( delayed != null && delayed.nextDeadlineNanos <= now )
while ( !delayedTasks.isEmpty() && delayedTasks.peek().nextDeadlineNanos <= now )
{
ScheduledJobHandle next = delayed.next;
delayed.next = due;
due = delayed;
delayed = next;
ScheduledJobHandle task = delayedTasks.poll();
task.next = due;
due = task;
}
delayedTasks = delayed;
return due;
}

private void submitAndEnqueueTasks( ScheduledJobHandle due, long now )
{
while ( due != null )
{
// Make sure to grab the 'next' reference before any call to enqueueTask.
ScheduledJobHandle next = due.next;
if ( due.compareAndSetState( ScheduledJobHandle.STATE_RUNNABLE, ScheduledJobHandle.STATE_SUBMITTED ) )
{
Expand Down
Expand Up @@ -104,6 +104,23 @@ public void mustDelayExecution() throws Exception
assertThat( counter.get(), is( 1 ) );
}

@Test
public void mustOnlyScheduleTasksThatAreDue() throws Exception
{
JobHandle handle1 = scheduler.submit( group, () -> counter.addAndGet( 10 ), 100, 0 );
JobHandle handle2 = scheduler.submit( group, () -> counter.addAndGet( 100 ), 200, 0 );
scheduler.tick();
assertThat( counter.get(), is( 0 ) );
clock.forward( 199, TimeUnit.NANOSECONDS );
scheduler.tick();
handle1.waitTermination();
assertThat( counter.get(), is( 10 ) );
clock.forward( 1, TimeUnit.NANOSECONDS );
scheduler.tick();
handle2.waitTermination();
assertThat( counter.get(), is( 110 ) );
}

@Test
public void mustNotRescheduleDelayedTasks() throws Exception
{
Expand Down

0 comments on commit 9b92488

Please sign in to comment.