Skip to content

Commit

Permalink
Some additional TimeBasedTaskScheduler changes that improve timelines…
Browse files Browse the repository at this point in the history
…s, reaction time to new tasks submitions, and shuts the CentralJobScheduler down in the right order.
  • Loading branch information
chrisvest committed Mar 15, 2018
1 parent 5612423 commit b56bc47
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 16 deletions.
Expand Up @@ -134,20 +134,14 @@ public void shutdown()
{
started = false;

// Cancel jobs which hasn't been cancelled already, this to avoid having to wait the full
// max wait time and then just leave them.
InterruptedException exception = pools.shutDownAll();
// First shut down the scheduler, so no new tasks are queued up in the pools.
InterruptedException exception = shutDownScheduler();

scheduler.stop();
try
{
schedulerThread.join();
}
catch ( InterruptedException e )
{
exception = Exceptions.chain( exception, e );
}
// Then shut down the thread pools. This involves cancelling jobs which hasn't been cancelled already,
// so we avoid having to wait the full maximum wait time on the executor service shut-downs.
exception = Exceptions.chain( exception, pools.shutDownAll() );

// Finally, we shut the work-stealing executors down.
for ( ExecutorService workStealingExecutor : workStealingExecutors.values() )
{
exception = shutdownPool( workStealingExecutor, exception );
Expand All @@ -160,6 +154,20 @@ public void shutdown()
}
}

private InterruptedException shutDownScheduler()
{
scheduler.stop();
try
{
schedulerThread.join();
}
catch ( InterruptedException e )
{
return e;
}
return null;
}

private InterruptedException shutdownPool( ExecutorService pool, InterruptedException exception )
{
if ( pool != null )
Expand Down
Expand Up @@ -61,7 +61,9 @@ public long tick()
{
long now = clock.nanos();
sortInbox();
return scheduleDueTasks( now );
long timeToNextDeadlineSinceStart = scheduleDueTasks( now );
long processingTime = clock.nanos() - now;
return timeToNextDeadlineSinceStart - processingTime;
}

private void sortInbox()
Expand Down Expand Up @@ -164,7 +166,11 @@ public void run()
while ( !stopped )
{
long timeToNextTickNanos = tick();
LockSupport.parkNanos( this, timeToNextTickNanos );
if ( inbox.get() == END_SENTINEL )
{
// Only park if nothing has been posted to our inbox while we were processing the last tick.
LockSupport.parkNanos( this, timeToNextTickNanos );
}
}
}

Expand Down
Expand Up @@ -70,7 +70,7 @@ public void stopScheduler()
public void shouldRunRecurringJob() throws Throwable
{
// Given
long period = 100;
long period = 10;
int count = 5;
life.start();

Expand All @@ -81,7 +81,7 @@ public void shouldRunRecurringJob() throws Throwable

// Then assert that the recurring job was stopped (when the scheduler was shut down)
int actualInvocations = invocations.get();
sleep( period * 2 );
sleep( period * 5 );
assertThat( invocations.get(), equalTo( actualInvocations ) );
}

Expand Down
Expand Up @@ -222,4 +222,36 @@ public void recurringTasksMustStopWhenCancelled() throws InterruptedException
assertThat( counter.get(), is( 2 ) );
assertThat( cancelListener, contains( Boolean.TRUE ) );
}

@Test
public void overdueRecurringTasksMustStartAsSoonAsPossible() throws Exception
{
Runnable recurring = () ->
{
counter.incrementAndGet();
semaphore.acquireUninterruptibly();
};
JobHandle handle = scheduler.submit( group, recurring, 100, 100 );
clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick();
while ( counter.get() < 1 )
{
// Spin.
Thread.yield();
}
clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick();
clock.forward( 100, TimeUnit.NANOSECONDS );
semaphore.release();
scheduler.tick();
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos( 10 );
while ( counter.get() < 2 && System.nanoTime() < deadline )
{
scheduler.tick();
Thread.yield();
}
assertThat( counter.get(), is( 2 ) );
semaphore.release( Integer.MAX_VALUE );
handle.cancel( false );
}
}

0 comments on commit b56bc47

Please sign in to comment.