Skip to content

Commit

Permalink
Simplify EventLoop abstractions for timed scheduled tasks (#9470)
Browse files Browse the repository at this point in the history
Motivation

The epoll transport was updated in #7834 to decouple setting of the
timerFd from the event loop, so that scheduling delayed tasks does not
require waking up epoll_wait. To achieve this, new overridable hooks
were added in the AbstractScheduledEventExecutor and
SingleThreadEventExecutor superclasses.

However, the minimumDelayScheduledTaskRemoved hook has no current
purpose and I can't envisage a _practical_ need for it. Removing
it would reduce complexity and avoid supporting this specific
API indefinitely. We can add something similar later if needed
but the opposite is not true.

There also isn't a _nice_ way to use the abstractions for
wakeup-avoidance optimizations in other EventLoops that don't have a
decoupled timer.

This PR replaces executeScheduledRunnable and
wakesUpForScheduledRunnable
with two new methods before/afterFutureTaskScheduled that have slightly
different semantics:
 - They only apply to additions; given the current internals there's no
practical use for removals
 - They allow per-submission wakeup decisions via a boolean return val,
which makes them easier to exploit from other existing EL impls (e.g.
NIO/KQueue)
 - They are subjectively "cleaner", taking just the deadline parameter
and not exposing Runnables
 - For current EL/queue impls, only the "after" hook is really needed,
but specialized blocking queue impls can conditionally wake on task
submission (I have one lined up)

Also included are further optimization/simplification/fixes to the
timerFd manipulation logic.

Modifications

- Remove AbstractScheduledEventExecutor#minimumDelayScheduledTaskRemoved()
and supporting methods
- Uplift NonWakeupRunnable and corresponding default wakesUpForTask()
impl from SingleThreadEventLoop to SingleThreadEventExecutor
- Change executeScheduledRunnable() to be package-private, and have a
final impl in SingleThreadEventExecutor which triggers new overridable
hooks before/afterFutureTaskScheduled()
- Remove unnecessary use of bookend tasks while draining the task queue
- Use new hooks to add simpler wake-up avoidance optimization to
NioEventLoop (primarily to demonstrate utility/simplicity)
- Reinstate removed EpollTest class

In EpollEventLoop:
 - Refactor to use only the new afterFutureTaskScheduled() hook for
updating timerFd
 - Fix setTimerFd race condition using a monitor
 - Set nextDeadlineNanos to a negative value while the EL is awake and
use this to block timer changes from outside the EL. Restore the
known-set value prior to sleeping, updating timerFd first if necessary
 - Don't read from timerFd when processing expiry event

Result

- Cleaner API for integrating with different EL/queue timing impls
- Fixed race condition to avoid missing scheduled wakeups
- Eliminate unnecessary timerFd updates while EL is awake, and
unnecessary expired timerFd reads
- Avoid unnecessary scheduled-task wakeups when using NIO transport

I did not yet further explore the suggestion of using
TFD_TIMER_ABSTIME for the timerFd.
  • Loading branch information
njhill authored and normanmaurer committed Aug 21, 2019
1 parent cb739b2 commit a22d4ba
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 229 deletions.
Expand Up @@ -102,10 +102,6 @@ protected void cancelScheduledTasks() {
}

scheduledTaskQueue.clearIgnoringIndexes();

// calling minimumDelayScheduledTaskRemoved was considered here to give a chance for EventLoop
// implementations the opportunity to cleanup any timerFds, but this is currently only called when the EventLoop
// is being shutdown, so the timerFd and associated polling mechanism will be destroyed anyways.
}

/**
Expand All @@ -120,24 +116,15 @@ protected final Runnable pollScheduledTask() {
* You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
*/
protected final Runnable pollScheduledTask(long nanoTime) {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
return scheduledTaskQueue != null ? pollScheduledTask(scheduledTaskQueue, nanoTime, true) : null;
}

final Runnable pollScheduledTask(Queue<ScheduledFutureTask<?>> scheduledTaskQueue, long nanoTime,
boolean notifyMinimumDeadlineRemoved) {
assert scheduledTaskQueue != null;
assert inEventLoop();

ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue.peek();
if (scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.poll();
if (notifyMinimumDeadlineRemoved) {
minimumDelayScheduledTaskRemoved(scheduledTask, scheduledTask.deadlineNanos());
}
return scheduledTask;
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null;
}
return null;
scheduledTaskQueue.remove();
return scheduledTask;
}

/**
Expand Down Expand Up @@ -269,29 +256,17 @@ public void run() {

final void removeScheduled(final ScheduledFutureTask<?> task) {
if (inEventLoop()) {
removedSchedule0(task);
scheduledTaskQueue().removeTyped(task);
} else {
executeScheduledRunnable(new Runnable() {
@Override
public void run() {
removedSchedule0(task);
scheduledTaskQueue().removeTyped(task);
}
}, false, task.deadlineNanos());
}
}

private void removedSchedule0(final ScheduledFutureTask<?> task) {
if (scheduledTaskQueue == null || task == null) {
return;
}
if (scheduledTaskQueue.peek() == task) {
scheduledTaskQueue.poll();
minimumDelayScheduledTaskRemoved(task, task.deadlineNanos());
} else {
scheduledTaskQueue.removeTyped(task);
}
}

/**
* Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing
* a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this
Expand All @@ -301,16 +276,9 @@ private void removedSchedule0(final ScheduledFutureTask<?> task) {
* action
* @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed.
*/
protected void executeScheduledRunnable(Runnable runnable,
void executeScheduledRunnable(Runnable runnable,
@SuppressWarnings("unused") boolean isAddition,
@SuppressWarnings("unused") long deadlineNanos) {
execute(runnable);
}

/**
* The next task to expire (e.g. minimum delay) has been removed from the scheduled priority queue.
*/
protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task,
@SuppressWarnings("unused") long deadlineNanos) {
}
}
Expand Up @@ -73,12 +73,6 @@ public void run() {
// Do nothing.
}
};
private static final Runnable BOOKEND_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};

private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
Expand Down Expand Up @@ -184,6 +178,33 @@ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

/**
* Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission.
* Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to
* process the scheduled task (if not already awake).
* <p>
* If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with
* the same value <i>after</i> the scheduled task is enqueued, providing another opportunity
* to wake the {@link EventExecutor} thread if required.
*
* @param deadlineNanos deadline of the to-be-scheduled task
* relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
return true;
}

/**
* See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
*
* @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
return true;
}

/**
* @deprecated Please use and override {@link #newTaskQueue(int)}.
*/
Expand Down Expand Up @@ -225,10 +246,9 @@ protected Runnable pollTask() {
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
if (task != WAKEUP_TASK) {
return task;
}
return task;
}
}

Expand Down Expand Up @@ -293,35 +313,35 @@ private boolean fetchFromScheduledTaskQueue() {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, true);
while (scheduledTask != null) {
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false);
}
return true;
}

/**
* @return {@code true} if at least one scheduled task was executed.
*/
private boolean executeExpiredScheduledTasks(boolean notifyMinimumDeadlineRemoved) {
private boolean executeExpiredScheduledTasks() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return false;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, notifyMinimumDeadlineRemoved);
if (scheduledTask != null) {
do {
safeExecute(scheduledTask);
scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false);
} while (scheduledTask != null);
return true;
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return false;
}
return false;
do {
safeExecute(scheduledTask);
} while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
return true;
}

/**
Expand Down Expand Up @@ -414,17 +434,13 @@ protected boolean runAllTasks() {
*/
protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
assert inEventLoop();
// We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
// here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
boolean ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue);
boolean ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(true);
boolean ranAtLeastOneTask;
int drainAttempt = 0;
while ((ranAtLeastOneExecutorTask || ranAtLeastOneScheduledTask) && ++drainAttempt < maxDrainAttempts) {
do {
// We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
// here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue);
ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(false);
}
ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
} while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);

if (drainAttempt > 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
Expand Down Expand Up @@ -461,46 +477,17 @@ protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
* @return {@code true} if at least {@link Runnable#run()} was called.
*/
private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
return taskQueue.offer(BOOKEND_TASK) ? runExistingTasksUntilBookend(taskQueue)
: runExistingTasksUntilMaxTasks(taskQueue);
}

private boolean runExistingTasksUntilBookend(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
// null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, and
// null elements are not permitted to be inserted into the queue.
if (task == BOOKEND_TASK) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
// null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue,
// and null elements are not permitted to be inserted into the queue.
if (task == BOOKEND_TASK) {
return true;
}
}
}

private boolean runExistingTasksUntilMaxTasks(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
// BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted into
// the queue, and if was previously inserted we always drain all the elements from queue including BOOKEND_TASK.
if (task == null) {
return false;
}
int i = 0;
do {
int remaining = Math.min(maxPendingTasks, taskQueue.size());
safeExecute(task);
// Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
// silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
// BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted
// into the queue, and if was previously inserted we always drain all the elements from queue including
// BOOKEND_TASK.
if (task == null) {
return true;
}
} while (++i < maxPendingTasks);
}
return true;
}

Expand Down Expand Up @@ -607,6 +594,25 @@ protected void wakeup(boolean inEventLoop) {
}
}

@Override
final void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) {
// Don't wakeup if this is a removal task or if beforeScheduledTaskSubmitted returns false
if (isAddition && beforeScheduledTaskSubmitted(deadlineNanos)) {
super.executeScheduledRunnable(runnable, isAddition, deadlineNanos);
} else {
super.executeScheduledRunnable(new NonWakeupRunnable() {
@Override
public void run() {
runnable.run();
}
}, isAddition, deadlineNanos);
// Second hook after scheduling to facilitate race-avoidance
if (isAddition && afterScheduledTaskSubmitted(deadlineNanos)) {
wakeup(false);
}
}
}

@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
Expand Down Expand Up @@ -954,8 +960,21 @@ public final ThreadProperties threadProperties() {
return threadProperties;
}

protected boolean wakesUpForTask(@SuppressWarnings("unused") Runnable task) {
return true;
/**
* Marker interface for {@link Runnable} to indicate that it should be queued for execution
* but does not need to run immediately. The default implementation of
* {@link SingleThreadEventExecutor#wakesUpForTask(Runnable)} uses this to avoid waking up
* the {@link EventExecutor} thread when not necessary.
*/
protected interface NonWakeupRunnable extends Runnable { }

/**
* Can be overridden to control which tasks require waking the {@link EventExecutor} thread
* if it is waiting so that they can be run immediately. The default implementation
* decides based on whether the task implements {@link NonWakeupRunnable}.
*/
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);
}

protected static void reject() {
Expand Down

0 comments on commit a22d4ba

Please sign in to comment.