Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netty Future no longer extends JDK Future #11647

Merged
merged 8 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ private Future<Void> close0(final ChannelOutboundInvoker invoker, final Channel
}, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);

channel.closeFuture().addListener(ignore -> {
forceCloseFuture.cancel(false);
forceCloseFuture.cancel();
});
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private void applyHandshakeTimeout() {
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);

// Cancel the handshake timeout when handshake is finished.
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false));
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private void applyCloseSentTimeout(ChannelHandlerContext ctx) {
}
}, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);

closeSent.asFuture().addListener(future -> timeoutTask.cancel(false));
closeSent.asFuture().addListener(future -> timeoutTask.cancel());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,6 @@ private void applyHandshakeTimeout(ChannelHandlerContext ctx) {
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);

// Cancel the handshake timeout when handshake is finished.
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false));
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -900,7 +899,7 @@ private static final class ClosingChannelFutureListener implements FutureListene
@Override
public void operationComplete(Future<?> sentGoAwayFuture) {
if (timeoutTask != null) {
timeoutTask.cancel(false);
timeoutTask.cancel();
}
doClose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import static java.util.Objects.requireNonNull;

/**
* Abstract base class for {@link EventExecutor} implementations.
*/
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
public abstract class AbstractEventExecutor implements EventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
Expand All @@ -43,26 +44,60 @@ public <V> Future<V> newSucceededFuture(V result) {
}

@Override
public final Future<?> submit(Runnable task) {
return (Future<?>) super.submit(task);
public final Future<Void> submit(Runnable task) {
var futureTask = newTaskFor(task, (Void) null);
execute(futureTask);
return futureTask;
}

@Override
public final <T> Future<T> submit(Runnable task, T result) {
return (Future<T>) super.submit(task, result);
var futureTask = newTaskFor(task, result);
execute(futureTask);
return futureTask;
}

@Override
public final <T> Future<T> submit(Callable<T> task) {
return (Future<T>) super.submit(task);
var futureTask = newTaskFor(task);
execute(futureTask);
return futureTask;
}

@Override
/**
* Decorate the given {@link Runnable} and its return value, as a {@link RunnableFuture}, such that the
* returned {@link RunnableFuture} completes with the given result at the end of executing its
* {@link RunnableFuture#run()} method.
* <p>
* The returned {@link RunnableFuture} is the task that will actually be run by a thread in this
* executor.
* <p>
* This method can be overridden by sub-classes to hook into the life cycle of the given task.
*
* @param runnable The task to be decorated.
* @param value The value that the returned future will complete with, assuming the given {@link Runnable} doesn't
* throw an exception.
* @param <T> The type of the result value.
* @return The decorated {@link Runnable} that is now also a {@link Future}.
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return newRunnableFuture(newPromise(), runnable, value);
}

@Override
chrisvest marked this conversation as resolved.
Show resolved Hide resolved
/**
* Decorate the given {@link Callable} and its return value, as a {@link RunnableFuture}, such that the
* returned {@link RunnableFuture} completes with the returned result from the {@link Callable} at the end of
* executing its {@link RunnableFuture#run()} method.
* <p>
* The returned {@link RunnableFuture} is the task that will actually be run by a thread in this
* executor.
* <p>
* This method can be overridden by sub-classes to hook into the life cycle of the given task.
*
* @param callable The task to be decorated.
* @param <T> The type of the result value.
* @return The decorated {@link Runnable} that is now also a {@link Future}.
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return newRunnableFuture(newPromise(), callable);
}
Expand All @@ -85,17 +120,14 @@ static void safeExecute(Runnable task) {
* {@link RunnableFuture}.
*/
private static <V> RunnableFuture<V> newRunnableFuture(Promise<V> promise, Callable<V> task) {
return new RunnableFutureAdapter<>(promise, task);
return new RunnableFutureAdapter<>(promise, requireNonNull(task, "task"));
}

/**
* Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Runnable} and
* {@code value}.
*
* This can be used if you want to override {@link #newTaskFor(Runnable, V)} and return a different
* {@link RunnableFuture}.
*/
private static <V> RunnableFuture<V> newRunnableFuture(Promise<V> promise, Runnable task, V value) {
return new RunnableFutureAdapter<>(promise, Executors.callable(task, value));
return new RunnableFutureAdapter<>(promise, Executors.callable(requireNonNull(task, "task"), value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
*/
package io.netty.util.concurrent;

import static java.util.Objects.requireNonNull;

import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.PriorityQueue;
import io.netty.util.internal.PriorityQueueNode;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.callable;

/**
* Abstract base class for {@link EventExecutor}s that want to support scheduling.
*/
Expand Down Expand Up @@ -60,7 +59,7 @@ public static long nanoTime() {
static long deadlineNanos(long delay) {
long deadlineNanos = nanoTime() + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
return deadlineNanos < 0? Long.MAX_VALUE : deadlineNanos;
}

PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue() {
Expand All @@ -79,7 +78,7 @@ private static boolean isNullOrEmpty(Queue<RunnableScheduledFutureNode<?>> queue

/**
* Cancel all scheduled tasks.
*
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final void cancelScheduledTasks() {
Expand All @@ -92,8 +91,8 @@ protected final void cancelScheduledTasks() {
final RunnableScheduledFutureNode<?>[] scheduledTasks =
scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES);

for (RunnableScheduledFutureNode<?> task: scheduledTasks) {
task.cancel(false);
for (RunnableScheduledFutureNode<?> task : scheduledTasks) {
task.cancel();
}

scheduledTaskQueue.clearIgnoringIndexes();
Expand All @@ -107,16 +106,16 @@ protected final RunnableScheduledFuture<?> pollScheduledTask() {
}

/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
* You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
*
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link
* #nanoTime()} to retrieve the correct {@code nanoTime}.
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {
assert inEventLoop();

Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
Expand All @@ -130,12 +129,12 @@ protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {

/**
* Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
*
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final long nextScheduledTaskNano() {
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return -1;
}
Expand All @@ -152,30 +151,30 @@ final RunnableScheduledFuture<?> peekScheduledTask() {

/**
* Returns {@code true} if a scheduled task is ready for processing.
*
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final boolean hasScheduledTasks() {
assert inEventLoop();
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
RunnableScheduledFuture<?> task = newScheduledTaskFor(Executors.callable(command),
deadlineNanos(unit.toNanos(delay)), 0);
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(delay)), 0);
return schedule(task);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
requireNonNull(callable, "callable");
requireNonNull(unit, "unit");
if (delay < 0) {
Expand All @@ -186,7 +185,7 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
public Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (initialDelay < 0) {
Expand All @@ -198,13 +197,13 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
String.format("period: %d (expected: > 0)", period));
}

RunnableScheduledFuture<?> task = newScheduledTaskFor(Executors.<Void>callable(command, null),
deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period));
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period));
return schedule(task);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
public Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (initialDelay < 0) {
Expand All @@ -216,15 +215,15 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
String.format("delay: %d (expected: > 0)", delay));
}

RunnableScheduledFuture<?> task = newScheduledTaskFor(Executors.<Void>callable(command, null),
deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay));
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay));
return schedule(task);
}

/**
* Add the {@link RunnableScheduledFuture} for execution.
*/
protected final <V> ScheduledFuture<V> schedule(final RunnableScheduledFuture<V> task) {
protected final <V> Future<V> schedule(final RunnableScheduledFuture<V> task) {
if (inEventLoop()) {
add0(task);
} else {
Expand Down Expand Up @@ -253,9 +252,9 @@ final void removeScheduled(final RunnableScheduledFutureNode<?> task) {

/**
* Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}.
*
* This can be used if you want to override {@link #newTaskFor(Callable)} and return a different
* {@link RunnableFuture}.
* <p>
* This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a
* different {@link RunnableFuture}.
*/
protected static <V> RunnableScheduledFuture<V> newRunnableScheduledFuture(
AbstractScheduledEventExecutor executor, Promise<V> promise, Callable<V> task,
Expand All @@ -271,7 +270,8 @@ protected <V> RunnableScheduledFuture<V> newScheduledTaskFor(
return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period);
}

interface RunnableScheduledFutureNode<V> extends PriorityQueueNode, RunnableScheduledFuture<V> { }
interface RunnableScheduledFutureNode<V> extends PriorityQueueNode, RunnableScheduledFuture<V> {
}

private static final class DefaultRunnableScheduledFutureNode<V> implements RunnableScheduledFutureNode<V> {
private final RunnableScheduledFuture<V> future;
Expand Down Expand Up @@ -308,8 +308,8 @@ public RunnableScheduledFuture<V> addListener(FutureListener<? super V> listener
}

@Override
public <C> RunnableScheduledFuture<V> addListener(C context,
FutureContextListener<? super C, ? super V> listener) {
public <C> RunnableScheduledFuture<V> addListener(
C context, FutureContextListener<? super C, ? super V> listener) {
future.addListener(context, listener);
return this;
}
Expand All @@ -335,8 +335,8 @@ public void run() {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
public boolean cancel() {
return future.cancel();
}

@Override
Expand All @@ -360,12 +360,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution
}

@Override
public long getDelay(TimeUnit unit) {
return future.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
public int compareTo(RunnableScheduledFuture<?> o) {
return future.compareTo(o);
}

Expand Down Expand Up @@ -393,11 +388,6 @@ public RunnableFuture<V> awaitUninterruptibly() {
return this;
}

@Override
public boolean cancel() {
return cancel(false);
}

@Override
public boolean isSuccess() {
return future.isSuccess();
Expand Down