From 57070babfe6f3f25e90ddc87c4f2e55a1544b35e Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 3 Nov 2022 15:36:06 +0200 Subject: [PATCH] Use FutureMono instead of Mono#fromCompletionStage Reactor Netty does not need the type of cancellation provided by Mono#fromCompletionStage --- .../reactor/netty5/DisposableChannel.java | 2 +- .../main/java/reactor/netty5/FutureMono.java | 204 ++++++++++++++++ .../netty5/channel/ChannelOperations.java | 17 +- .../resources/DefaultLoopResources.java | 25 +- .../DefaultPooledConnectionProvider.java | 3 +- .../reactor/netty5/udp/UdpOperations.java | 25 +- .../java/reactor/netty5/FutureMonoTest.java | 222 ++++++++++++++++++ .../reactor/netty5/NettyOutboundTest.java | 4 +- .../reactor/netty5/http/HttpOperations.java | 23 +- .../reactor/netty5/http/client/Http2Pool.java | 3 +- .../http/client/HttpClientOperations.java | 16 +- .../client/WebsocketClientOperations.java | 8 +- .../http/server/HttpServerOperations.java | 27 ++- .../server/WebsocketServerOperations.java | 8 +- .../netty5/http/client/HttpClientTest.java | 3 +- .../netty5/http/server/HttpServerTests.java | 3 +- 16 files changed, 513 insertions(+), 80 deletions(-) create mode 100644 reactor-netty5-core/src/main/java/reactor/netty5/FutureMono.java create mode 100644 reactor-netty5-core/src/test/java/reactor/netty5/FutureMonoTest.java diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/DisposableChannel.java b/reactor-netty5-core/src/main/java/reactor/netty5/DisposableChannel.java index 971d6eae10..62579b14e9 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/DisposableChannel.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/DisposableChannel.java @@ -123,7 +123,7 @@ default boolean isDisposed() { * @return a {@link Mono} terminating with success if shutdown successfully or error */ default Mono onDispose() { - return Mono.fromCompletionStage(channel().closeFuture().asStage()); + return FutureMono.from(channel().closeFuture()); } /** diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/FutureMono.java b/reactor-netty5-core/src/main/java/reactor/netty5/FutureMono.java new file mode 100644 index 0000000000..23b4a1cb38 --- /dev/null +++ b/reactor-netty5-core/src/main/java/reactor/netty5/FutureMono.java @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty5; + +import java.nio.channels.ClosedChannelException; +import java.util.Objects; +import java.util.function.Supplier; + +import io.netty5.util.concurrent.Future; +import io.netty5.util.concurrent.FutureListener; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.netty5.channel.AbortedException; +import reactor.util.context.Context; + +/** + * Convert Netty Future into void {@link Mono}. + * + * @author Stephane Maldini + */ +public abstract class FutureMono extends Mono { + + /** + * Convert a {@link Future} into {@link Mono}. {@link Mono#subscribe(Subscriber)} + * will bridge to {@link Future#addListener(FutureListener)}. + * + * @param future the future to convert from + * @param the future type + * + * @return A {@link Mono} forwarding {@link Future} success or failure + */ + public static > Mono from(F future) { + Objects.requireNonNull(future, "future"); + if (future.isDone()) { + if (!future.isSuccess()) { + return Mono.error(FutureSubscription.wrapError(future.cause())); + } + return Mono.empty(); + } + return new ImmediateFutureMono<>(future); + } + + /** + * Convert a supplied {@link Future} for each subscriber into {@link Mono}. + * {@link Mono#subscribe(Subscriber)} + * will bridge to {@link Future#addListener(FutureListener)}. + * + * @param deferredFuture the future to evaluate and convert from + * @param the future type + * + * @return A {@link Mono} forwarding {@link Future} success or failure + */ + public static > Mono deferFuture(Supplier deferredFuture) { + return new DeferredFutureMono<>(deferredFuture); + } + + final static class ImmediateFutureMono> extends FutureMono { + + final F future; + + ImmediateFutureMono(F future) { + this.future = Objects.requireNonNull(future, "future"); + } + + @Override + public void subscribe(final CoreSubscriber s) { + doSubscribe(s, future); + } + } + + final static class DeferredFutureMono> extends FutureMono { + + final Supplier deferredFuture; + + DeferredFutureMono(Supplier deferredFuture) { + this.deferredFuture = + Objects.requireNonNull(deferredFuture, "deferredFuture"); + } + + @Override + public void subscribe(CoreSubscriber s) { + F f; + try { + f = deferredFuture.get(); + } + catch (Throwable t) { + Operators.error(s, t); + return; + } + + if (f == null) { + Operators.error(s, + Operators.onOperatorError(new NullPointerException( + "Deferred supplied null"), s.currentContext())); + return; + } + + doSubscribe(s, f); + } + } + + static > void doSubscribe(CoreSubscriber s, F future) { + if (future.isDone()) { + if (future.isSuccess()) { + Operators.complete(s); + } + else { + Operators.error(s, FutureSubscription.wrapError(future.cause())); + } + return; + } + + FutureSubscription fs = new FutureSubscription<>(future, s); + // propagate subscription before adding listener to avoid any race between finishing future and onSubscribe + // is called + s.onSubscribe(fs); + + // check if subscription was not cancelled immediately. + if (fs.cancelled) { + // if so do nothing anymore + return; + } + + // add listener to the future to propagate on complete when future is done + // addListener likely to be thread safe method + future.addListener(fs); + + // check once again if is cancelled to see if we need to removeListener in case addListener racing with + // subscription.cancel (which should remove listener) + if (fs.cancelled) { + // future.removeListener(fs); TODO + } + } + + final static class FutureSubscription> + implements FutureListener, Subscription, Supplier { + + final CoreSubscriber s; + + final F future; + + boolean cancelled; + + FutureSubscription(F future, CoreSubscriber s) { + this.s = s; + this.future = future; + } + + @Override + public void request(long n) { + //noop + } + + @Override + public Context get() { + return s.currentContext(); + } + + @Override + public void cancel() { + // cancel is not thread safe since we assume that removeListener is thread-safe. That said if we have + // concurrent addListener and removeListener and if addListener is after removeListener, the other Thread + // after execution addListener should see changes happened before removeListener. Thus, it should see + // cancelled flag set to true and should cleanup added handler + this.cancelled = true; + // future.removeListener(this); TODO + } + + @Override + public void operationComplete(Future future) { + if (!future.isSuccess()) { + s.onError(wrapError(future.cause())); + } + else { + s.onComplete(); + } + } + + private static Throwable wrapError(Throwable error) { + if (error instanceof ClosedChannelException) { + return new AbortedException(error); + } + else { + return error; + } + } + } +} \ No newline at end of file diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/channel/ChannelOperations.java b/reactor-netty5-core/src/main/java/reactor/netty5/channel/ChannelOperations.java index 688215d1e2..ca67f2f59b 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/channel/ChannelOperations.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/channel/ChannelOperations.java @@ -41,6 +41,7 @@ import reactor.netty5.ChannelOperationsId; import reactor.netty5.Connection; import reactor.netty5.ConnectionObserver; +import reactor.netty5.FutureMono; import reactor.netty5.NettyInbound; import reactor.netty5.NettyOutbound; import reactor.netty5.NettyPipeline; @@ -287,8 +288,8 @@ public NettyOutbound send(Publisher dataStream, Predicate mono) { - return then(mono.flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage())) - .doOnDiscard(Buffer.class, Buffer::close)); + return then(mono.flatMap(m -> FutureMono.from(channel().writeAndFlush(m))) + .doOnDiscard(Buffer.class, Buffer::close)); } return then(MonoSendMany.bufferSource(dataStream, channel(), predicate)); } @@ -300,8 +301,8 @@ public NettyOutbound sendObject(Publisher dataStream, Predicate predi return then(Mono.error(AbortedException.beforeSend())); } if (dataStream instanceof Mono mono) { - return then(mono.flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage())) - .doOnDiscard(Resource.class, Resource::dispose)); + return then(mono.flatMap(m -> FutureMono.from(channel().writeAndFlush(m))) + .doOnDiscard(Resource.class, Resource::dispose)); } return then(MonoSendMany.objectSource(dataStream, channel(), predicate)); } @@ -312,8 +313,8 @@ public NettyOutbound sendObject(Object message) { ReactorNetty.safeRelease(message); return then(Mono.error(AbortedException.beforeSend())); } - return then(Mono.fromCompletionStage(() -> connection.channel() - .writeAndFlush(message).asStage()), + return then(FutureMono.deferFuture(() -> connection.channel() + .writeAndFlush(message)), () -> ReactorNetty.safeRelease(message)); } @@ -327,8 +328,8 @@ public NettyOutbound sendUsing(Callable sourceInput, return then(Mono.using( sourceInput, - s -> Mono.fromCompletionStage(connection.channel() - .writeAndFlush(mappedInput.apply(this, s)).asStage()), + s -> FutureMono.from(connection.channel() + .writeAndFlush(mappedInput.apply(this, s))), sourceCleanup) ); } diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultLoopResources.java b/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultLoopResources.java index 80015dbc38..53b7b55eb2 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultLoopResources.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultLoopResources.java @@ -28,6 +28,7 @@ import io.netty5.util.concurrent.FastThreadLocalThread; import reactor.core.publisher.Mono; import reactor.core.scheduler.NonBlocking; +import reactor.netty5.FutureMono; /** * An adapted global eventLoop handler. @@ -97,28 +98,28 @@ public Mono disposeLater(Duration quietPeriod, Duration timeout) { Mono cnsrvlMono = Mono.empty(); if (running.compareAndSet(true, false)) { if (clientLoopsGroup != null) { - clMono = Mono.fromCompletionStage(clientLoopsGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); + clMono = FutureMono.from(clientLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if (serverSelectLoopsGroup != null) { - sslMono = Mono.fromCompletionStage(serverSelectLoopsGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); + sslMono = FutureMono.from(serverSelectLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if (serverLoopsGroup != null) { - slMono = Mono.fromCompletionStage(serverLoopsGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); + slMono = FutureMono.from(serverLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if (cacheNativeClientGroup != null) { - cnclMono = Mono.fromCompletionStage(cacheNativeClientGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); + cnclMono = FutureMono.from(cacheNativeClientGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if (cacheNativeSelectGroup != null) { - cnslMono = Mono.fromCompletionStage(cacheNativeSelectGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); + cnslMono = FutureMono.from(cacheNativeSelectGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if (cacheNativeServerGroup != null) { - cnsrvlMono = Mono.fromCompletionStage(cacheNativeServerGroup.shutdownGracefully( - quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage()); + cnsrvlMono = FutureMono.from(cacheNativeServerGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } } diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultPooledConnectionProvider.java b/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultPooledConnectionProvider.java index 9f15d31a5f..7e3aac7ff2 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultPooledConnectionProvider.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultPooledConnectionProvider.java @@ -40,6 +40,7 @@ import reactor.core.publisher.Sinks; import reactor.netty5.Connection; import reactor.netty5.ConnectionObserver; +import reactor.netty5.FutureMono; import reactor.netty5.NettyPipeline; import reactor.netty5.channel.ChannelOperations; import reactor.netty5.transport.TransportConfig; @@ -565,7 +566,7 @@ public void onSubscribe(Subscription s) { if (!pooledConnection.channel.isActive()) { return Mono.empty(); } - return Mono.fromCompletionStage(pooledConnection.channel.close().asStage()); + return FutureMono.from(pooledConnection.channel.close()); }; } } diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/udp/UdpOperations.java b/reactor-netty5-core/src/main/java/reactor/netty5/udp/UdpOperations.java index d25ab486cb..cb7514dde2 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/udp/UdpOperations.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/udp/UdpOperations.java @@ -25,6 +25,7 @@ import reactor.core.publisher.Mono; import reactor.netty5.Connection; import reactor.netty5.ConnectionObserver; +import reactor.netty5.FutureMono; import reactor.netty5.channel.ChannelOperations; import reactor.util.Logger; import reactor.util.Loggers; @@ -66,12 +67,12 @@ public Mono join(final InetAddress multicastAddress, @Nullable NetworkInte future = datagramChannel.joinGroup(multicastAddress); } - return Mono.fromCompletionStage(future.asStage()) - .doOnSuccess(v -> { - if (log.isInfoEnabled()) { - log.info(format(datagramChannel, "JOIN {}"), multicastAddress); - } - }); + return FutureMono.from(future) + .doOnSuccess(v -> { + if (log.isInfoEnabled()) { + log.info(format(datagramChannel, "JOIN {}"), multicastAddress); + } + }); } /** @@ -98,12 +99,12 @@ public Mono leave(final InetAddress multicastAddress, @Nullable NetworkInt future = datagramChannel.leaveGroup(multicastAddress); } - return Mono.fromCompletionStage(future.asStage()) - .doOnSuccess(v -> { - if (log.isInfoEnabled()) { - log.info(format(datagramChannel, "JOIN {}"), multicastAddress); - } - }); + return FutureMono.from(future) + .doOnSuccess(v -> { + if (log.isInfoEnabled()) { + log.info(format(datagramChannel, "JOIN {}"), multicastAddress); + } + }); } static final Logger log = Loggers.getLogger(UdpOperations.class); diff --git a/reactor-netty5-core/src/test/java/reactor/netty5/FutureMonoTest.java b/reactor-netty5-core/src/test/java/reactor/netty5/FutureMonoTest.java new file mode 100644 index 0000000000..73426852a7 --- /dev/null +++ b/reactor-netty5-core/src/test/java/reactor/netty5/FutureMonoTest.java @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty5; + +import io.netty5.util.concurrent.DefaultPromise; +import io.netty5.util.concurrent.Future; +import io.netty5.util.concurrent.FutureListener; +import io.netty5.util.concurrent.ImmediateEventExecutor; +import io.netty5.util.concurrent.Promise; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Operators; +import reactor.netty5.channel.AbortedException; +import reactor.test.StepVerifier; +import reactor.test.util.RaceTestUtils; +import reactor.util.annotation.Nullable; + +import java.lang.reflect.Field; +import java.nio.channels.ClosedChannelException; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; + +class FutureMonoTest { + + @Test + void testImmediateFutureMonoImmediate() { + ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; + Future promise = eventExecutor.newFailedFuture(new ClosedChannelException()); + + StepVerifier.create(FutureMono.from(promise)) + .expectError(AbortedException.class) + .verify(Duration.ofSeconds(30)); + } + + // return value of setFailure not needed + @SuppressWarnings("FutureReturnValueIgnored") + @Test + void testImmediateFutureMonoLater() { + ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; + Promise promise = eventExecutor.newPromise(); + + StepVerifier.create(FutureMono.from(promise.asFuture())) + .expectSubscription() + .then(() -> promise.setFailure(new ClosedChannelException())) + .expectError(AbortedException.class) + .verify(Duration.ofSeconds(30)); + } + + @Test + void testDeferredFutureMonoImmediate() { + ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; + Supplier> promiseSupplier = () -> eventExecutor.newFailedFuture(new ClosedChannelException()); + + StepVerifier.create(FutureMono.deferFuture(promiseSupplier)) + .expectError(AbortedException.class) + .verify(Duration.ofSeconds(30)); + } + + // return value of setFailure not needed + @SuppressWarnings("FutureReturnValueIgnored") + @Test + void testDeferredFutureMonoLater() { + ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; + Promise promise = eventExecutor.newPromise(); + Supplier> promiseSupplier = promise::asFuture; + + StepVerifier.create(FutureMono.deferFuture(promiseSupplier)) + .expectSubscription() + .then(() -> promise.setFailure(new ClosedChannelException())) + .expectError(AbortedException.class) + .verify(Duration.ofSeconds(30)); + } + + @Test + void raceTestImmediateFutureMonoWithSuccess() { + for (int i = 0; i < 1000; i++) { + final TestSubscriber subscriber = new TestSubscriber(); + final ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; + final Promise promise = eventExecutor.newPromise(); + + RaceTestUtils.race(() -> FutureMono.from(promise.asFuture()) + .subscribe(subscriber), + subscriber::cancel, + () -> promise.setSuccess(null)); + + assertThat(resolveListeners(promise.asFuture())).isNullOrEmpty(); + assertThat(subscriber.operations).first() + .isEqualTo(TestSubscriber.Operation.ON_SUBSCRIBE); + } + } + + @Test + void raceTestDeferredFutureMonoWithSuccess() { + for (int i = 0; i < 1000; i++) { + final TestSubscriber subscriber = new TestSubscriber(); + final ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; + final Promise promise = eventExecutor.newPromise(); + final Supplier> promiseSupplier = promise::asFuture; + + RaceTestUtils.race(() -> FutureMono.deferFuture(promiseSupplier) + .subscribe(subscriber), + subscriber::cancel, + () -> promise.setSuccess(null)); + + assertThat(resolveListeners(promise.asFuture())).isNullOrEmpty(); + assertThat(subscriber.operations).first() + .isEqualTo(TestSubscriber.Operation.ON_SUBSCRIBE); + } + } + + @Test + void raceTestImmediateFutureMono() { + for (int i = 0; i < 1000; i++) { + final TestSubscriber subscriber = new TestSubscriber(); + final ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; + final Future future = eventExecutor.newSucceededFuture(); + + RaceTestUtils.race(() -> FutureMono.from(future) + .subscribe(subscriber), subscriber::cancel); + + assertThat(resolveListeners(future)).isNullOrEmpty(); + assertThat(subscriber.operations).first() + .isEqualTo(TestSubscriber.Operation.ON_SUBSCRIBE); + } + } + + @Test + void raceTestDeferredFutureMono() { + for (int i = 0; i < 1000; i++) { + final TestSubscriber subscriber = new TestSubscriber(); + final ImmediateEventExecutor eventExecutor = ImmediateEventExecutor.INSTANCE; + final Future future = eventExecutor.newSucceededFuture(); + final Supplier> futureSupplier = () -> future; + + RaceTestUtils.race(() -> FutureMono.deferFuture(futureSupplier) + .subscribe(subscriber), subscriber::cancel); + + assertThat(resolveListeners(future)).isNullOrEmpty(); + assertThat(subscriber.operations).first() + .isEqualTo(TestSubscriber.Operation.ON_SUBSCRIBE); + } + } + + @Nullable + @SuppressWarnings("unchecked") + static FutureListener>[] resolveListeners(Future future) { + try { + final Field listeners = DefaultPromise.class.getDeclaredField("listeners"); + final Class aClass = Class.forName("io.netty5.util.concurrent.DefaultFutureListeners"); + final Field listeners1 = aClass.getDeclaredField("listeners"); + + listeners.setAccessible(true); + listeners1.setAccessible(true); + + final Object objListener = listeners.get(future); + if (objListener == null) { + return null; + } + return (FutureListener>[]) listeners1.get(objListener); + } + catch (NoSuchFieldException | ClassNotFoundException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + static class TestSubscriber implements CoreSubscriber { + + enum Operation { + ON_ERROR, ON_COMPLETE, ON_SUBSCRIBE + } + + volatile Subscription s; + static final AtomicReferenceFieldUpdater S = + AtomicReferenceFieldUpdater.newUpdater(TestSubscriber.class, Subscription.class, "s"); + + final Queue operations = new ArrayBlockingQueue<>(3); + + @Override + public void onSubscribe(Subscription s) { + Operators.setOnce(S, this, s); + operations.add(Operation.ON_SUBSCRIBE); + } + + @Override + public void onNext(Void unused) { + + } + + @Override + public void onError(Throwable t) { + operations.add(Operation.ON_ERROR); + } + + @Override + public void onComplete() { + operations.add(Operation.ON_COMPLETE); + } + + void cancel() { + Operators.terminate(S, this); + } + } +} \ No newline at end of file diff --git a/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java b/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java index 3ebef8d9e0..107e2bc1a0 100644 --- a/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java +++ b/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java @@ -333,8 +333,8 @@ static Mono mockSendUsing(Connection c, Callable sourceIn Consumer sourceCleanup) { return Mono.using( sourceInput, - s -> Mono.fromCompletionStage(c.channel() - .writeAndFlush(mappedInput.apply(c, s)).asStage()), + s -> FutureMono.from(c.channel() + .writeAndFlush(mappedInput.apply(c, s))), sourceCleanup ); } diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/HttpOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/HttpOperations.java index 384d87cf1b..cc826c6f9d 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/HttpOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/HttpOperations.java @@ -49,6 +49,7 @@ import reactor.netty5.BufferFlux; import reactor.netty5.Connection; import reactor.netty5.ConnectionObserver; +import reactor.netty5.FutureMono; import reactor.netty5.NettyInbound; import reactor.netty5.NettyOutbound; import reactor.netty5.NettyPipeline; @@ -139,12 +140,11 @@ public NettyOutbound send(Publisher source) { "since response has Content-Length: 0 {}"), b); } b.close(); - return Mono.fromCompletionStage( - channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))).asStage()); + return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0)))); } - return Mono.fromCompletionStage(channel().writeAndFlush(newFullBodyMessage(b)).asStage()); + return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(b))); } - return Mono.fromCompletionStage(channel().writeAndFlush(b).asStage()); + return FutureMono.from(channel().writeAndFlush(b)); }) .doOnDiscard(Buffer.class, Buffer::close), this, null); } @@ -160,7 +160,7 @@ public NettyOutbound sendObject(Object message) { if (!(message instanceof Buffer b)) { return super.sendObject(message); } - return new PostHeadersNettyOutbound(Mono.fromCompletionStage(() -> { + return new PostHeadersNettyOutbound(FutureMono.deferFuture(() -> { if (markSentHeaderAndBody(b)) { try { afterMarkSentHeaders(); @@ -175,11 +175,11 @@ public NettyOutbound sendObject(Object message) { "since response has Content-Length: 0 {}"), b); } b.close(); - return channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))).asStage(); + return channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))); } - return channel().writeAndFlush(newFullBodyMessage(b)).asStage(); + return channel().writeAndFlush(newFullBodyMessage(b)); } - return channel().writeAndFlush(b).asStage(); + return channel().writeAndFlush(b); }), this, b); } @@ -193,7 +193,7 @@ public Mono then() { return Mono.empty(); } - return Mono.fromCompletionStage(() -> { + return FutureMono.deferFuture(() -> { if (markSentHeaders(outboundHttpMessage())) { HttpMessage msg; @@ -221,11 +221,10 @@ public Mono then() { } return channel().writeAndFlush(msg) - .addListener(f -> onHeadersSent()) - .asStage(); + .addListener(f -> onHeadersSent()); } else { - return channel().newSucceededFuture().asStage(); + return channel().newSucceededFuture(); } }); } diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/Http2Pool.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/Http2Pool.java index 1bddee85ee..fcd01de55a 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/Http2Pool.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/Http2Pool.java @@ -42,6 +42,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.netty5.Connection; +import reactor.netty5.FutureMono; import reactor.netty5.NettyPipeline; import reactor.netty5.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty5.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; @@ -744,7 +745,7 @@ void scheduleEviction() { if (!connection.channel().isActive()) { return Mono.empty(); } - return Mono.fromCompletionStage(connection.channel().close().asStage()); + return FutureMono.from(connection.channel().close()); }; static final class Borrower extends AtomicBoolean implements Scannable, Subscription, Runnable { diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java index e74b55aa9f..0143c86482 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java @@ -76,6 +76,7 @@ import reactor.core.publisher.Sinks; import reactor.netty5.Connection; import reactor.netty5.ConnectionObserver; +import reactor.netty5.FutureMono; import reactor.netty5.NettyInbound; import reactor.netty5.NettyOutbound; import reactor.netty5.NettyPipeline; @@ -396,8 +397,7 @@ public NettyOutbound send(Publisher source) { .flatMap(list -> { if (markSentHeaderAndBody(list.toArray())) { if (list.isEmpty()) { - return Mono.fromCompletionStage( - channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))).asStage()); + return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0)))); } Buffer output; @@ -415,10 +415,10 @@ public NettyOutbound send(Publisher source) { } if (output.readableBytes() > 0) { - return Mono.fromCompletionStage(channel().writeAndFlush(newFullBodyMessage(output)).asStage()); + return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(output))); } output.close(); - return Mono.fromCompletionStage(channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))).asStage()); + return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0)))); } for (Buffer bb : list) { if (log.isDebugEnabled()) { @@ -741,9 +741,9 @@ final Mono send() { if (!channel().isActive()) { return Mono.error(AbortedException.beforeSend()); } - return Mono.fromCompletionStage(() -> markSentHeaderAndBody() ? - channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))).asStage() : - channel().newSucceededFuture().asStage()); + return FutureMono.deferFuture(() -> markSentHeaderAndBody() ? + channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))) : + channel().newSucceededFuture()); } final void setNettyResponse(HttpResponse nettyResponse) { @@ -879,7 +879,7 @@ void _subscribe(CoreSubscriber s) { .writeAndFlush(encoder); } else { - Mono mono = Mono.fromCompletionStage(f.asStage()); + Mono mono = FutureMono.from(f); if (encoder.cleanOnTerminate) { mono = mono.doOnCancel(encoder) diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/WebsocketClientOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/WebsocketClientOperations.java index 07127801f9..ddd8dc6480 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/WebsocketClientOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/WebsocketClientOperations.java @@ -38,6 +38,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.netty5.FutureMono; import reactor.netty5.NettyOutbound; import reactor.netty5.ReactorNetty; import reactor.netty5.http.websocket.WebsocketInbound; @@ -222,7 +223,7 @@ Mono sendClose(CloseWebSocketFrame frame) { if (CLOSE_SENT.get(this) == 0) { //commented for now as we assume the close is always scheduled (deferFuture runs) //onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame)); - return Mono.fromCompletionStage(() -> { + return FutureMono.deferFuture(() -> { if (CLOSE_SENT.getAndSet(this, 1) == 0) { discard(); // EmitResult is ignored as CLOSE_SENT guarantees that there will be only one emission @@ -231,11 +232,10 @@ Mono sendClose(CloseWebSocketFrame frame) { // FAIL_ZERO_SUBSCRIBER onCloseState.tryEmitValue(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText())); return channel().writeAndFlush(frame) - .addListener(channel(), ChannelFutureListeners.CLOSE) - .asStage(); + .addListener(channel(), ChannelFutureListeners.CLOSE); } frame.close(); - return channel().newSucceededFuture().asStage(); + return channel().newSucceededFuture(); }).doOnCancel(() -> ReactorNetty.safeRelease(frame)); } frame.close(); diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java index 0babdfd9b4..9c612c0aa7 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java @@ -76,6 +76,7 @@ import reactor.core.publisher.Mono; import reactor.netty5.Connection; import reactor.netty5.ConnectionObserver; +import reactor.netty5.FutureMono; import reactor.netty5.NettyOutbound; import reactor.netty5.NettyPipeline; import reactor.netty5.channel.AbortedException; @@ -378,11 +379,11 @@ public Flux receiveObject() { // No need to notify the upstream handlers - just log. // If decoding a response, just throw an error. if (HttpUtil.is100ContinueExpected(nettyRequest)) { - return Mono.fromCompletionStage(() -> { + return FutureMono.deferFuture(() -> { if (!hasSentHeaders()) { - return channel().writeAndFlush(CONTINUE).asStage(); + return channel().writeAndFlush(CONTINUE); } - return channel().newSucceededFuture().asStage(); + return channel().newSucceededFuture(); }) .thenMany(super.receiveObject()); @@ -439,9 +440,9 @@ public HttpHeaders responseHeaders() { @Override public Mono send() { - return Mono.fromCompletionStage(() -> markSentHeaderAndBody() ? - channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))).asStage() : - channel().newSucceededFuture().asStage()); + return FutureMono.deferFuture(() -> markSentHeaderAndBody() ? + channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))) : + channel().newSucceededFuture()); } @Override @@ -850,13 +851,13 @@ final Mono withWebsocketSupport(String url, if (markSentHeaders()) { WebsocketServerOperations ops = new WebsocketServerOperations(url, websocketServerSpec, this); - return Mono.fromCompletionStage(ops.handshakerResult.asStage()) - .doOnEach(signal -> { - if (!signal.hasError() && (websocketServerSpec.protocols() == null || ops.selectedSubprotocol() != null)) { - websocketHandler.apply(ops, ops) - .subscribe(new WebsocketSubscriber(ops, Context.of(signal.getContextView()))); - } - }); + return FutureMono.from(ops.handshakerResult) + .doOnEach(signal -> { + if (!signal.hasError() && (websocketServerSpec.protocols() == null || ops.selectedSubprotocol() != null)) { + websocketHandler.apply(ops, ops) + .subscribe(new WebsocketSubscriber(ops, Context.of(signal.getContextView()))); + } + }); } else { log.error(format(channel(), "Cannot enable websocket if headers have already been sent")); diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java index 2b80110f45..e0efcd706e 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java @@ -39,6 +39,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.netty5.FutureMono; import reactor.netty5.NettyOutbound; import reactor.netty5.NettyPipeline; import reactor.netty5.ReactorNetty; @@ -220,7 +221,7 @@ Mono sendClose(CloseWebSocketFrame frame) { if (CLOSE_SENT.get(this) == 0) { //commented for now as we assume the close is always scheduled (deferFuture runs) //onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame)); - return Mono.fromCompletionStage(() -> { + return FutureMono.deferFuture(() -> { if (CLOSE_SENT.getAndSet(this, 1) == 0) { discard(); // EmitResult is ignored as CLOSE_SENT guarantees that there will be only one emission @@ -229,11 +230,10 @@ Mono sendClose(CloseWebSocketFrame frame) { // FAIL_ZERO_SUBSCRIBER onCloseState.tryEmitValue(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText())); return channel().writeAndFlush(frame) - .addListener(channel(), ChannelFutureListeners.CLOSE) - .asStage(); + .addListener(channel(), ChannelFutureListeners.CLOSE); } frame.close(); - return channel().newSucceededFuture().asStage(); + return channel().newSucceededFuture(); }).doOnCancel(() -> ReactorNetty.safeRelease(frame)); } frame.close(); diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientTest.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientTest.java index dde5d59042..e811e84291 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientTest.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientTest.java @@ -102,6 +102,7 @@ import reactor.netty5.BufferFlux; import reactor.netty5.BufferMono; import reactor.netty5.Connection; +import reactor.netty5.FutureMono; import reactor.netty5.NettyPipeline; import reactor.netty5.SocketUtils; import reactor.netty5.http.Http11SslContextSpec; @@ -1488,7 +1489,7 @@ void testChannelGroupClosesAllConnections() throws Exception { assertThat(latch1.await(30, TimeUnit.SECONDS)).isTrue(); - Mono.whenDelayError(Mono.fromCompletionStage(group.close().asStage()), connectionProvider.disposeLater()) + Mono.whenDelayError(FutureMono.from(group.close()), connectionProvider.disposeLater()) .block(Duration.ofSeconds(30)); assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue(); diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java index 25250580cd..85f8063a2a 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java @@ -105,6 +105,7 @@ import reactor.netty5.Connection; import reactor.netty5.ConnectionObserver; import reactor.netty5.DisposableServer; +import reactor.netty5.FutureMono; import reactor.netty5.NettyOutbound; import reactor.netty5.NettyPipeline; import reactor.netty5.channel.AbortedException; @@ -827,7 +828,7 @@ void testConnectionCloseOnServerError() { .expectError(IOException.class) .verify(Duration.ofSeconds(30)); - Mono.fromCompletionStage(ch.get().closeFuture().asStage()).block(Duration.ofSeconds(30)); + FutureMono.from(ch.get().closeFuture()).block(Duration.ofSeconds(30)); } @Test