Skip to content

Commit

Permalink
Simplify the TimeBasedTaskScheduler.
Browse files Browse the repository at this point in the history
This also fixes a bug where waitTermination would not unblock if a task was cancelled before it got to run for the first time.
This also removes a spinning behaviour of the scheduler, when recurring tasks ended up overdue by not completing in time for their next schedule.
  • Loading branch information
chrisvest committed Mar 15, 2018
1 parent 9b92488 commit 3812a8d
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 126 deletions.
Expand Up @@ -47,31 +47,33 @@
* <li>Failed handles will not be scheduled again.</li>
* </ul>
*/
final class ScheduledJobHandle implements JobHandle
final class ScheduledJobHandle extends AtomicInteger implements JobHandle
{
static final int STATE_RUNNABLE = 0;
static final int STATE_SUBMITTED = 1;
static final int STATE_FAILED = 2;
// We extend AtomicInteger to inline our state field.
// These are the possible state values:
private static final int RUNNABLE = 0;
private static final int SUBMITTED = 1;
private static final int FAILED = 2;

// Accessed and modified by the TimeBasedTaskScheduler:
volatile ScheduledJobHandle next;
// Access is synchronised via the PriorityBlockingQueue in TimeBasedTaskScheduler:
// - Write to this field happens before the handle is added to the queue.
// - Reads of this field happens after the handle has been read from the queue.
// - Reads of this field for the purpose of ordering the queue are either thread local,
// or happens after the relevant handles have been added to the queue.
long nextDeadlineNanos;

private final JobScheduler.Group group;
private final long reschedulingDelayNanos;
private final AtomicInteger state;
private final CopyOnWriteArrayList<CancelListener> cancelListeners;
private final BinaryLatch handleRelease;
private final Runnable task;
private volatile JobHandle latestHandle;
private volatile Throwable lastException;

ScheduledJobHandle( JobScheduler.Group group, Runnable task, long nextDeadlineNanos, long reschedulingDelayNanos )
ScheduledJobHandle( TimeBasedTaskScheduler scheduler, JobScheduler.Group group, Runnable task,
long nextDeadlineNanos, long reschedulingDelayNanos )
{
this.group = group;
this.nextDeadlineNanos = nextDeadlineNanos;
this.reschedulingDelayNanos = reschedulingDelayNanos;
state = new AtomicInteger();
handleRelease = new BinaryLatch();
cancelListeners = new CopyOnWriteArrayList<>();
this.task = () ->
Expand All @@ -80,41 +82,36 @@ final class ScheduledJobHandle implements JobHandle
{
task.run();
// Use compareAndSet to avoid overriding any cancellation state.
compareAndSetState( STATE_SUBMITTED, STATE_RUNNABLE );
if ( compareAndSet( SUBMITTED, RUNNABLE ) && reschedulingDelayNanos > 0 )
{
// We only reschedule if the rescheduling delay is greater than zero.
// A rescheduling delay of zero means this is a delayed task.
// If the rescheduling delay is greater than zero, then this is a recurring task.
this.nextDeadlineNanos += reschedulingDelayNanos;
scheduler.enqueueTask( this );
}
}
catch ( Throwable e )
{
lastException = e;
state.set( STATE_FAILED );
set( FAILED );
}
};
}

boolean compareAndSetState( int expect, int update )
void submitIfRunnable( ThreadPoolManager pools )
{
return state.compareAndSet( expect, update );
}

int getState()
{
return state.get();
}

long getReschedulingDelayNanos()
{
return reschedulingDelayNanos;
}

void submitTo( ThreadPoolManager pools )
{
latestHandle = pools.submit( group, task );
handleRelease.release();
if ( compareAndSet( RUNNABLE, SUBMITTED ) )
{
latestHandle = pools.submit( group, task );
handleRelease.release();
}
}

@Override
public void cancel( boolean mayInterruptIfRunning )
{
state.set( STATE_FAILED );
set( FAILED );
JobHandle handle = latestHandle;
if ( handle != null )
{
Expand All @@ -124,14 +121,20 @@ public void cancel( boolean mayInterruptIfRunning )
{
cancelListener.cancelled( mayInterruptIfRunning );
}
// Release the handle to allow waitTermination() to observe the cancellation.
handleRelease.release();
}

@Override
public void waitTermination() throws ExecutionException, InterruptedException
{
handleRelease.await();
latestHandle.waitTermination();
if ( state.get() == STATE_FAILED )
JobHandle handleDelegate = this.latestHandle;
if ( handleDelegate != null )
{
handleDelegate.waitTermination();
}
if ( get() == FAILED )
{
Throwable exception = this.lastException;
if ( exception != null )
Expand Down
Expand Up @@ -20,9 +20,8 @@
package org.neo4j.kernel.impl.scheduler;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.scheduler.JobScheduler.Group;
Expand All @@ -31,137 +30,74 @@

final class TimeBasedTaskScheduler implements Runnable
{
private static final ScheduledJobHandle END_SENTINEL = new ScheduledJobHandle( null, null, 0, 0 );
private static final long NO_TASKS_PARK = TimeUnit.MINUTES.toNanos( 10 );
private static final Comparator<ScheduledJobHandle> DEADLINE_COMPARATOR =
Comparator.comparingLong( handle -> handle.nextDeadlineNanos );

private final SystemNanoClock clock;
private final ThreadPoolManager pools;
private final AtomicReference<ScheduledJobHandle> inbox;
private final PriorityBlockingQueue<ScheduledJobHandle> delayedTasks;
private volatile Thread timeKeeper;
private volatile boolean stopped;
// This field is only access by the time keeper thread:
private final PriorityQueue<ScheduledJobHandle> delayedTasks;

TimeBasedTaskScheduler( SystemNanoClock clock, ThreadPoolManager pools )
{
this.clock = clock;
this.pools = pools;
inbox = new AtomicReference<>( END_SENTINEL );
delayedTasks = new PriorityQueue<>( DEADLINE_COMPARATOR );
delayedTasks = new PriorityBlockingQueue<>( 42, DEADLINE_COMPARATOR );
}

public JobHandle submit( Group group, Runnable job, long initialDelayNanos, long reschedulingDelayNanos )
{
long now = clock.nanos();
long nextDeadlineNanos = now + initialDelayNanos;
ScheduledJobHandle task = new ScheduledJobHandle( group, job, nextDeadlineNanos, reschedulingDelayNanos );
task.next = inbox.getAndSet( task );
LockSupport.unpark( timeKeeper );
ScheduledJobHandle task = new ScheduledJobHandle( this, group, job, nextDeadlineNanos, reschedulingDelayNanos );
enqueueTask( task );
return task;
}

public long tick()
void enqueueTask( ScheduledJobHandle newTasks )
{
long now = clock.nanos();
sortInbox();
long timeToNextDeadlineSinceStart = scheduleDueTasks( now );
long processingTime = clock.nanos() - now;
return timeToNextDeadlineSinceStart - processingTime;
delayedTasks.offer( newTasks );
LockSupport.unpark( timeKeeper );
}

private void sortInbox()
@Override
public void run()
{
ScheduledJobHandle newTasks = inbox.getAndSet( END_SENTINEL );
while ( newTasks != END_SENTINEL )
timeKeeper = Thread.currentThread();
while ( !stopped )
{
// Capture next chain link before enqueueing among delayed tasks.
ScheduledJobHandle next;
do
long timeToNextTickNanos = tick();
if ( stopped )
{
next = newTasks.next;
return;
}
while ( next == null );

enqueueTask( newTasks );

newTasks = next;
LockSupport.parkNanos( this, timeToNextTickNanos );
}
}

private void enqueueTask( ScheduledJobHandle newTasks )
public long tick()
{
newTasks.next = null; // Assigning null helps prevent GC nepotism.
delayedTasks.offer( newTasks );
long now = clock.nanos();
long timeToNextDeadlineSinceStart = scheduleDueTasks( now );
long processingTime = clock.nanos() - now;
return timeToNextDeadlineSinceStart - processingTime;
}

private long scheduleDueTasks( long now )
{
if ( delayedTasks.isEmpty() )
{
// We have no tasks to run. Park until we're woken up by a submit().
// We have no tasks to run. Park until we're woken up by an enqueueTask() call.
return NO_TASKS_PARK;
}
ScheduledJobHandle due = spliceOutDueTasks( now );
submitAndEnqueueTasks( due, now );
return delayedTasks.isEmpty() ? NO_TASKS_PARK : delayedTasks.peek().nextDeadlineNanos - now;
}

private ScheduledJobHandle spliceOutDueTasks( long now )
{
ScheduledJobHandle due = null;
while ( !delayedTasks.isEmpty() && delayedTasks.peek().nextDeadlineNanos <= now )
while ( !stopped && !delayedTasks.isEmpty() && delayedTasks.peek().nextDeadlineNanos <= now )
{
ScheduledJobHandle task = delayedTasks.poll();
task.next = due;
due = task;
}
return due;
}

private void submitAndEnqueueTasks( ScheduledJobHandle due, long now )
{
while ( due != null )
{
// Make sure to grab the 'next' reference before any call to enqueueTask.
ScheduledJobHandle next = due.next;
if ( due.compareAndSetState( ScheduledJobHandle.STATE_RUNNABLE, ScheduledJobHandle.STATE_SUBMITTED ) )
{
long reschedulingDelayNanos = due.getReschedulingDelayNanos();
due.nextDeadlineNanos = reschedulingDelayNanos + now;
due.submitTo( pools );
if ( reschedulingDelayNanos > 0 )
{
enqueueTask( due );
}
// If the rescheduling delay is zero or less, then this wasn't a recurring task, but just a delayed one,
// which means we don't enqueue it again.
}
else if ( due.getState() != ScheduledJobHandle.STATE_FAILED )
{
// It's still running, so it's now overdue.
due.nextDeadlineNanos = now;
enqueueTask( due );
}
// Otherwise it's failed, in which case we just throw it away, and continue processing the chain.
due = next;
}
}

@Override
public void run()
{
timeKeeper = Thread.currentThread();
while ( !stopped )
{
long timeToNextTickNanos = tick();
if ( inbox.get() == END_SENTINEL )
{
// Only park if nothing has been posted to our inbox while we were processing the last tick.
LockSupport.parkNanos( this, timeToNextTickNanos );
}
task.submitIfRunnable( pools );
}
return delayedTasks.isEmpty() ? NO_TASKS_PARK : delayedTasks.peek().nextDeadlineNanos - now;
}

public void stop()
Expand Down
Expand Up @@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -175,7 +176,7 @@ public void mustNotRescheduleRecurringTasksThatThrows() throws Exception
}

@Test
public void mustNotStartRecurringTasksWherePriorExecutionHasNotYetFinished() throws Exception
public void mustNotStartRecurringTasksWherePriorExecutionHasNotYetFinished()
{
Runnable runnable = () ->
{
Expand Down Expand Up @@ -211,7 +212,7 @@ public void longRunningTasksMustNotDelayExecutionOfOtherTasks() throws Exception
}

@Test
public void delayedTasksMustNotRunIfCancelledFirst()
public void delayedTasksMustNotRunIfCancelledFirst() throws Exception
{
List<Boolean> cancelListener = new ArrayList<>();
JobHandle handle = scheduler.submit( group, counter::incrementAndGet, 100, 0 );
Expand All @@ -224,6 +225,15 @@ public void delayedTasksMustNotRunIfCancelledFirst()
pools.getThreadPool( group ).shutDown();
assertThat( counter.get(), is( 0 ) );
assertThat( cancelListener, contains( Boolean.FALSE ) );
try
{
handle.waitTermination();
fail( "waitTermination should have thrown a CancellationException." );
}
catch ( CancellationException ignore )
{
// Good stuff.
}
}

@Test
Expand Down Expand Up @@ -254,7 +264,7 @@ public void recurringTasksMustStopWhenCancelled() throws InterruptedException
}

@Test
public void overdueRecurringTasksMustStartAsSoonAsPossible() throws Exception
public void overdueRecurringTasksMustStartAsSoonAsPossible()
{
Runnable recurring = () ->
{
Expand Down

0 comments on commit 3812a8d

Please sign in to comment.