From 9b92488087225b4f535250f8ebe15cd20fdcb981 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 6 Mar 2018 15:29:34 +0100 Subject: [PATCH] Make the TimeBasedTaskScheduler use a PriorityQueue for scheduling, instead of a sorted linked list. This simplifies the code a bit, and should also have better performance charactaristics when there is a large number of scheduled tasks. --- .../scheduler/TimeBasedTaskScheduler.java | 40 +++++++------------ .../scheduler/TimeBasedTaskSchedulerTest.java | 17 ++++++++ 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskScheduler.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskScheduler.java index 649d694b43625..40198596db2f0 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskScheduler.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskScheduler.java @@ -19,6 +19,8 @@ */ package org.neo4j.kernel.impl.scheduler; +import java.util.Comparator; +import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; @@ -31,6 +33,8 @@ final class TimeBasedTaskScheduler implements Runnable { private static final ScheduledJobHandle END_SENTINEL = new ScheduledJobHandle( null, null, 0, 0 ); private static final long NO_TASKS_PARK = TimeUnit.MINUTES.toNanos( 10 ); + private static final Comparator DEADLINE_COMPARATOR = + Comparator.comparingLong( handle -> handle.nextDeadlineNanos ); private final SystemNanoClock clock; private final ThreadPoolManager pools; @@ -38,13 +42,14 @@ final class TimeBasedTaskScheduler implements Runnable private volatile Thread timeKeeper; private volatile boolean stopped; // This field is only access by the time keeper thread: - private ScheduledJobHandle delayedTasks; + private final PriorityQueue delayedTasks; TimeBasedTaskScheduler( SystemNanoClock clock, ThreadPoolManager pools ) { this.clock = clock; this.pools = pools; inbox = new AtomicReference<>( END_SENTINEL ); + delayedTasks = new PriorityQueue<>( DEADLINE_COMPARATOR ); } public JobHandle submit( Group group, Runnable job, long initialDelayNanos, long reschedulingDelayNanos ) @@ -87,47 +92,31 @@ private void sortInbox() private void enqueueTask( ScheduledJobHandle newTasks ) { - if ( delayedTasks == null || newTasks.nextDeadlineNanos <= delayedTasks.nextDeadlineNanos ) - { - newTasks.next = delayedTasks; - delayedTasks = newTasks; - } - else - { - ScheduledJobHandle head = delayedTasks; - while ( head.next != null && head.next.nextDeadlineNanos < newTasks.nextDeadlineNanos ) - { - head = head.next; - } - newTasks.next = head.next; - head.next = newTasks; - } + newTasks.next = null; // Assigning null helps prevent GC nepotism. + delayedTasks.offer( newTasks ); } private long scheduleDueTasks( long now ) { - if ( delayedTasks == null ) + if ( delayedTasks.isEmpty() ) { // We have no tasks to run. Park until we're woken up by a submit(). return NO_TASKS_PARK; } ScheduledJobHandle due = spliceOutDueTasks( now ); submitAndEnqueueTasks( due, now ); - return delayedTasks == null ? NO_TASKS_PARK : delayedTasks.nextDeadlineNanos - now; + return delayedTasks.isEmpty() ? NO_TASKS_PARK : delayedTasks.peek().nextDeadlineNanos - now; } private ScheduledJobHandle spliceOutDueTasks( long now ) { ScheduledJobHandle due = null; - ScheduledJobHandle delayed = delayedTasks; - while ( delayed != null && delayed.nextDeadlineNanos <= now ) + while ( !delayedTasks.isEmpty() && delayedTasks.peek().nextDeadlineNanos <= now ) { - ScheduledJobHandle next = delayed.next; - delayed.next = due; - due = delayed; - delayed = next; + ScheduledJobHandle task = delayedTasks.poll(); + task.next = due; + due = task; } - delayedTasks = delayed; return due; } @@ -135,6 +124,7 @@ private void submitAndEnqueueTasks( ScheduledJobHandle due, long now ) { while ( due != null ) { + // Make sure to grab the 'next' reference before any call to enqueueTask. ScheduledJobHandle next = due.next; if ( due.compareAndSetState( ScheduledJobHandle.STATE_RUNNABLE, ScheduledJobHandle.STATE_SUBMITTED ) ) { diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java index c878fb6b1b91f..86298459bfedc 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java @@ -104,6 +104,23 @@ public void mustDelayExecution() throws Exception assertThat( counter.get(), is( 1 ) ); } + @Test + public void mustOnlyScheduleTasksThatAreDue() throws Exception + { + JobHandle handle1 = scheduler.submit( group, () -> counter.addAndGet( 10 ), 100, 0 ); + JobHandle handle2 = scheduler.submit( group, () -> counter.addAndGet( 100 ), 200, 0 ); + scheduler.tick(); + assertThat( counter.get(), is( 0 ) ); + clock.forward( 199, TimeUnit.NANOSECONDS ); + scheduler.tick(); + handle1.waitTermination(); + assertThat( counter.get(), is( 10 ) ); + clock.forward( 1, TimeUnit.NANOSECONDS ); + scheduler.tick(); + handle2.waitTermination(); + assertThat( counter.get(), is( 110 ) ); + } + @Test public void mustNotRescheduleDelayedTasks() throws Exception {