diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 6fc25f10202..be665b0e24c 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -17,9 +17,9 @@ import io.netty.util.internal.CallableEventExecutorAdapter; import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.RunnableEventExecutorAdapter; -import java.util.Iterator; import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.Callable; @@ -71,7 +71,7 @@ protected void cancelScheduledTasks() { scheduledTaskQueue.toArray(new ScheduledFutureTask[scheduledTaskQueue.size()]); for (ScheduledFutureTask task: scheduledTasks) { - task.cancel(false); + task.cancelWithoutRemove(false); } scheduledTaskQueue.clear(); @@ -197,7 +197,7 @@ ScheduledFuture schedule(final ScheduledFutureTask task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { - execute(new Runnable() { + execute(new OneTimeTask() { @Override public void run() { scheduledTaskQueue().add(task); @@ -208,17 +208,16 @@ public void run() { return task; } - void purgeCancelledScheduledTasks() { - Queue> scheduledTaskQueue = this.scheduledTaskQueue; - if (isNullOrEmpty(scheduledTaskQueue)) { - return; - } - Iterator> i = scheduledTaskQueue.iterator(); - while (i.hasNext()) { - ScheduledFutureTask task = i.next(); - if (task.isCancelled()) { - i.remove(); - } + final void removeScheduled(final ScheduledFutureTask task) { + if (inEventLoop()) { + scheduledTaskQueue().remove(task); + } else { + execute(new OneTimeTask() { + @Override + public void run() { + removeScheduled(task); + } + }); } } diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index 0c52d530efa..aade9e705f2 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -36,14 +36,18 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class); - private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); + private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1); public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); final BlockingQueue taskQueue = new LinkedBlockingQueue(); - final ScheduledFutureTask purgeTask = new ScheduledFutureTask( - this, Executors.callable(new PurgeTask(), null), - ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL); + final ScheduledFutureTask quietPeriodTask = new ScheduledFutureTask( + this, Executors.callable(new Runnable() { + @Override + public void run() { + // NOOP + } + }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass()); private final TaskRunner taskRunner = new TaskRunner(); @@ -53,7 +57,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { private final Future terminationFuture = new FailedFuture(this, new UnsupportedOperationException()); private GlobalEventExecutor() { - scheduledTaskQueue().add(purgeTask); + scheduledTaskQueue().add(quietPeriodTask); } /** @@ -227,13 +231,13 @@ public void run() { logger.warn("Unexpected exception from the global event executor: ", t); } - if (task != purgeTask) { + if (task != quietPeriodTask) { continue; } } Queue> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue; - // Terminate if there is no task in the queue (except the purge task). + // Terminate if there is no task in the queue (except the noop task). if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) { // Mark the current thread as stopped. // The following CAS must always success and must be uncontended, @@ -264,11 +268,4 @@ public void run() { } } } - - private final class PurgeTask implements Runnable { - @Override - public void run() { - purgeCancelledScheduledTasks(); - } - } } diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index 72c2c79b1f2..91f00d13574 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -153,6 +153,19 @@ public void run() { } } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean canceled = super.cancel(mayInterruptIfRunning); + if (canceled) { + ((AbstractScheduledEventExecutor) executor()).removeScheduled(this); + } + return canceled; + } + + boolean cancelWithoutRemove(boolean mayInterruptIfRunning) { + return super.cancel(mayInterruptIfRunning); + } + @Override protected StringBuilder toStringBuilder() { StringBuilder buf = super.toStringBuilder(); diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index fe07a91ac7d..cdc577f6d71 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -26,7 +26,6 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; @@ -726,9 +725,6 @@ protected void cleanupAndTerminate(boolean success) { private void startExecution() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { - schedule(new ScheduledFutureTask( - this, Executors.callable(new PurgeTask(), null), - ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); scheduleExecution(); } } @@ -742,11 +738,4 @@ protected final void scheduleExecution() { private void updateThread(Thread t) { THREAD_UPDATER.lazySet(this, t); } - - private final class PurgeTask implements Runnable { - @Override - public void run() { - purgeCancelledScheduledTasks(); - } - } }