Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use FutureMono instead of Mono#fromCompletionStage #2565

Merged
merged 1 commit into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ default boolean isDisposed() {
* @return a {@link Mono} terminating with success if shutdown successfully or error
*/
default Mono<Void> onDispose() {
return Mono.fromCompletionStage(channel().closeFuture().asStage());
return FutureMono.from(channel().closeFuture());
}

/**
Expand Down
204 changes: 204 additions & 0 deletions reactor-netty5-core/src/main/java/reactor/netty5/FutureMono.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5;

import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.function.Supplier;

import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureListener;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.netty5.channel.AbortedException;
import reactor.util.context.Context;

/**
* Convert Netty Future into void {@link Mono}.
*
* @author Stephane Maldini
*/
public abstract class FutureMono extends Mono<Void> {

/**
* Convert a {@link Future} into {@link Mono}. {@link Mono#subscribe(Subscriber)}
* will bridge to {@link Future#addListener(FutureListener)}.
*
* @param future the future to convert from
* @param <F> the future type
*
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static <F extends Future<Void>> Mono<Void> from(F future) {
Objects.requireNonNull(future, "future");
if (future.isDone()) {
if (!future.isSuccess()) {
return Mono.error(FutureSubscription.wrapError(future.cause()));
}
return Mono.empty();
}
return new ImmediateFutureMono<>(future);
}

/**
* Convert a supplied {@link Future} for each subscriber into {@link Mono}.
* {@link Mono#subscribe(Subscriber)}
* will bridge to {@link Future#addListener(FutureListener)}.
*
* @param deferredFuture the future to evaluate and convert from
* @param <F> the future type
*
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static <F extends Future<Void>> Mono<Void> deferFuture(Supplier<F> deferredFuture) {
return new DeferredFutureMono<>(deferredFuture);
}

final static class ImmediateFutureMono<F extends Future<Void>> extends FutureMono {

final F future;

ImmediateFutureMono(F future) {
this.future = Objects.requireNonNull(future, "future");
}

@Override
public void subscribe(final CoreSubscriber<? super Void> s) {
doSubscribe(s, future);
}
}

final static class DeferredFutureMono<F extends Future<Void>> extends FutureMono {

final Supplier<F> deferredFuture;

DeferredFutureMono(Supplier<F> deferredFuture) {
this.deferredFuture =
Objects.requireNonNull(deferredFuture, "deferredFuture");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
F f;
try {
f = deferredFuture.get();
}
catch (Throwable t) {
Operators.error(s, t);
return;
}

if (f == null) {
Operators.error(s,
Operators.onOperatorError(new NullPointerException(
"Deferred supplied null"), s.currentContext()));
return;
}

doSubscribe(s, f);
}
}

static <F extends Future<Void>> void doSubscribe(CoreSubscriber<? super Void> s, F future) {
if (future.isDone()) {
if (future.isSuccess()) {
Operators.complete(s);
}
else {
Operators.error(s, FutureSubscription.wrapError(future.cause()));
}
return;
}

FutureSubscription<F> fs = new FutureSubscription<>(future, s);
// propagate subscription before adding listener to avoid any race between finishing future and onSubscribe
// is called
s.onSubscribe(fs);

// check if subscription was not cancelled immediately.
if (fs.cancelled) {
// if so do nothing anymore
return;
}

// add listener to the future to propagate on complete when future is done
// addListener likely to be thread safe method
future.addListener(fs);

// check once again if is cancelled to see if we need to removeListener in case addListener racing with
// subscription.cancel (which should remove listener)
if (fs.cancelled) {
// future.removeListener(fs); TODO
}
Comment on lines +144 to +148
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you leave the line 147 commented on purpose ?
I ask this because apparently, listeners can't be removed anymore from Future in netty5. So is there something else that still needs to be implemented in case the subscription is cancelled ? If not, then can we remove the lines between L144-L148 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the moment we will keep these TODOs until we are sure that we don't need them. This will happen post M3 release.

}

final static class FutureSubscription<F extends Future<Void>>
implements FutureListener<Void>, Subscription, Supplier<Context> {

final CoreSubscriber<? super Void> s;

final F future;

boolean cancelled;

FutureSubscription(F future, CoreSubscriber<? super Void> s) {
this.s = s;
this.future = future;
}

@Override
public void request(long n) {
//noop
}

@Override
public Context get() {
return s.currentContext();
}

@Override
public void cancel() {
// cancel is not thread safe since we assume that removeListener is thread-safe. That said if we have
// concurrent addListener and removeListener and if addListener is after removeListener, the other Thread
// after execution addListener should see changes happened before removeListener. Thus, it should see
// cancelled flag set to true and should cleanup added handler
this.cancelled = true;
// future.removeListener(this); TODO
}

@Override
public void operationComplete(Future<? extends Void> future) {
if (!future.isSuccess()) {
s.onError(wrapError(future.cause()));
}
else {
s.onComplete();
}
}

private static Throwable wrapError(Throwable error) {
if (error instanceof ClosedChannelException) {
return new AbortedException(error);
}
else {
return error;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import reactor.netty5.ChannelOperationsId;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.FutureMono;
import reactor.netty5.NettyInbound;
import reactor.netty5.NettyOutbound;
import reactor.netty5.NettyPipeline;
Expand Down Expand Up @@ -287,8 +288,8 @@ public NettyOutbound send(Publisher<? extends Buffer> dataStream, Predicate<Buff
return then(Mono.error(AbortedException.beforeSend()));
}
if (dataStream instanceof Mono<?> mono) {
return then(mono.flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage()))
.doOnDiscard(Buffer.class, Buffer::close));
return then(mono.flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
.doOnDiscard(Buffer.class, Buffer::close));
}
return then(MonoSendMany.bufferSource(dataStream, channel(), predicate));
}
Expand All @@ -300,8 +301,8 @@ public NettyOutbound sendObject(Publisher<?> dataStream, Predicate<Object> predi
return then(Mono.error(AbortedException.beforeSend()));
}
if (dataStream instanceof Mono<?> mono) {
return then(mono.flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage()))
.doOnDiscard(Resource.class, Resource::dispose));
return then(mono.flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
.doOnDiscard(Resource.class, Resource::dispose));
}
return then(MonoSendMany.objectSource(dataStream, channel(), predicate));
}
Expand All @@ -312,8 +313,8 @@ public NettyOutbound sendObject(Object message) {
ReactorNetty.safeRelease(message);
return then(Mono.error(AbortedException.beforeSend()));
}
return then(Mono.fromCompletionStage(() -> connection.channel()
.writeAndFlush(message).asStage()),
return then(FutureMono.deferFuture(() -> connection.channel()
.writeAndFlush(message)),
() -> ReactorNetty.safeRelease(message));
}

Expand All @@ -327,8 +328,8 @@ public <S> NettyOutbound sendUsing(Callable<? extends S> sourceInput,

return then(Mono.using(
sourceInput,
s -> Mono.fromCompletionStage(connection.channel()
.writeAndFlush(mappedInput.apply(this, s)).asStage()),
s -> FutureMono.from(connection.channel()
.writeAndFlush(mappedInput.apply(this, s))),
sourceCleanup)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty5.util.concurrent.FastThreadLocalThread;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.NonBlocking;
import reactor.netty5.FutureMono;

/**
* An adapted global eventLoop handler.
Expand Down Expand Up @@ -97,28 +98,28 @@ public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
Mono<?> cnsrvlMono = Mono.empty();
if (running.compareAndSet(true, false)) {
if (clientLoopsGroup != null) {
clMono = Mono.fromCompletionStage(clientLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
clMono = FutureMono.from(clientLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (serverSelectLoopsGroup != null) {
sslMono = Mono.fromCompletionStage(serverSelectLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
sslMono = FutureMono.from(serverSelectLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (serverLoopsGroup != null) {
slMono = Mono.fromCompletionStage(serverLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
slMono = FutureMono.from(serverLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (cacheNativeClientGroup != null) {
cnclMono = Mono.fromCompletionStage(cacheNativeClientGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
cnclMono = FutureMono.from(cacheNativeClientGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (cacheNativeSelectGroup != null) {
cnslMono = Mono.fromCompletionStage(cacheNativeSelectGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
cnslMono = FutureMono.from(cacheNativeSelectGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (cacheNativeServerGroup != null) {
cnsrvlMono = Mono.fromCompletionStage(cacheNativeServerGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
cnsrvlMono = FutureMono.from(cacheNativeServerGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import reactor.core.publisher.Sinks;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.FutureMono;
import reactor.netty5.NettyPipeline;
import reactor.netty5.channel.ChannelOperations;
import reactor.netty5.transport.TransportConfig;
Expand Down Expand Up @@ -565,7 +566,7 @@ public void onSubscribe(Subscription s) {
if (!pooledConnection.channel.isActive()) {
return Mono.empty();
}
return Mono.fromCompletionStage(pooledConnection.channel.close().asStage());
return FutureMono.from(pooledConnection.channel.close());
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import reactor.core.publisher.Mono;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.FutureMono;
import reactor.netty5.channel.ChannelOperations;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand Down Expand Up @@ -66,12 +67,12 @@ public Mono<Void> join(final InetAddress multicastAddress, @Nullable NetworkInte
future = datagramChannel.joinGroup(multicastAddress);
}

return Mono.fromCompletionStage(future.asStage())
.doOnSuccess(v -> {
if (log.isInfoEnabled()) {
log.info(format(datagramChannel, "JOIN {}"), multicastAddress);
}
});
return FutureMono.from(future)
.doOnSuccess(v -> {
if (log.isInfoEnabled()) {
log.info(format(datagramChannel, "JOIN {}"), multicastAddress);
}
});
}

/**
Expand All @@ -98,12 +99,12 @@ public Mono<Void> leave(final InetAddress multicastAddress, @Nullable NetworkInt
future = datagramChannel.leaveGroup(multicastAddress);
}

return Mono.fromCompletionStage(future.asStage())
.doOnSuccess(v -> {
if (log.isInfoEnabled()) {
log.info(format(datagramChannel, "JOIN {}"), multicastAddress);
}
});
return FutureMono.from(future)
.doOnSuccess(v -> {
if (log.isInfoEnabled()) {
log.info(format(datagramChannel, "JOIN {}"), multicastAddress);
}
});
}

static final Logger log = Loggers.getLogger(UdpOperations.class);
Expand Down