Skip to content

Commit

Permalink
Ensure cancelled scheduled tasks can be GC'ed ASAP
Browse files Browse the repository at this point in the history
Motivation:

Prior we used a purge task that would remove previous canceled scheduled tasks from the internal queue. This could introduce some delay and so use a lot of memory even if the task itself is already canceled.

Modifications:

Schedule removal of task from queue via EventLoop if cancel operation is not done in the EventLoop Thread or just remove directly if the Thread that cancels the scheduled task is in the EventLoop.

Result:

Faster possibility to GC a canceled ScheduledFutureTask.
  • Loading branch information
normanmaurer committed Jul 19, 2015
1 parent ae32fd4 commit 36c80cd
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 39 deletions.
Expand Up @@ -16,8 +16,8 @@
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.OneTimeTask;

import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -62,7 +62,7 @@ protected void cancelScheduledTasks() {
scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);

for (ScheduledFutureTask<?> task: scheduledTasks) {
task.cancel(false);
task.cancelWithoutRemove(false);
}

scheduledTaskQueue.clear();
Expand Down Expand Up @@ -188,7 +188,7 @@ <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
execute(new OneTimeTask() {
@Override
public void run() {
scheduledTaskQueue().add(task);
Expand All @@ -199,17 +199,16 @@ public void run() {
return task;
}

void purgeCancelledScheduledTasks() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (isNullOrEmpty(scheduledTaskQueue)) {
return;
}
Iterator<ScheduledFutureTask<?>> i = scheduledTaskQueue.iterator();
while (i.hasNext()) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
final void removeScheduled(final ScheduledFutureTask<?> task) {
if (inEventLoop()) {
scheduledTaskQueue().remove(task);
} else {
execute(new OneTimeTask() {
@Override
public void run() {
removeScheduled(task);
}
});
}
}
}
Expand Up @@ -36,14 +36,18 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);

private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);

public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();

final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL);
final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new Runnable() {
@Override
public void run() {
// NOOP
}
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);

private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
private final TaskRunner taskRunner = new TaskRunner();
Expand All @@ -53,7 +57,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());

private GlobalEventExecutor() {
scheduledTaskQueue().add(purgeTask);
scheduledTaskQueue().add(quietPeriodTask);
}

@Override
Expand Down Expand Up @@ -231,13 +235,13 @@ public void run() {
logger.warn("Unexpected exception from the global event executor: ", t);
}

if (task != purgeTask) {
if (task != quietPeriodTask) {
continue;
}
}

Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
// Terminate if there is no task in the queue (except the purge task).
// Terminate if there is no task in the queue (except the noop task).
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
// Mark the current thread as stopped.
// The following CAS must always success and must be uncontended,
Expand Down Expand Up @@ -268,11 +272,4 @@ public void run() {
}
}
}

private final class PurgeTask implements Runnable {
@Override
public void run() {
purgeCancelledScheduledTasks();
}
}
}
Expand Up @@ -146,6 +146,19 @@ public void run() {
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled) {
((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
}
return canceled;
}

boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
return super.cancel(mayInterruptIfRunning);
}

@Override
protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder();
Expand Down
Expand Up @@ -25,7 +25,6 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -714,18 +713,8 @@ protected static void reject() {
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
thread.start();
}
}
}

private final class PurgeTask implements Runnable {
@Override
public void run() {
purgeCancelledScheduledTasks();
}
}
}

0 comments on commit 36c80cd

Please sign in to comment.