From 5d604aa8f52acc52c8b1eba9e05115c8260f379e Mon Sep 17 00:00:00 2001 From: Ryland Degnan Date: Sat, 23 Dec 2017 12:25:41 -0800 Subject: [PATCH] Replace close() method with dispose()/isDisposed() --- .../src/jmh/java/io/rsocket/RSocketPerf.java | 14 +-- .../perfutil/TestDuplexConnection.java | 3 +- .../main/java/io/rsocket/AbstractRSocket.java | 16 ++-- .../src/main/java/io/rsocket/Closeable.java | 16 +--- .../java/io/rsocket/DuplexConnection.java | 5 ++ .../src/main/java/io/rsocket/RSocket.java | 2 +- .../main/java/io/rsocket/RSocketClient.java | 21 +++-- .../main/java/io/rsocket/RSocketFactory.java | 2 +- .../main/java/io/rsocket/RSocketServer.java | 11 ++- .../FragmentationDuplexConnection.java | 29 +++--- .../ClientServerInputMultiplexer.java | 27 ++++-- .../io/rsocket/util/CloseableAdapter.java | 29 ------ .../java/io/rsocket/util/RSocketProxy.java | 9 +- .../test/util/LocalDuplexConnection.java | 18 ++-- .../io/rsocket/test/util/MockRSocket.java | 9 +- .../test/util/TestDuplexConnection.java | 15 ++-- .../tcp/channel/ChannelEchoClient.java | 3 +- .../tcp/requestresponse/HelloWorldClient.java | 2 +- .../transport/tcp/stream/StreamingClient.java | 4 +- .../rsocket/integration/IntegrationTest.java | 4 +- .../integration/TcpIntegrationTest.java | 2 +- .../rsocket/integration/TestingStreaming.java | 6 +- .../client/LoadBalancedRSocketMono.java | 88 +++++++------------ .../client/filter/BackupRequestSocket.java | 9 +- .../client/filter/RSocketSupplier.java | 11 ++- .../io/rsocket/client/filter/RSockets.java | 30 +++---- .../io/rsocket/client/TestingRSocket.java | 17 ++-- .../spectator/SpectatorFrameInterceptor.java | 9 +- .../rsocket/spectator/SpectatorRSocket.java | 9 +- .../java/io/rsocket/test/ClientSetupRule.java | 2 +- .../rsocket/aeron/AeronDuplexConnection.java | 32 +++---- .../reactivestreams/AeronChannel.java | 22 ++--- .../reactivestreams/AeronChannelServer.java | 24 ++--- .../ReactiveStreamsRemote.java | 2 - .../local/LocalDuplexConnection.java | 24 +++-- .../transport/local/LocalServerTransport.java | 25 +++--- .../netty/NettyDuplexConnection.java | 12 +-- .../netty/WebsocketDuplexConnection.java | 12 +-- .../netty/server/NettyContextCloseable.java | 21 ++--- .../netty/server/WebsocketRouteTransport.java | 28 +++--- .../server/WebsocketServerTransport.java | 2 +- 41 files changed, 307 insertions(+), 319 deletions(-) delete mode 100644 rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java diff --git a/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java b/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java index 2391628c5..b8395f255 100644 --- a/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java +++ b/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java @@ -79,7 +79,7 @@ public static class Input { static final ByteBuffer HELLO = ByteBuffer.wrap("HELLO".getBytes(StandardCharsets.UTF_8)); - static final Payload HELLO_PAYLOAD = new DefaultPayload(HELLO); + static final Payload HELLO_PAYLOAD = DefaultPayload.create(HELLO); static final DirectProcessor clientReceive = DirectProcessor.create(); static final DirectProcessor serverReceive = DirectProcessor.create(); @@ -121,8 +121,7 @@ public Mono metadataPush(Payload payload) { } @Override - public Mono close() { - return Mono.empty(); + public void dispose() { } @Override @@ -140,8 +139,13 @@ public Mono onClose() { MonoProcessor onClose = MonoProcessor.create(); @Override - public Mono close() { - return Mono.empty().doFinally(s -> onClose.onComplete()).then(); + public void dispose() { + onClose.onComplete(); + } + + @Override + public boolean isDisposed() { + return onClose.isDisposed(); } @Override diff --git a/rsocket-core/src/jmh/java/io/rsocket/perfutil/TestDuplexConnection.java b/rsocket-core/src/jmh/java/io/rsocket/perfutil/TestDuplexConnection.java index 47db25d45..5bd1cbd23 100644 --- a/rsocket-core/src/jmh/java/io/rsocket/perfutil/TestDuplexConnection.java +++ b/rsocket-core/src/jmh/java/io/rsocket/perfutil/TestDuplexConnection.java @@ -68,8 +68,7 @@ public double availability() { } @Override - public Mono close() { - return Mono.empty(); + public void dispose() { } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java b/rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java index 93c8cd57c..674eb0d0f 100644 --- a/rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java @@ -24,9 +24,6 @@ /** * An abstract implementation of {@link RSocket}. All request handling methods emit {@link * UnsupportedOperationException} and hence must be overridden to provide a valid implementation. - * - *

{@link #close()} returns a {@code Publisher} that immediately terminates. That same Publisher - * is returned by the {@link #onClose()} method. */ public abstract class AbstractRSocket implements RSocket { @@ -58,12 +55,13 @@ public Mono metadataPush(Payload payload) { } @Override - public Mono close() { - return Mono.defer( - () -> { - onClose.onComplete(); - return onClose; - }); + public void dispose() { + onClose.onComplete(); + } + + @Override + public boolean isDisposed() { + return onClose.isDisposed(); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/Closeable.java b/rsocket-core/src/main/java/io/rsocket/Closeable.java index ee2f177ff..9eab77e30 100644 --- a/rsocket-core/src/main/java/io/rsocket/Closeable.java +++ b/rsocket-core/src/main/java/io/rsocket/Closeable.java @@ -16,24 +16,14 @@ package io.rsocket; +import reactor.core.Disposable; import reactor.core.publisher.Mono; /** */ -public interface Closeable { - /** - * Close this {@code RSocket} upon subscribing to the returned {@code Publisher} - * - *

This method is idempotent and hence can be called as many times at any point with same - * outcome. - * - * @return A {@code Publisher} that triggers the close when subscribed to and that completes when - * this {@code RSocket} close is complete. - */ - Mono close(); - +public interface Closeable extends Disposable { /** * Returns a {@code Publisher} that completes when this {@code RSocket} is closed. A {@code - * RSocket} can be closed by explicitly calling {@link #close()} or when the underlying transport + * RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying transport * connection is closed. * * @return A {@code Publisher} that completes when this {@code RSocket} close is complete. diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java index 0dea9d9d2..3113d20f5 100644 --- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java @@ -74,4 +74,9 @@ default Mono sendOne(Frame frame) { * @return Stream of all {@code Frame}s received. */ Flux receive(); + + @Override + default double availability() { + return isDisposed() ? 0.0 : 1.0; + } } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java index 0f006d5ca..725aa9c92 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java @@ -71,6 +71,6 @@ public interface RSocket extends Availability, Closeable { @Override default double availability() { - return 0.0; + return isDisposed() ? 0.0 : 1.0; } } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 4713c52d1..07b8879fd 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -98,7 +98,7 @@ class RSocketClient implements RSocket { .doOnError( t -> { errorConsumer.accept(t); - connection.close().subscribe(); + connection.dispose(); }) .subscribe(); } @@ -234,8 +234,13 @@ public double availability() { } @Override - public Mono close() { - return connection.close(); + public void dispose() { + connection.dispose(); + } + + @Override + public boolean isDisposed() { + return connection.isDisposed(); } @Override @@ -260,25 +265,25 @@ public Flux handleRequestStream(final Payload payload) { return receiver .doOnRequest( l -> { - if (first.compareAndSet(false, true) && !receiver.isTerminated()) { + if (first.compareAndSet(false, true) && !receiver.isDisposed()) { final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, l); payload.release(); sendProcessor.onNext(requestFrame); - } else if (contains(streamId) && !receiver.isTerminated()) { + } else if (contains(streamId) && !receiver.isDisposed()) { sendProcessor.onNext(Frame.RequestN.from(streamId, l)); } sendProcessor.drain(); }) .doOnError( t -> { - if (contains(streamId) && !receiver.isTerminated()) { + if (contains(streamId) && !receiver.isDisposed()) { sendProcessor.onNext(Frame.Error.from(streamId, t)); } }) .doOnCancel( () -> { - if (contains(streamId) && !receiver.isTerminated()) { + if (contains(streamId) && !receiver.isDisposed()) { sendProcessor.onNext(Frame.Cancel.from(streamId)); } }) @@ -326,7 +331,7 @@ private Flux handleChannel(Flux request, FrameType requestType boolean firstRequest = true; boolean isValidToSendFrame() { - return contains(streamId) && !receiver.isTerminated(); + return contains(streamId) && !receiver.isDisposed(); } void sendOneFrame(Frame frame) { diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index 33f56c349..f69630757 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -342,7 +342,7 @@ private Mono processSetupFrame( return multiplexer .asStreamZeroConnection() .sendOne(Frame.Error.from(0, error)) - .then(multiplexer.close()); + .doFinally(signalType -> multiplexer.dispose()); } ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame); diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index 7b959e756..e60afa0b1 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -200,8 +200,13 @@ public Mono metadataPush(Payload payload) { } @Override - public Mono close() { - return connection.close(); + public void dispose() { + connection.dispose(); + } + + @Override + public boolean isDisposed() { + return connection.isDisposed(); } @Override @@ -213,7 +218,7 @@ private void cleanup() { cleanUpSendingSubscriptions(); cleanUpChannelProcessors(); - requestHandler.close().subscribe(); + requestHandler.dispose(); } private synchronized void cleanUpSendingSubscriptions() { diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index 38c46b62c..30d9a3d8b 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -85,20 +85,13 @@ public Flux receive() { } @Override - public Mono close() { - return source.close(); + public void dispose() { + source.dispose(); } - private synchronized FrameReassembler getFrameReassembler(Frame frame) { - return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame)); - } - - private synchronized FrameReassembler removeFrameReassembler(int streamId) { - return frameReassemblers.remove(streamId); - } - - private synchronized boolean frameReassemblersContain(int streamId) { - return frameReassemblers.containsKey(streamId); + @Override + public boolean isDisposed() { + return source.isDisposed(); } @Override @@ -114,4 +107,16 @@ public Mono onClose() { } }); } + + private synchronized FrameReassembler getFrameReassembler(Frame frame) { + return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame)); + } + + private synchronized FrameReassembler removeFrameReassembler(int streamId) { + return frameReassemblers.remove(streamId); + } + + private synchronized boolean frameReassemblersContain(int streamId) { + return frameReassemblers.containsKey(streamId); + } } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java index 861a8246e..0f9209462 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java @@ -16,6 +16,7 @@ package io.rsocket.internal; +import io.rsocket.Closeable; import io.rsocket.DuplexConnection; import io.rsocket.Frame; import io.rsocket.FrameType; @@ -41,7 +42,7 @@ * even. Even IDs are for the streams initiated by server and odds are for streams initiated by the * client. */ -public class ClientServerInputMultiplexer { +public class ClientServerInputMultiplexer implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger"); private final DuplexConnection streamZeroConnection; @@ -112,8 +113,19 @@ public DuplexConnection asStreamZeroConnection() { return streamZeroConnection; } - public Mono close() { - return source.close(); + @Override + public void dispose() { + source.dispose(); + } + + @Override + public boolean isDisposed() { + return source.isDisposed(); + } + + @Override + public Mono onClose() { + return source.onClose(); } private static class InternalDuplexConnection implements DuplexConnection { @@ -158,8 +170,13 @@ public Flux receive() { } @Override - public Mono close() { - return source.close(); + public void dispose() { + source.dispose(); + } + + @Override + public boolean isDisposed() { + return source.isDisposed(); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java b/rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java deleted file mode 100644 index a4efce8b5..000000000 --- a/rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.rsocket.util; - -import io.rsocket.Closeable; -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; - -public class CloseableAdapter implements Closeable { - private final MonoProcessor onClose = MonoProcessor.create(); - private Runnable closeFunction; - - public CloseableAdapter(Runnable closeFunction) { - this.closeFunction = closeFunction; - } - - @Override - public Mono close() { - return Mono.defer( - () -> { - closeFunction.run(); - onClose.onComplete(); - return onClose; - }); - } - - @Override - public Mono onClose() { - return onClose; - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java index 29aefa742..ef757fbc6 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java +++ b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java @@ -60,8 +60,13 @@ public double availability() { } @Override - public Mono close() { - return source.close(); + public void dispose() { + source.dispose(); + } + + @Override + public boolean isDisposed() { + return source.isDisposed(); } @Override diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java index 3e8a0e551..46b4ee268 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java @@ -27,7 +27,7 @@ public class LocalDuplexConnection implements DuplexConnection { private final DirectProcessor send; private final DirectProcessor receive; - private final MonoProcessor closeNotifier; + private final MonoProcessor onClose; private final String name; public LocalDuplexConnection( @@ -35,7 +35,7 @@ public LocalDuplexConnection( this.name = name; this.send = send; this.receive = receive; - closeNotifier = MonoProcessor.create(); + onClose = MonoProcessor.create(); } @Override @@ -53,21 +53,17 @@ public Flux receive() { } @Override - public double availability() { - return 1; + public void dispose() { + onClose.onComplete(); } @Override - public Mono close() { - return Mono.defer( - () -> { - closeNotifier.onComplete(); - return Mono.empty(); - }); + public boolean isDisposed() { + return onClose.isDisposed(); } @Override public Mono onClose() { - return closeNotifier; + return onClose; } } diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java index e0ee1713c..b62d61b09 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java @@ -77,8 +77,13 @@ public double availability() { } @Override - public Mono close() { - return delegate.close(); + public void dispose() { + delegate.dispose(); + } + + @Override + public boolean isDisposed() { + return delegate.isDisposed(); } @Override diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java index 357ffda69..55b29a6d9 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java @@ -41,7 +41,7 @@ public class TestDuplexConnection implements DuplexConnection { private final LinkedBlockingQueue sent; private final DirectProcessor sentPublisher; private final DirectProcessor received; - private final MonoProcessor close; + private final MonoProcessor onClose; private final ConcurrentLinkedQueue> sendSubscribers; private volatile double availability = 1; private volatile int initialSendRequestN = Integer.MAX_VALUE; @@ -51,7 +51,7 @@ public TestDuplexConnection() { received = DirectProcessor.create(); sentPublisher = DirectProcessor.create(); sendSubscribers = new ConcurrentLinkedQueue<>(); - close = MonoProcessor.create(); + onClose = MonoProcessor.create(); } @Override @@ -84,13 +84,18 @@ public double availability() { } @Override - public Mono close() { - return close; + public void dispose() { + onClose.onComplete(); + } + + @Override + public boolean isDisposed() { + return onClose.isDisposed(); } @Override public Mono onClose() { - return close(); + return onClose; } public Frame awaitSend() throws InterruptedException { diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java index 1b6077c87..049b3c3a5 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java @@ -48,7 +48,8 @@ public static void main(String[] args) { .map(Payload::getDataUtf8) .doOnNext(System.out::println) .take(10) - .thenEmpty(socket.close()) + .doFinally(signalType -> socket.dispose()) + .then() .block(); } diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java index 3282a72e5..7941c05df 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java @@ -76,6 +76,6 @@ public Mono requestResponse(Payload p) { .doOnNext(System.out::println) .block(); - socket.close().block(); + socket.dispose(); } } diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java index b33ae5626..6090e61c2 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java @@ -41,7 +41,9 @@ public static void main(String[] args) { .map(Payload::getDataUtf8) .doOnNext(System.out::println) .take(10) - .thenEmpty(socket.close()) + .then() + .doFinally(signalType -> socket.dispose()) + .then() .block(); } diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java index d6d0741a6..770018593 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java @@ -147,7 +147,7 @@ public Flux requestChannel(Publisher payloads) { @After public void teardown() { - server.close().block(); + server.dispose(); } @Test(timeout = 5_000L) @@ -170,7 +170,7 @@ public void testStream() { @Test(timeout = 5_000L) public void testClose() throws InterruptedException { - client.close().block(); + client.dispose(); disconnectionCounter.await(); } diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java index 3c3dacd37..44cad1d1a 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java @@ -64,7 +64,7 @@ private RSocket buildClient() { @After public void cleanup() { - server.close().block(); + server.dispose(); } @Test(timeout = 5_000L) diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java index aa95272d1..8d30a406a 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java @@ -60,7 +60,7 @@ public Flux requestStream(Payload payload) { System.out.println("here"); } finally { - server.close().block(); + server.dispose(); } } @@ -98,7 +98,7 @@ public Flux requestStream(Payload payload) { System.out.println("here"); } finally { - server.close().block(); + server.dispose(); } } @@ -149,7 +149,7 @@ public Flux requestStream(Payload payload) { consumer("1").blockLast(); } finally { - server.close().block(); + server.dispose(); } } diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java index 76dd15ad2..4f5649f24 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java @@ -90,7 +90,6 @@ public abstract class LoadBalancedRSocketMono extends Mono private final ArrayList activeSockets; private final ArrayList activeFactories; private final FactoriesRefresher factoryRefresher; - private final Mono selectSocket; private final Ewma pendings; private volatile int targetAperture; @@ -137,7 +136,6 @@ private LoadBalancedRSocketMono( this.activeFactories = new ArrayList<>(); this.pendingSockets = 0; this.factoryRefresher = new FactoriesRefresher(); - this.selectSocket = Mono.fromCallable(this::select); this.minPendings = minPendings; this.maxPendings = maxPendings; @@ -154,12 +152,7 @@ private LoadBalancedRSocketMono( factories.subscribe(factoryRefresher); - rSocketMono = - Mono.create( - sink -> { - RSocket rSocket = select(); - sink.success(rSocket); - }); + rSocketMono = Mono.fromCallable(this::select); } public static LoadBalancedRSocketMono create( @@ -198,7 +191,7 @@ public static LoadBalancedRSocketMono create( maxRefreshPeriodMs) { @Override public void subscribe(CoreSubscriber s) { - started.thenMany(rSocketMono).subscribe(s); + started.then(rSocketMono).subscribe(s); } }; } @@ -373,7 +366,7 @@ private synchronized void removeSocket(WeightedSocket socket, boolean refresh) { logger.debug("Removing socket: -> " + socket); activeSockets.remove(socket); activeFactories.add(socket.getFactory()); - socket.close().subscribe(); + socket.dispose(); if (refresh) { refreshSockets(); } @@ -481,50 +474,24 @@ public synchronized String toString() { } @Override - public Mono onClose() { - return onClose; + public void dispose() { + synchronized (this) { + factoryRefresher.close(); + activeFactories.clear(); + activeSockets.forEach(WeightedSocket::dispose); + activeSockets.clear(); + onClose.onComplete(); + } } @Override - public Mono close() { - return Mono.from( - subscriber -> { - subscriber.onSubscribe(Operators.emptySubscription()); - - synchronized (this) { - factoryRefresher.close(); - activeFactories.clear(); - AtomicInteger n = new AtomicInteger(activeSockets.size()); - - activeSockets.forEach( - rs -> - rs.close() - .subscribe( - new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Void aVoid) {} - - @Override - public void onError(Throwable t) { - logger.warn("Exception while closing a RSocket", t); - onComplete(); - } - - @Override - public void onComplete() { - if (n.decrementAndGet() == 0) { - subscriber.onComplete(); - onClose.onComplete(); - } - } - })); - } - }); + public boolean isDisposed() { + return onClose.isDisposed(); + } + + @Override + public Mono onClose() { + return onClose; } /** @@ -564,7 +531,7 @@ public void onNext(Collection newFactories) { it0.remove(); try { changed = true; - socket.close(); + socket.dispose(); } catch (Exception e) { logger.warn("Exception while closing a RSocket", e); } @@ -712,8 +679,12 @@ public double availability() { } @Override - public Mono close() { - return Mono.empty(); + public void dispose() { + } + + @Override + public boolean isDisposed() { + return true; } @Override @@ -880,8 +851,13 @@ private synchronized void observe(double rtt) { } @Override - public Mono close() { - return source.close(); + public void dispose() { + source.dispose(); + } + + @Override + public boolean isDisposed() { + return source.isDisposed(); } @Override diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java index 745ded482..ebe4ef7c8 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java @@ -88,8 +88,13 @@ public double availability() { } @Override - public Mono close() { - return child.close(); + public void dispose() { + child.dispose(); + } + + @Override + public boolean isDisposed() { + return child.isDisposed(); } @Override diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java index 29617188b..5fd66dc83 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java @@ -87,8 +87,13 @@ public Mono get() { } @Override - public Mono close() { - return Mono.empty().doFinally(s -> onClose.onComplete()).then(); + public void dispose() { + onClose.onComplete(); + } + + @Override + public boolean isDisposed() { + return onClose.isDisposed(); } @Override @@ -100,7 +105,7 @@ private class AvailabilityAwareRSocketProxy extends RSocketProxy { public AvailabilityAwareRSocketProxy(RSocket source) { super(source); - onClose.then(close()).subscribe(); + onClose.doFinally(signalType -> source.dispose()).subscribe(); } @Override diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSockets.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSockets.java index 76041bca1..7da494d26 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSockets.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSockets.java @@ -72,7 +72,7 @@ public Mono metadataPush(Payload payload) { /** * Provides a mapping function to wrap a {@code RSocket} such that a call to {@link - * RSocket#close()} does not cancel all pending requests. Instead, it will wait for all pending + * RSocket#dispose()} does not cancel all pending requests. Instead, it will wait for all pending * requests to finish and then close the socket. * * @return Function to transform any socket into a safe closing socket. @@ -91,7 +91,7 @@ public Mono fireAndForget(Payload payload) { .doFinally( signalType -> { if (count.decrementAndGet() == 0 && closed.get()) { - source.close().subscribe(); + source.dispose(); } }); } @@ -104,7 +104,7 @@ public Mono requestResponse(Payload payload) { .doFinally( signalType -> { if (count.decrementAndGet() == 0 && closed.get()) { - source.close().subscribe(); + source.dispose(); } }); } @@ -117,7 +117,7 @@ public Flux requestStream(Payload payload) { .doFinally( signalType -> { if (count.decrementAndGet() == 0 && closed.get()) { - source.close().subscribe(); + source.dispose(); } }); } @@ -130,7 +130,7 @@ public Flux requestChannel(Publisher payloads) { .doFinally( signalType -> { if (count.decrementAndGet() == 0 && closed.get()) { - source.close().subscribe(); + source.dispose(); } }); } @@ -143,24 +143,18 @@ public Mono metadataPush(Payload payload) { .doFinally( signalType -> { if (count.decrementAndGet() == 0 && closed.get()) { - source.close().subscribe(); + source.dispose(); } }); } @Override - public Mono close() { - return Mono.defer( - () -> { - if (closed.compareAndSet(false, true)) { - if (count.get() == 0) { - return source.close(); - } else { - return source.onClose(); - } - } - return source.onClose(); - }); + public void dispose() { + if (closed.compareAndSet(false, true)) { + if (count.get() == 0) { + source.dispose(); + } + } } }; } diff --git a/rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java b/rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java index a4ca079d5..61ee84c27 100644 --- a/rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java +++ b/rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java @@ -29,7 +29,7 @@ public class TestingRSocket implements RSocket { private final AtomicInteger count; - private final MonoProcessor closeSubject = MonoProcessor.create(); + private final MonoProcessor onClose = MonoProcessor.create(); private final BiFunction, Payload, Boolean> eachPayloadHandler; public TestingRSocket(Function responder) { @@ -127,16 +127,17 @@ public double availability() { } @Override - public Mono close() { - return Mono.defer( - () -> { - closeSubject.onComplete(); - return closeSubject; - }); + public void dispose() { + onClose.onComplete(); + } + + @Override + public boolean isDisposed() { + return onClose.isDisposed(); } @Override public Mono onClose() { - return closeSubject; + return onClose; } } diff --git a/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorFrameInterceptor.java b/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorFrameInterceptor.java index 3d93f4607..a6818f554 100644 --- a/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorFrameInterceptor.java +++ b/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorFrameInterceptor.java @@ -80,8 +80,13 @@ public Flux receive() { } @Override - public Mono close() { - return connection.close(); + public void dispose() { + connection.dispose(); + } + + @Override + public boolean isDisposed() { + return connection.isDisposed(); } @Override diff --git a/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorRSocket.java b/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorRSocket.java index 79da95698..1d4e10470 100644 --- a/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorRSocket.java +++ b/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorRSocket.java @@ -223,8 +223,13 @@ public Mono metadataPush(Payload payload) { } @Override - public Mono close() { - return delegate.close(); + public void dispose() { + delegate.dispose(); + } + + @Override + public boolean isDisposed() { + return delegate.isDisposed(); } @Override diff --git a/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java b/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java index 03c63b791..c9f9e9105 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java +++ b/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java @@ -69,7 +69,7 @@ public void evaluate() throws Throwable { S server = serverInit.apply(address); client = clientConnector.apply(address, server); base.evaluate(); - server.close().block(); + server.dispose(); } }; } diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/AeronDuplexConnection.java b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/AeronDuplexConnection.java index 4fe2a5952..434a12427 100644 --- a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/AeronDuplexConnection.java +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/AeronDuplexConnection.java @@ -32,12 +32,12 @@ public class AeronDuplexConnection implements DuplexConnection { private final String name; private final AeronChannel channel; - private final MonoProcessor emptySubject; + private final MonoProcessor onClose; public AeronDuplexConnection(String name, AeronChannel channel) { this.name = name; this.channel = channel; - this.emptySubject = MonoProcessor.create(); + this.onClose = MonoProcessor.create(); } @Override @@ -57,27 +57,23 @@ public Flux receive() { } @Override - public double availability() { - return channel.isActive() ? 1.0 : 0.0; + public void dispose() { + try { + channel.dispose(); + onClose.onComplete(); + } catch (Exception e) { + onClose.onError(e); + } } @Override - public Mono close() { - return Mono.defer( - () -> { - try { - channel.close(); - emptySubject.onComplete(); - } catch (Exception e) { - emptySubject.onError(e); - } - return emptySubject; - }); + public boolean isDisposed() { + return channel.isDisposed(); } @Override public Mono onClose() { - return emptySubject; + return onClose; } @Override @@ -88,8 +84,8 @@ public String toString() { + '\'' + ", channel=" + channel - + ", emptySubject=" - + emptySubject + + ", onClose=" + + onClose + '}'; } } diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannel.java b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannel.java index 4dff523bd..ee4dd351b 100644 --- a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannel.java +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannel.java @@ -18,14 +18,14 @@ import io.aeron.Publication; import io.aeron.Subscription; import io.rsocket.aeron.internal.EventLoop; -import java.io.IOException; import java.util.Objects; import org.agrona.DirectBuffer; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** */ -public class AeronChannel implements ReactiveStreamsRemote.Channel, AutoCloseable { +public class AeronChannel implements ReactiveStreamsRemote.Channel, Disposable { private final String name; private final Publication destination; private final Subscription source; @@ -78,22 +78,18 @@ public Flux receive() { } @Override - public void close() throws IOException { - try { - destination.close(); - source.close(); - } catch (Throwable t) { - throw new IOException(t); - } + public void dispose() { + destination.close(); + source.close(); } @Override - public String toString() { - return "AeronChannel{" + "name='" + name + '\'' + '}'; + public boolean isDisposed() { + return destination.isClosed() && source.isClosed(); } @Override - public boolean isActive() { - return !destination.isClosed() && !source.isClosed(); + public String toString() { + return "AeronChannel{" + "name='" + name + '\'' + '}'; } } diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannelServer.java b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannelServer.java index 67ec5700b..6f51e99ab 100644 --- a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannelServer.java +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannelServer.java @@ -231,8 +231,6 @@ public interface AeronChannelConsumer public class AeronChannelStartedServer implements ReactiveStreamsRemote.StartedServer, Closeable { private final MonoProcessor onClose = MonoProcessor.create(); - private CountDownLatch latch = new CountDownLatch(1); - public AeronWrapper getAeronWrapper() { return aeronWrapper; } @@ -254,27 +252,29 @@ public int getServerPort() { @Override public void awaitShutdown(long duration, TimeUnit durationUnit) { Duration d = Duration.ofMillis(durationUnit.toMillis(duration)); - close().block(d); + onClose().block(d); } @Override public void awaitShutdown() { - close().block(); + onClose().block(); } @Override public void shutdown() { - close().subscribe(); + dispose(); } @Override - public Mono close() { - return Mono.defer( - () -> { - running = false; - managementSubscription.close(); - return onClose; - }); + public void dispose() { + running = false; + managementSubscription.close(); + onClose.onComplete(); + } + + @Override + public boolean isDisposed() { + return onClose.isDisposed(); } @Override diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java index 2d030c8c5..5314be730 100644 --- a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java @@ -34,8 +34,6 @@ default Mono send(T t) { } Flux receive(); - - boolean isActive(); } interface ClientChannelConnector> diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index dffdafe7e..a82f79f0b 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -27,13 +27,13 @@ public class LocalDuplexConnection implements DuplexConnection { private final Flux in; private final Subscriber out; - private final MonoProcessor closeNotifier; + private final MonoProcessor onClose; public LocalDuplexConnection( - Flux in, Subscriber out, MonoProcessor closeNotifier) { + Flux in, Subscriber out, MonoProcessor onClose) { this.in = in; this.out = out; - this.closeNotifier = closeNotifier; + this.onClose = onClose; } @Override @@ -52,22 +52,18 @@ public Flux receive() { } @Override - public Mono close() { - return Mono.defer( - () -> { - out.onComplete(); - closeNotifier.onComplete(); - return closeNotifier; - }); + public void dispose() { + out.onComplete(); + onClose.onComplete(); } @Override - public Mono onClose() { - return closeNotifier; + public boolean isDisposed() { + return onClose.isDisposed(); } @Override - public double availability() { - return closeNotifier.isDisposed() ? 0.0 : 1.0; + public Mono onClose() { + return onClose; } } diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java index 55976b1c7..c94e1fde2 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java @@ -82,7 +82,7 @@ public String getName() { static class ServerDuplexConnectionAcceptor implements Consumer, Closeable { private final LocalSocketAddress address; private final ConnectionAcceptor acceptor; - private final MonoProcessor closeNotifier = MonoProcessor.create(); + private final MonoProcessor onClose = MonoProcessor.create(); public ServerDuplexConnectionAcceptor(String name, ConnectionAcceptor acceptor) { this.address = new LocalSocketAddress(name); @@ -95,21 +95,22 @@ public void accept(DuplexConnection duplexConnection) { } @Override - public Mono close() { - return Mono.defer( - () -> { - if (!registry.remove(address.getName(), this)) { - throw new AssertionError(); - } - - closeNotifier.onComplete(); - return Mono.empty(); - }); + public void dispose() { + if (!registry.remove(address.getName(), this)) { + throw new AssertionError(); + } + + onClose.onComplete(); + } + + @Override + public boolean isDisposed() { + return onClose.isDisposed(); } @Override public Mono onClose() { - return closeNotifier; + return onClose; } } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java index bed01967b..22d27d17f 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java @@ -63,17 +63,17 @@ public Flux receive() { } @Override - public Mono close() { - return Mono.fromRunnable(onClose::onComplete); + public void dispose() { + onClose.onComplete(); } @Override - public Mono onClose() { - return onClose; + public boolean isDisposed() { + return onClose.isDisposed(); } @Override - public double availability() { - return onClose.isTerminated() ? 0.0 : 1.0; + public Mono onClose() { + return onClose; } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index 838fadf93..e17567bec 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -86,17 +86,17 @@ public Flux receive() { } @Override - public Mono close() { - return Mono.fromRunnable(onClose::onComplete); + public void dispose() { + onClose.onComplete(); } @Override - public Mono onClose() { - return onClose; + public boolean isDisposed() { + return onClose.isDisposed(); } @Override - public double availability() { - return onClose.isTerminated() ? 0.0 : 1.0; + public Mono onClose() { + return onClose; } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/NettyContextCloseable.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/NettyContextCloseable.java index 620dd4c7a..0fb1b0a83 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/NettyContextCloseable.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/NettyContextCloseable.java @@ -19,7 +19,6 @@ import io.rsocket.Closeable; import java.net.InetSocketAddress; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.ipc.netty.NettyContext; /** @@ -28,27 +27,23 @@ public class NettyContextCloseable implements Closeable { private NettyContext context; - private MonoProcessor onClose; - NettyContextCloseable(NettyContext context) { - this.onClose = MonoProcessor.create(); this.context = context; } @Override - public Mono close() { - return Mono.empty() - .doFinally( - s -> { - context.dispose(); - onClose.onComplete(); - }) - .then(); + public void dispose() { + context.dispose(); + } + + @Override + public boolean isDisposed() { + return context.isDisposed(); } @Override public Mono onClose() { - return onClose; + return context.onClose(); } /** diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java index 8d5db9bcf..277cb5f8b 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java @@ -3,35 +3,37 @@ import io.rsocket.Closeable; import io.rsocket.transport.ServerTransport; import io.rsocket.transport.netty.WebsocketDuplexConnection; -import io.rsocket.util.CloseableAdapter; import java.util.function.BiFunction; +import java.util.function.Consumer; + import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import reactor.ipc.netty.http.server.HttpServer; import reactor.ipc.netty.http.server.HttpServerRoutes; import reactor.ipc.netty.http.websocket.WebsocketInbound; import reactor.ipc.netty.http.websocket.WebsocketOutbound; public class WebsocketRouteTransport implements ServerTransport { - private HttpServerRoutes routes; + private HttpServer server; + private Consumer routesBuilder; private String path; - public WebsocketRouteTransport(HttpServerRoutes routes, String path) { - this.routes = routes; + public WebsocketRouteTransport(HttpServer server, + Consumer routesBuilder, + String path) { + this.server = server; + this.routesBuilder = routesBuilder; this.path = path; } @Override public Mono start(ConnectionAcceptor acceptor) { - return Mono.defer( - () -> { + return server + .newRouter(routes -> { + routesBuilder.accept(routes); routes.ws(path, newHandler(acceptor)); - - return Mono.just( - new CloseableAdapter( - () -> { - // TODO close route somehow - })); - }); + }) + .map(NettyContextCloseable::new); } public static BiFunction> newHandler( diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java index c208a3ee5..919223359 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java @@ -48,7 +48,7 @@ public static WebsocketServerTransport create(HttpServer server) { } @Override - public Mono start(ServerTransport.ConnectionAcceptor acceptor) { + public Mono start(ConnectionAcceptor acceptor) { return server .newHandler( (request, response) -> {