From b56bc476bcfcec97ac95d5e89ee6a104f8ebe0f7 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 27 Feb 2018 10:35:50 +0100 Subject: [PATCH] Some additional TimeBasedTaskScheduler changes that improve timeliness, reaction time to new tasks submitions, and shuts the CentralJobScheduler down in the right order. --- .../impl/scheduler/CentralJobScheduler.java | 32 ++++++++++++------- .../scheduler/TimeBasedTaskScheduler.java | 10 ++++-- .../scheduler/CentralJobSchedulerTest.java | 4 +-- .../scheduler/TimeBasedTaskSchedulerTest.java | 32 +++++++++++++++++++ 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/CentralJobScheduler.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/CentralJobScheduler.java index e512bc5065a17..8503a5c62c079 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/CentralJobScheduler.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/CentralJobScheduler.java @@ -134,20 +134,14 @@ public void shutdown() { started = false; - // Cancel jobs which hasn't been cancelled already, this to avoid having to wait the full - // max wait time and then just leave them. - InterruptedException exception = pools.shutDownAll(); + // First shut down the scheduler, so no new tasks are queued up in the pools. + InterruptedException exception = shutDownScheduler(); - scheduler.stop(); - try - { - schedulerThread.join(); - } - catch ( InterruptedException e ) - { - exception = Exceptions.chain( exception, e ); - } + // Then shut down the thread pools. This involves cancelling jobs which hasn't been cancelled already, + // so we avoid having to wait the full maximum wait time on the executor service shut-downs. + exception = Exceptions.chain( exception, pools.shutDownAll() ); + // Finally, we shut the work-stealing executors down. for ( ExecutorService workStealingExecutor : workStealingExecutors.values() ) { exception = shutdownPool( workStealingExecutor, exception ); @@ -160,6 +154,20 @@ public void shutdown() } } + private InterruptedException shutDownScheduler() + { + scheduler.stop(); + try + { + schedulerThread.join(); + } + catch ( InterruptedException e ) + { + return e; + } + return null; + } + private InterruptedException shutdownPool( ExecutorService pool, InterruptedException exception ) { if ( pool != null ) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskScheduler.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskScheduler.java index 7113672d62fca..8979f61bb0427 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskScheduler.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskScheduler.java @@ -61,7 +61,9 @@ public long tick() { long now = clock.nanos(); sortInbox(); - return scheduleDueTasks( now ); + long timeToNextDeadlineSinceStart = scheduleDueTasks( now ); + long processingTime = clock.nanos() - now; + return timeToNextDeadlineSinceStart - processingTime; } private void sortInbox() @@ -164,7 +166,11 @@ public void run() while ( !stopped ) { long timeToNextTickNanos = tick(); - LockSupport.parkNanos( this, timeToNextTickNanos ); + if ( inbox.get() == END_SENTINEL ) + { + // Only park if nothing has been posted to our inbox while we were processing the last tick. + LockSupport.parkNanos( this, timeToNextTickNanos ); + } } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/CentralJobSchedulerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/CentralJobSchedulerTest.java index 716c39c7d177e..55e976a1d69aa 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/CentralJobSchedulerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/CentralJobSchedulerTest.java @@ -70,7 +70,7 @@ public void stopScheduler() public void shouldRunRecurringJob() throws Throwable { // Given - long period = 100; + long period = 10; int count = 5; life.start(); @@ -81,7 +81,7 @@ public void shouldRunRecurringJob() throws Throwable // Then assert that the recurring job was stopped (when the scheduler was shut down) int actualInvocations = invocations.get(); - sleep( period * 2 ); + sleep( period * 5 ); assertThat( invocations.get(), equalTo( actualInvocations ) ); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java index bb1e922c253ae..b4792f2f9e216 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java @@ -222,4 +222,36 @@ public void recurringTasksMustStopWhenCancelled() throws InterruptedException assertThat( counter.get(), is( 2 ) ); assertThat( cancelListener, contains( Boolean.TRUE ) ); } + + @Test + public void overdueRecurringTasksMustStartAsSoonAsPossible() throws Exception + { + Runnable recurring = () -> + { + counter.incrementAndGet(); + semaphore.acquireUninterruptibly(); + }; + JobHandle handle = scheduler.submit( group, recurring, 100, 100 ); + clock.forward( 100, TimeUnit.NANOSECONDS ); + scheduler.tick(); + while ( counter.get() < 1 ) + { + // Spin. + Thread.yield(); + } + clock.forward( 100, TimeUnit.NANOSECONDS ); + scheduler.tick(); + clock.forward( 100, TimeUnit.NANOSECONDS ); + semaphore.release(); + scheduler.tick(); + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos( 10 ); + while ( counter.get() < 2 && System.nanoTime() < deadline ) + { + scheduler.tick(); + Thread.yield(); + } + assertThat( counter.get(), is( 2 ) ); + semaphore.release( Integer.MAX_VALUE ); + handle.cancel( false ); + } }