Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove FutureMono and use Mono.fromCompletionStage #2250

Merged
merged 1 commit into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ default boolean isDisposed() {
* @return a {@link Mono} terminating with success if shutdown successfully or error
*/
default Mono<Void> onDispose() {
return FutureMono.from(channel().closeFuture());
return Mono.fromCompletionStage(channel().closeFuture().asStage());
}

/**
Expand Down
209 changes: 0 additions & 209 deletions reactor-netty-core/src/main/java/reactor/netty/FutureMono.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import reactor.netty.ChannelOperationsId;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
Expand Down Expand Up @@ -290,7 +289,7 @@ public NettyOutbound send(Publisher<? extends ByteBuf> dataStream, Predicate<Byt
return then(Mono.error(AbortedException.beforeSend()));
}
if (dataStream instanceof Mono) {
return then(((Mono<?>) dataStream).flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
return then(((Mono<?>) dataStream).flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage()))
.doOnDiscard(ByteBuf.class, ByteBuf::release));
}
return then(MonoSendMany.byteBufSource(dataStream, channel(), predicate));
Expand All @@ -303,7 +302,7 @@ public NettyOutbound sendObject(Publisher<?> dataStream, Predicate<Object> predi
return then(Mono.error(AbortedException.beforeSend()));
}
if (dataStream instanceof Mono) {
return then(((Mono<?>) dataStream).flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
return then(((Mono<?>) dataStream).flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage()))
.doOnDiscard(ReferenceCounted.class, ReferenceCounted::release));
}
return then(MonoSendMany.objectSource(dataStream, channel(), predicate));
Expand All @@ -315,8 +314,8 @@ public NettyOutbound sendObject(Object message) {
ReactorNetty.safeRelease(message);
return then(Mono.error(AbortedException.beforeSend()));
}
return then(FutureMono.deferFuture(() -> connection.channel()
.writeAndFlush(message)),
return then(Mono.fromCompletionStage(() -> connection.channel()
.writeAndFlush(message).asStage()),
() -> ReactorNetty.safeRelease(message));
}

Expand All @@ -330,8 +329,8 @@ public <S> NettyOutbound sendUsing(Callable<? extends S> sourceInput,

return then(Mono.using(
sourceInput,
s -> FutureMono.from(connection.channel()
.writeAndFlush(mappedInput.apply(this, s))),
s -> Mono.fromCompletionStage(connection.channel()
.writeAndFlush(mappedInput.apply(this, s)).asStage()),
sourceCleanup)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@
import io.netty5.channel.MultithreadEventLoopGroup;
import io.netty5.channel.nio.NioHandler;
import io.netty5.util.concurrent.FastThreadLocalThread;
import io.netty5.util.concurrent.Future;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.NonBlocking;
import reactor.netty.FutureMono;

/**
* An adapted global eventLoop handler.
Expand Down Expand Up @@ -79,7 +77,6 @@ final class DefaultLoopResources extends AtomicLong implements LoopResources {
}

@Override
@SuppressWarnings("unchecked")
public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
return Mono.defer(() -> {
long quietPeriodMillis = quietPeriod.toMillis();
Expand All @@ -100,28 +97,28 @@ public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
Mono<?> cnsrvlMono = Mono.empty();
if (running.compareAndSet(true, false)) {
if (clientLoopsGroup != null) {
clMono = FutureMono.from((Future) clientLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
clMono = Mono.fromCompletionStage(clientLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
}
if (serverSelectLoopsGroup != null) {
sslMono = FutureMono.from((Future) serverSelectLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
sslMono = Mono.fromCompletionStage(serverSelectLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
}
if (serverLoopsGroup != null) {
slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
slMono = Mono.fromCompletionStage(serverLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
}
if (cacheNativeClientGroup != null) {
cnclMono = FutureMono.from((Future) cacheNativeClientGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
cnclMono = Mono.fromCompletionStage(cacheNativeClientGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
}
if (cacheNativeSelectGroup != null) {
cnslMono = FutureMono.from((Future) cacheNativeSelectGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
cnslMono = Mono.fromCompletionStage(cacheNativeSelectGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
}
if (cacheNativeServerGroup != null) {
cnsrvlMono = FutureMono.from((Future) cacheNativeServerGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
cnsrvlMono = Mono.fromCompletionStage(cacheNativeServerGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyPipeline;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.transport.TransportConfig;
Expand Down Expand Up @@ -577,7 +576,7 @@ public void onSubscribe(Subscription s) {
if (!pooledConnection.channel.isActive()) {
return Mono.empty();
}
return FutureMono.from(pooledConnection.channel.close());
return Mono.fromCompletionStage(pooledConnection.channel.close().asStage());
};
}
}