From c676d86cfbecfefaaa8ab0380a35b30c84f86d10 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 22 Mar 2022 14:22:59 +0200 Subject: [PATCH] Remove FutureMono and use Mono.fromCompletionStage Related to #1873 --- .../java/reactor/netty/DisposableChannel.java | 2 +- .../main/java/reactor/netty/FutureMono.java | 209 ----------------- .../netty/channel/ChannelOperations.java | 13 +- .../netty/resources/DefaultLoopResources.java | 27 +-- .../DefaultPooledConnectionProvider.java | 3 +- .../java/reactor/netty/udp/UdpOperations.java | 31 ++- .../java/reactor/netty/FutureMonoTest.java | 222 ------------------ .../java/reactor/netty/NettyOutboundTest.java | 4 +- .../reactor/netty/tcp/TcpServerTests.java | 9 +- .../reactor/netty/http/HttpOperations.java | 23 +- .../http/client/HttpClientOperations.java | 10 +- .../client/WebsocketClientOperations.java | 8 +- .../http/server/HttpServerOperations.java | 23 +- .../server/WebsocketServerOperations.java | 8 +- .../netty/http/client/HttpClientTest.java | 3 +- .../netty/http/server/HttpServerTests.java | 3 +- 16 files changed, 79 insertions(+), 519 deletions(-) delete mode 100644 reactor-netty-core/src/main/java/reactor/netty/FutureMono.java delete mode 100644 reactor-netty-core/src/test/java/reactor/netty/FutureMonoTest.java diff --git a/reactor-netty-core/src/main/java/reactor/netty/DisposableChannel.java b/reactor-netty-core/src/main/java/reactor/netty/DisposableChannel.java index ae0df54d4e..0a26530cac 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/DisposableChannel.java +++ b/reactor-netty-core/src/main/java/reactor/netty/DisposableChannel.java @@ -125,7 +125,7 @@ default boolean isDisposed() { * @return a {@link Mono} terminating with success if shutdown successfully or error */ default Mono onDispose() { - return FutureMono.from(channel().closeFuture()); + return Mono.fromCompletionStage(channel().closeFuture().asStage()); } /** diff --git a/reactor-netty-core/src/main/java/reactor/netty/FutureMono.java b/reactor-netty-core/src/main/java/reactor/netty/FutureMono.java deleted file mode 100644 index 0ad90381f8..0000000000 --- a/reactor-netty-core/src/main/java/reactor/netty/FutureMono.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty; - -import java.nio.channels.ClosedChannelException; -import java.util.Objects; -import java.util.function.Supplier; - -import io.netty5.util.concurrent.Future; -import io.netty5.util.concurrent.GenericFutureListener; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Operators; -import reactor.netty.channel.AbortedException; -import reactor.util.context.Context; - -/** - * Convert Netty Future into void {@link Mono}. - * - * @author Stephane Maldini - */ -public abstract class FutureMono extends Mono { - - /** - * Convert a {@link Future} into {@link Mono}. {@link Mono#subscribe(Subscriber)} - * will bridge to {@link Future#addListener(GenericFutureListener)}. - * - * @param future the future to convert from - * @param the future type - * - * @return A {@link Mono} forwarding {@link Future} success or failure - */ - public static > Mono from(F future) { - Objects.requireNonNull(future, "future"); - if (future.isDone()) { - if (!future.isSuccess()) { - return Mono.error(FutureSubscription.wrapError(future.cause())); - } - return Mono.empty(); - } - return new ImmediateFutureMono<>(future); - } - - /** - * Convert a supplied {@link Future} for each subscriber into {@link Mono}. - * {@link Mono#subscribe(Subscriber)} - * will bridge to {@link Future#addListener(GenericFutureListener)}. - * - * @param deferredFuture the future to evaluate and convert from - * @param the future type - * - * @return A {@link Mono} forwarding {@link Future} success or failure - */ - public static > Mono deferFuture(Supplier deferredFuture) { - return new DeferredFutureMono<>(deferredFuture); - } - - final static class ImmediateFutureMono> extends FutureMono { - - final F future; - - ImmediateFutureMono(F future) { - this.future = Objects.requireNonNull(future, "future"); - } - - @Override - public void subscribe(final CoreSubscriber s) { - doSubscribe(s, future); - } - } - - final static class DeferredFutureMono> extends FutureMono { - - final Supplier deferredFuture; - - DeferredFutureMono(Supplier deferredFuture) { - this.deferredFuture = - Objects.requireNonNull(deferredFuture, "deferredFuture"); - } - - @Override - public void subscribe(CoreSubscriber s) { - F f; - try { - f = deferredFuture.get(); - } - catch (Throwable t) { - Operators.error(s, t); - return; - } - - if (f == null) { - Operators.error(s, - Operators.onOperatorError(new NullPointerException( - "Deferred supplied null"), s.currentContext())); - return; - } - - doSubscribe(s, f); - } - } - - @SuppressWarnings("FutureReturnValueIgnored") - static > void doSubscribe(CoreSubscriber s, F future) { - if (future.isDone()) { - if (future.isSuccess()) { - Operators.complete(s); - } - else { - Operators.error(s, FutureSubscription.wrapError(future.cause())); - } - return; - } - - FutureSubscription fs = new FutureSubscription<>(future, s); - // propagate subscription before adding listener to avoid any race between finishing future and onSubscribe - // is called - s.onSubscribe(fs); - - // check if subscription was not cancelled immediately. - if (fs.cancelled) { - // if so do nothing anymore - return; - } - - // add listener to the future to propagate on complete when future is done - // addListener likely to be thread safe method - // Returned value is deliberately ignored - future.addListener(fs); - - // check once again if is cancelled to see if we need to removeListener in case addListener racing with - // subscription.cancel (which should remove listener) - if (fs.cancelled) { - // Returned value is deliberately ignored - future.removeListener(fs); - } - } - - final static class FutureSubscription> - implements GenericFutureListener, Subscription, Supplier { - - final CoreSubscriber s; - - final F future; - - boolean cancelled; - - FutureSubscription(F future, CoreSubscriber s) { - this.s = s; - this.future = future; - } - - @Override - public void request(long n) { - //noop - } - - @Override - public Context get() { - return s.currentContext(); - } - - @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void cancel() { - // cancel is not thread safe since we assume that removeListener is thread-safe. That said if we have - // concurrent addListener and removeListener and if addListener is after removeListener, the other Thread - // after execution addListener should see changes happened before removeListener. Thus, it should see - // cancelled flag set to true and should cleanup added handler - this.cancelled = true; - // Returned value is deliberately ignored - future.removeListener(this); - } - - @Override - public void operationComplete(F future) { - if (!future.isSuccess()) { - s.onError(wrapError(future.cause())); - } - else { - s.onComplete(); - } - } - - private static Throwable wrapError(Throwable error) { - if (error instanceof ClosedChannelException) { - return new AbortedException(error); - } - else { - return error; - } - } - } -} diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java index aeae69dbb5..3018bf5e36 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java @@ -41,7 +41,6 @@ import reactor.netty.ChannelOperationsId; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.FutureMono; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; @@ -290,7 +289,7 @@ public NettyOutbound send(Publisher dataStream, Predicate) dataStream).flatMap(m -> FutureMono.from(channel().writeAndFlush(m))) + return then(((Mono) dataStream).flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage())) .doOnDiscard(ByteBuf.class, ByteBuf::release)); } return then(MonoSendMany.byteBufSource(dataStream, channel(), predicate)); @@ -303,7 +302,7 @@ public NettyOutbound sendObject(Publisher dataStream, Predicate predi return then(Mono.error(AbortedException.beforeSend())); } if (dataStream instanceof Mono) { - return then(((Mono) dataStream).flatMap(m -> FutureMono.from(channel().writeAndFlush(m))) + return then(((Mono) dataStream).flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage())) .doOnDiscard(ReferenceCounted.class, ReferenceCounted::release)); } return then(MonoSendMany.objectSource(dataStream, channel(), predicate)); @@ -315,8 +314,8 @@ public NettyOutbound sendObject(Object message) { ReactorNetty.safeRelease(message); return then(Mono.error(AbortedException.beforeSend())); } - return then(FutureMono.deferFuture(() -> connection.channel() - .writeAndFlush(message)), + return then(Mono.fromCompletionStage(() -> connection.channel() + .writeAndFlush(message).asStage()), () -> ReactorNetty.safeRelease(message)); } @@ -330,8 +329,8 @@ public NettyOutbound sendUsing(Callable sourceInput, return then(Mono.using( sourceInput, - s -> FutureMono.from(connection.channel() - .writeAndFlush(mappedInput.apply(this, s))), + s -> Mono.fromCompletionStage(connection.channel() + .writeAndFlush(mappedInput.apply(this, s)).asStage()), sourceCleanup) ); } diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java b/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java index ec121944f0..5fad4a242f 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java @@ -26,10 +26,8 @@ import io.netty5.channel.MultithreadEventLoopGroup; import io.netty5.channel.nio.NioHandler; import io.netty5.util.concurrent.FastThreadLocalThread; -import io.netty5.util.concurrent.Future; import reactor.core.publisher.Mono; import reactor.core.scheduler.NonBlocking; -import reactor.netty.FutureMono; /** * An adapted global eventLoop handler. @@ -79,7 +77,6 @@ final class DefaultLoopResources extends AtomicLong implements LoopResources { } @Override - @SuppressWarnings("unchecked") public Mono disposeLater(Duration quietPeriod, Duration timeout) { return Mono.defer(() -> { long quietPeriodMillis = quietPeriod.toMillis(); @@ -100,28 +97,28 @@ public Mono disposeLater(Duration quietPeriod, Duration timeout) { Mono cnsrvlMono = Mono.empty(); if (running.compareAndSet(true, false)) { if (clientLoopsGroup != null) { - clMono = FutureMono.from((Future) clientLoopsGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); + clMono = Mono.fromCompletionStage(clientLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); } if (serverSelectLoopsGroup != null) { - sslMono = FutureMono.from((Future) serverSelectLoopsGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); + sslMono = Mono.fromCompletionStage(serverSelectLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); } if (serverLoopsGroup != null) { - slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); + slMono = Mono.fromCompletionStage(serverLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); } if (cacheNativeClientGroup != null) { - cnclMono = FutureMono.from((Future) cacheNativeClientGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); + cnclMono = Mono.fromCompletionStage(cacheNativeClientGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); } if (cacheNativeSelectGroup != null) { - cnslMono = FutureMono.from((Future) cacheNativeSelectGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); + cnslMono = Mono.fromCompletionStage(cacheNativeSelectGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); } if (cacheNativeServerGroup != null) { - cnsrvlMono = FutureMono.from((Future) cacheNativeServerGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); + cnsrvlMono = Mono.fromCompletionStage(cacheNativeServerGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java index a0a4a5557f..dd123885bd 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java @@ -41,7 +41,6 @@ import reactor.core.publisher.Sinks; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.FutureMono; import reactor.netty.NettyPipeline; import reactor.netty.channel.ChannelOperations; import reactor.netty.transport.TransportConfig; @@ -577,7 +576,7 @@ public void onSubscribe(Subscription s) { if (!pooledConnection.channel.isActive()) { return Mono.empty(); } - return FutureMono.from(pooledConnection.channel.close()); + return Mono.fromCompletionStage(pooledConnection.channel.close().asStage()); }; } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/udp/UdpOperations.java b/reactor-netty-core/src/main/java/reactor/netty/udp/UdpOperations.java index 7947dc5b9e..7ad2315bba 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/udp/UdpOperations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/udp/UdpOperations.java @@ -19,13 +19,12 @@ import java.net.InetSocketAddress; import java.net.NetworkInterface; -import io.netty5.channel.ChannelFuture; import io.netty5.channel.socket.DatagramChannel; +import io.netty5.util.concurrent.Future; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.FutureMono; import reactor.netty.channel.ChannelOperations; import reactor.util.Logger; import reactor.util.Loggers; @@ -60,7 +59,7 @@ public Mono join(final InetAddress multicastAddress, @Nullable NetworkInte iface = datagramChannel.config().getNetworkInterface(); } - final ChannelFuture future; + final Future future; if (null != iface) { future = datagramChannel.joinGroup(new InetSocketAddress(multicastAddress, datagramChannel.localAddress() @@ -70,12 +69,12 @@ public Mono join(final InetAddress multicastAddress, @Nullable NetworkInte future = datagramChannel.joinGroup(multicastAddress); } - return FutureMono.from(future) - .doOnSuccess(v -> { - if (log.isInfoEnabled()) { - log.info(format(future.channel(), "JOIN {}"), multicastAddress); - } - }); + return Mono.fromCompletionStage(future.asStage()) + .doOnSuccess(v -> { + if (log.isInfoEnabled()) { + log.info(format(datagramChannel, "JOIN {}"), multicastAddress); + } + }); } /** @@ -95,7 +94,7 @@ public Mono leave(final InetAddress multicastAddress, @Nullable NetworkInt iface = datagramChannel.config().getNetworkInterface(); } - final ChannelFuture future; + final Future future; if (null != iface) { future = datagramChannel.leaveGroup(new InetSocketAddress(multicastAddress, datagramChannel.localAddress() @@ -105,12 +104,12 @@ public Mono leave(final InetAddress multicastAddress, @Nullable NetworkInt future = datagramChannel.leaveGroup(multicastAddress); } - return FutureMono.from(future) - .doOnSuccess(v -> { - if (log.isInfoEnabled()) { - log.info(format(future.channel(), "JOIN {}"), multicastAddress); - } - }); + return Mono.fromCompletionStage(future.asStage()) + .doOnSuccess(v -> { + if (log.isInfoEnabled()) { + log.info(format(datagramChannel, "JOIN {}"), multicastAddress); + } + }); } static final Logger log = Loggers.getLogger(UdpOperations.class); diff --git a/reactor-netty-core/src/test/java/reactor/netty/FutureMonoTest.java b/reactor-netty-core/src/test/java/reactor/netty/FutureMonoTest.java deleted file mode 100644 index 39d564153c..0000000000 --- a/reactor-netty-core/src/test/java/reactor/netty/FutureMonoTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty; - -import io.netty5.util.concurrent.DefaultPromise; -import io.netty5.util.concurrent.Future; -import io.netty5.util.concurrent.GenericFutureListener; -import io.netty5.util.concurrent.ImmediateEventExecutor; -import io.netty5.util.concurrent.Promise; -import org.junit.jupiter.api.Test; -import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; -import reactor.core.publisher.Operators; -import reactor.netty.channel.AbortedException; -import reactor.test.StepVerifier; -import reactor.test.util.RaceTestUtils; -import reactor.util.annotation.Nullable; - -import java.lang.reflect.Field; -import java.nio.channels.ClosedChannelException; -import java.time.Duration; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Supplier; - -import static org.assertj.core.api.Assertions.assertThat; - -class FutureMonoTest { - - @Test - void testImmediateFutureMonoImmediate() { - ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; - Future promise = eventExecutor.newFailedFuture(new ClosedChannelException()); - - StepVerifier.create(FutureMono.from(promise)) - .expectError(AbortedException.class) - .verify(Duration.ofSeconds(30)); - } - - // return value of setFailure not needed - @SuppressWarnings("FutureReturnValueIgnored") - @Test - void testImmediateFutureMonoLater() { - ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; - Promise promise = eventExecutor.newPromise(); - - StepVerifier.create(FutureMono.from(promise)) - .expectSubscription() - .then(() -> promise.setFailure(new ClosedChannelException())) - .expectError(AbortedException.class) - .verify(Duration.ofSeconds(30)); - } - - @Test - void testDeferredFutureMonoImmediate() { - ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; - Supplier> promiseSupplier = () -> eventExecutor.newFailedFuture(new ClosedChannelException()); - - StepVerifier.create(FutureMono.deferFuture(promiseSupplier)) - .expectError(AbortedException.class) - .verify(Duration.ofSeconds(30)); - } - - // return value of setFailure not needed - @SuppressWarnings("FutureReturnValueIgnored") - @Test - void testDeferredFutureMonoLater() { - ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; - Promise promise = eventExecutor.newPromise(); - Supplier> promiseSupplier = () -> promise; - - StepVerifier.create(FutureMono.deferFuture(promiseSupplier)) - .expectSubscription() - .then(() -> promise.setFailure(new ClosedChannelException())) - .expectError(AbortedException.class) - .verify(Duration.ofSeconds(30)); - } - - @Test - void raceTestImmediateFutureMonoWithSuccess() { - for (int i = 0; i < 1000; i++) { - final TestSubscriber subscriber = new TestSubscriber(); - final ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; - final Promise promise = eventExecutor.newPromise(); - - RaceTestUtils.race(() -> FutureMono.from(promise) - .subscribe(subscriber), - subscriber::cancel, - () -> promise.setSuccess(null)); - - assertThat(resolveListeners(promise)).isNullOrEmpty(); - assertThat(subscriber.operations).first() - .isEqualTo(TestSubscriber.Operation.ON_SUBSCRIBE); - } - } - - @Test - void raceTestDeferredFutureMonoWithSuccess() { - for (int i = 0; i < 1000; i++) { - final TestSubscriber subscriber = new TestSubscriber(); - final ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; - final Promise promise = eventExecutor.newPromise(); - final Supplier> promiseSupplier = () -> promise; - - RaceTestUtils.race(() -> FutureMono.deferFuture(promiseSupplier) - .subscribe(subscriber), - subscriber::cancel, - () -> promise.setSuccess(null)); - - assertThat(resolveListeners(promise)).isNullOrEmpty(); - assertThat(subscriber.operations).first() - .isEqualTo(TestSubscriber.Operation.ON_SUBSCRIBE); - } - } - - @Test - void raceTestImmediateFutureMono() { - for (int i = 0; i < 1000; i++) { - final TestSubscriber subscriber = new TestSubscriber(); - final ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; - final Promise promise = eventExecutor.newPromise(); - - RaceTestUtils.race(() -> FutureMono.from(promise) - .subscribe(subscriber), subscriber::cancel); - - assertThat(resolveListeners(promise)).isNullOrEmpty(); - assertThat(subscriber.operations).first() - .isEqualTo(TestSubscriber.Operation.ON_SUBSCRIBE); - } - } - - @Test - void raceTestDeferredFutureMono() { - for (int i = 0; i < 1000; i++) { - final TestSubscriber subscriber = new TestSubscriber(); - final ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; - final Promise promise = eventExecutor.newPromise(); - final Supplier> promiseSupplier = () -> promise; - - RaceTestUtils.race(() -> FutureMono.deferFuture(promiseSupplier) - .subscribe(subscriber), subscriber::cancel); - - assertThat(resolveListeners(promise)).isNullOrEmpty(); - assertThat(subscriber.operations).first() - .isEqualTo(TestSubscriber.Operation.ON_SUBSCRIBE); - } - } - - @Nullable - @SuppressWarnings("unchecked") - static GenericFutureListener>[] resolveListeners(Promise promise) { - try { - final Field listeners = DefaultPromise.class.getDeclaredField("listeners"); - final Class aClass = Class.forName("io.netty5.util.concurrent.DefaultFutureListeners"); - final Field listeners1 = aClass.getDeclaredField("listeners"); - - listeners.setAccessible(true); - listeners1.setAccessible(true); - - final Object objListener = listeners.get(promise); - if (objListener == null) { - return null; - } - return (GenericFutureListener>[]) listeners1.get(objListener); - } - catch (NoSuchFieldException | ClassNotFoundException | IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - static class TestSubscriber implements CoreSubscriber { - - enum Operation { - ON_ERROR, ON_COMPLETE, ON_SUBSCRIBE - } - - volatile Subscription s; - static final AtomicReferenceFieldUpdater S = - AtomicReferenceFieldUpdater.newUpdater(TestSubscriber.class, Subscription.class, "s"); - - final Queue operations = new ArrayBlockingQueue<>(3); - - @Override - public void onSubscribe(Subscription s) { - Operators.setOnce(S, this, s); - operations.add(Operation.ON_SUBSCRIBE); - } - - @Override - public void onNext(Void unused) { - - } - - @Override - public void onError(Throwable t) { - operations.add(Operation.ON_ERROR); - } - - @Override - public void onComplete() { - operations.add(Operation.ON_COMPLETE); - } - - void cancel() { - Operators.terminate(S, this); - } - } -} diff --git a/reactor-netty-core/src/test/java/reactor/netty/NettyOutboundTest.java b/reactor-netty-core/src/test/java/reactor/netty/NettyOutboundTest.java index 5a0f369401..4c1de68013 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/NettyOutboundTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/NettyOutboundTest.java @@ -337,8 +337,8 @@ static Mono mockSendUsing(Connection c, Callable sourceIn Consumer sourceCleanup) { return Mono.using( sourceInput, - s -> FutureMono.from(c.channel() - .writeAndFlush(mappedInput.apply(c, s))), + s -> Mono.fromCompletionStage(c.channel() + .writeAndFlush(mappedInput.apply(c, s)).asStage()), sourceCleanup ); } diff --git a/reactor-netty-core/src/test/java/reactor/netty/tcp/TcpServerTests.java b/reactor-netty-core/src/test/java/reactor/netty/tcp/TcpServerTests.java index 90941a10c4..3237db9b69 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/tcp/TcpServerTests.java +++ b/reactor-netty-core/src/test/java/reactor/netty/tcp/TcpServerTests.java @@ -72,7 +72,6 @@ import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; -import reactor.netty.FutureMono; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; @@ -802,8 +801,8 @@ void testChannelGroupClosesAllConnections() throws Exception { boundServer.disposeNow(); - FutureMono.from(group.close()) - .block(Duration.ofSeconds(30)); + Mono.fromCompletionStage(group.close().asStage()) + .block(Duration.ofSeconds(30)); assertThat(latch2.await(5, TimeUnit.SECONDS)).as("latch await").isTrue(); } @@ -844,8 +843,8 @@ else if (newState == ConnectionObserver.State.DISCONNECTING) { assertThat(configured.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - FutureMono.from(group.close()) - .block(Duration.ofSeconds(30)); + Mono.fromCompletionStage(group.close().asStage()) + .block(Duration.ofSeconds(30)); assertThat(disconnected.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java index 1af0f89bc3..65623c233f 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java @@ -45,7 +45,6 @@ import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.FutureMono; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; @@ -123,11 +122,12 @@ public NettyOutbound send(Publisher source) { log.debug(format(channel(), "Dropped HTTP content, " + "since response has Content-Length: 0 {}"), toPrettyHexDump(msg)); msg.release(); - return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER))); + return Mono.fromCompletionStage( + channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER)).asStage()); } - return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(msg))); + return Mono.fromCompletionStage(channel().writeAndFlush(newFullBodyMessage(msg)).asStage()); } - return FutureMono.from(channel().writeAndFlush(msg)); + return Mono.fromCompletionStage(channel().writeAndFlush(msg).asStage()); }) .doOnDiscard(ByteBuf.class, ByteBuf::release), this, null); } @@ -144,7 +144,7 @@ public NettyOutbound sendObject(Object message) { return super.sendObject(message); } ByteBuf b = (ByteBuf) message; - return new PostHeadersNettyOutbound(FutureMono.deferFuture(() -> { + return new PostHeadersNettyOutbound(Mono.fromCompletionStage(() -> { if (markSentHeaderAndBody(b)) { try { afterMarkSentHeaders(); @@ -157,11 +157,11 @@ public NettyOutbound sendObject(Object message) { log.debug(format(channel(), "Dropped HTTP content, " + "since response has Content-Length: 0 {}"), toPrettyHexDump(b)); b.release(); - return channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER)); + return channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER)).asStage(); } - return channel().writeAndFlush(newFullBodyMessage(b)); + return channel().writeAndFlush(newFullBodyMessage(b)).asStage(); } - return channel().writeAndFlush(b); + return channel().writeAndFlush(b).asStage(); }), this, b); } @@ -175,7 +175,7 @@ public Mono then() { return Mono.empty(); } - return FutureMono.deferFuture(() -> { + return Mono.fromCompletionStage(() -> { if (markSentHeaders(outboundHttpMessage())) { HttpMessage msg; @@ -203,10 +203,11 @@ public Mono then() { } return channel().writeAndFlush(msg) - .addListener(f -> onHeadersSent()); + .addListener(f -> onHeadersSent()) + .asStage(); } else { - return channel().newSucceededFuture(); + return channel().newSucceededFuture().asStage(); } }); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java index fa24407876..579aacde8c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java @@ -66,7 +66,6 @@ import reactor.core.publisher.Sinks; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.FutureMono; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; @@ -396,7 +395,8 @@ public NettyOutbound send(Publisher source) { .flatMap(list -> { if (markSentHeaderAndBody(list.toArray())) { if (list.isEmpty()) { - return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER))); + return Mono.fromCompletionStage( + channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER)).asStage()); } ByteBuf output; @@ -415,10 +415,10 @@ public NettyOutbound send(Publisher source) { } if (output.readableBytes() > 0) { - return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(output))); + return Mono.fromCompletionStage(channel().writeAndFlush(newFullBodyMessage(output)).asStage()); } output.release(); - return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER))); + return Mono.fromCompletionStage(channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER)).asStage()); } for (ByteBuf bb : list) { if (log.isDebugEnabled()) { @@ -750,7 +750,7 @@ final Mono send() { } if (markSentHeaderAndBody()) { HttpMessage request = newFullBodyMessage(Unpooled.EMPTY_BUFFER); - return FutureMono.deferFuture(() -> channel().writeAndFlush(request)); + return Mono.fromCompletionStage(() -> channel().writeAndFlush(request).asStage()); } else { return Mono.empty(); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java index 4e7e3bd6eb..e887e9dc3b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java @@ -38,7 +38,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.netty.FutureMono; import reactor.netty.NettyOutbound; import reactor.netty.ReactorNetty; import reactor.netty.http.websocket.WebsocketInbound; @@ -230,7 +229,7 @@ Mono sendClose(CloseWebSocketFrame frame) { if (CLOSE_SENT.get(this) == 0) { //commented for now as we assume the close is always scheduled (deferFuture runs) //onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame)); - return FutureMono.deferFuture(() -> { + return Mono.fromCompletionStage(() -> { if (CLOSE_SENT.getAndSet(this, 1) == 0) { discard(); // EmitResult is ignored as CLOSE_SENT guarantees that there will be only one emission @@ -239,10 +238,11 @@ Mono sendClose(CloseWebSocketFrame frame) { // FAIL_ZERO_SUBSCRIBER onCloseState.tryEmitValue(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText())); return channel().writeAndFlush(frame) - .addListener(channel(), ChannelFutureListeners.CLOSE); + .addListener(channel(), ChannelFutureListeners.CLOSE) + .asStage(); } frame.release(); - return channel().newSucceededFuture(); + return channel().newSucceededFuture().asStage(); }).doOnCancel(() -> ReactorNetty.safeRelease(frame)); } frame.release(); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java index 34ca83a6fd..5c94f2e682 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java @@ -72,7 +72,6 @@ import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.FutureMono; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; import reactor.netty.channel.AbortedException; @@ -368,11 +367,11 @@ public Flux receiveObject() { // No need to notify the upstream handlers - just log. // If decoding a response, just throw an error. if (HttpUtil.is100ContinueExpected(nettyRequest)) { - return FutureMono.deferFuture(() -> { + return Mono.fromCompletionStage(() -> { if (!hasSentHeaders()) { - return channel().writeAndFlush(CONTINUE); + return channel().writeAndFlush(CONTINUE).asStage(); } - return channel().newSucceededFuture(); + return channel().newSucceededFuture().asStage(); }) .thenMany(super.receiveObject()); @@ -431,7 +430,7 @@ public HttpHeaders responseHeaders() { public Mono send() { if (markSentHeaderAndBody()) { HttpMessage response = newFullBodyMessage(EMPTY_BUFFER); - return FutureMono.deferFuture(() -> channel().writeAndFlush(response)); + return Mono.fromCompletionStage(() -> channel().writeAndFlush(response).asStage()); } else { return Mono.empty(); @@ -799,13 +798,13 @@ final Mono withWebsocketSupport(String url, if (markSentHeaders()) { WebsocketServerOperations ops = new WebsocketServerOperations(url, websocketServerSpec, this); - return FutureMono.from(ops.handshakerResult) - .doOnEach(signal -> { - if (!signal.hasError() && (websocketServerSpec.protocols() == null || ops.selectedSubprotocol() != null)) { - websocketHandler.apply(ops, ops) - .subscribe(new WebsocketSubscriber(ops, Context.of(signal.getContextView()))); - } - }); + return Mono.fromCompletionStage(ops.handshakerResult.asStage()) + .doOnEach(signal -> { + if (!signal.hasError() && (websocketServerSpec.protocols() == null || ops.selectedSubprotocol() != null)) { + websocketHandler.apply(ops, ops) + .subscribe(new WebsocketSubscriber(ops, Context.of(signal.getContextView()))); + } + }); } else { log.error(format(channel(), "Cannot enable websocket if headers have already been sent")); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java index df17b417a6..3a38508a5c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java @@ -38,7 +38,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.netty.FutureMono; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; import reactor.netty.ReactorNetty; @@ -223,7 +222,7 @@ Mono sendClose(CloseWebSocketFrame frame) { if (CLOSE_SENT.get(this) == 0) { //commented for now as we assume the close is always scheduled (deferFuture runs) //onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame)); - return FutureMono.deferFuture(() -> { + return Mono.fromCompletionStage(() -> { if (CLOSE_SENT.getAndSet(this, 1) == 0) { discard(); // EmitResult is ignored as CLOSE_SENT guarantees that there will be only one emission @@ -232,10 +231,11 @@ Mono sendClose(CloseWebSocketFrame frame) { // FAIL_ZERO_SUBSCRIBER onCloseState.tryEmitValue(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText())); return channel().writeAndFlush(frame) - .addListener(channel(), ChannelFutureListeners.CLOSE); + .addListener(channel(), ChannelFutureListeners.CLOSE) + .asStage(); } frame.release(); - return channel().newSucceededFuture(); + return channel().newSucceededFuture().asStage(); }).doOnCancel(() -> ReactorNetty.safeRelease(frame)); } frame.release(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 41f1480bba..8c81ffc71d 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -101,7 +101,6 @@ import reactor.netty.ByteBufFlux; import reactor.netty.ByteBufMono; import reactor.netty.Connection; -import reactor.netty.FutureMono; import reactor.netty.NettyPipeline; import reactor.netty.SocketUtils; import reactor.netty.http.Http11SslContextSpec; @@ -1479,7 +1478,7 @@ void testChannelGroupClosesAllConnections() throws Exception { assertThat(latch1.await(30, TimeUnit.SECONDS)).isTrue(); - Mono.whenDelayError(FutureMono.from(group.close()), connectionProvider.disposeLater()) + Mono.whenDelayError(Mono.fromCompletionStage(group.close().asStage()), connectionProvider.disposeLater()) .block(Duration.ofSeconds(30)); assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java index 7c11f841e3..58e8ff682d 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -98,7 +98,6 @@ import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; -import reactor.netty.FutureMono; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; import reactor.netty.channel.AbortedException; @@ -747,7 +746,7 @@ void testConnectionCloseOnServerError() { .expectError(IOException.class) .verify(Duration.ofSeconds(30)); - FutureMono.from(ch.get().closeFuture()).block(Duration.ofSeconds(30)); + Mono.fromCompletionStage(ch.get().closeFuture().asStage()).block(Duration.ofSeconds(30)); } @Test