close() {
- return Mono.empty();
+ public void dispose() {
}
@Override
diff --git a/rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java b/rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
index 93c8cd57c..674eb0d0f 100644
--- a/rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
+++ b/rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
@@ -24,9 +24,6 @@
/**
* An abstract implementation of {@link RSocket}. All request handling methods emit {@link
* UnsupportedOperationException} and hence must be overridden to provide a valid implementation.
- *
- * {@link #close()} returns a {@code Publisher} that immediately terminates. That same Publisher
- * is returned by the {@link #onClose()} method.
*/
public abstract class AbstractRSocket implements RSocket {
@@ -58,12 +55,13 @@ public Mono metadataPush(Payload payload) {
}
@Override
- public Mono close() {
- return Mono.defer(
- () -> {
- onClose.onComplete();
- return onClose;
- });
+ public void dispose() {
+ onClose.onComplete();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
diff --git a/rsocket-core/src/main/java/io/rsocket/Closeable.java b/rsocket-core/src/main/java/io/rsocket/Closeable.java
index ee2f177ff..9eab77e30 100644
--- a/rsocket-core/src/main/java/io/rsocket/Closeable.java
+++ b/rsocket-core/src/main/java/io/rsocket/Closeable.java
@@ -16,24 +16,14 @@
package io.rsocket;
+import reactor.core.Disposable;
import reactor.core.publisher.Mono;
/** */
-public interface Closeable {
- /**
- * Close this {@code RSocket} upon subscribing to the returned {@code Publisher}
- *
- * This method is idempotent and hence can be called as many times at any point with same
- * outcome.
- *
- * @return A {@code Publisher} that triggers the close when subscribed to and that completes when
- * this {@code RSocket} close is complete.
- */
- Mono close();
-
+public interface Closeable extends Disposable {
/**
* Returns a {@code Publisher} that completes when this {@code RSocket} is closed. A {@code
- * RSocket} can be closed by explicitly calling {@link #close()} or when the underlying transport
+ * RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying transport
* connection is closed.
*
* @return A {@code Publisher} that completes when this {@code RSocket} close is complete.
diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
index 0dea9d9d2..3113d20f5 100644
--- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
+++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
@@ -74,4 +74,9 @@ default Mono sendOne(Frame frame) {
* @return Stream of all {@code Frame}s received.
*/
Flux receive();
+
+ @Override
+ default double availability() {
+ return isDisposed() ? 0.0 : 1.0;
+ }
}
diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java
index 0f006d5ca..725aa9c92 100644
--- a/rsocket-core/src/main/java/io/rsocket/RSocket.java
+++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java
@@ -71,6 +71,6 @@ public interface RSocket extends Availability, Closeable {
@Override
default double availability() {
- return 0.0;
+ return isDisposed() ? 0.0 : 1.0;
}
}
diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
index 4713c52d1..07b8879fd 100644
--- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
+++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java
@@ -98,7 +98,7 @@ class RSocketClient implements RSocket {
.doOnError(
t -> {
errorConsumer.accept(t);
- connection.close().subscribe();
+ connection.dispose();
})
.subscribe();
}
@@ -234,8 +234,13 @@ public double availability() {
}
@Override
- public Mono close() {
- return connection.close();
+ public void dispose() {
+ connection.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return connection.isDisposed();
}
@Override
@@ -260,25 +265,25 @@ public Flux handleRequestStream(final Payload payload) {
return receiver
.doOnRequest(
l -> {
- if (first.compareAndSet(false, true) && !receiver.isTerminated()) {
+ if (first.compareAndSet(false, true) && !receiver.isDisposed()) {
final Frame requestFrame =
Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, l);
payload.release();
sendProcessor.onNext(requestFrame);
- } else if (contains(streamId) && !receiver.isTerminated()) {
+ } else if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.RequestN.from(streamId, l));
}
sendProcessor.drain();
})
.doOnError(
t -> {
- if (contains(streamId) && !receiver.isTerminated()) {
+ if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.Error.from(streamId, t));
}
})
.doOnCancel(
() -> {
- if (contains(streamId) && !receiver.isTerminated()) {
+ if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.Cancel.from(streamId));
}
})
@@ -326,7 +331,7 @@ private Flux handleChannel(Flux request, FrameType requestType
boolean firstRequest = true;
boolean isValidToSendFrame() {
- return contains(streamId) && !receiver.isTerminated();
+ return contains(streamId) && !receiver.isDisposed();
}
void sendOneFrame(Frame frame) {
diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
index 33f56c349..f69630757 100644
--- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
+++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
@@ -342,7 +342,7 @@ private Mono extends Void> processSetupFrame(
return multiplexer
.asStreamZeroConnection()
.sendOne(Frame.Error.from(0, error))
- .then(multiplexer.close());
+ .doFinally(signalType -> multiplexer.dispose());
}
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java
index 7b959e756..e60afa0b1 100644
--- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java
+++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java
@@ -200,8 +200,13 @@ public Mono metadataPush(Payload payload) {
}
@Override
- public Mono close() {
- return connection.close();
+ public void dispose() {
+ connection.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return connection.isDisposed();
}
@Override
@@ -213,7 +218,7 @@ private void cleanup() {
cleanUpSendingSubscriptions();
cleanUpChannelProcessors();
- requestHandler.close().subscribe();
+ requestHandler.dispose();
}
private synchronized void cleanUpSendingSubscriptions() {
diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
index 38c46b62c..30d9a3d8b 100644
--- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
+++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java
@@ -85,20 +85,13 @@ public Flux receive() {
}
@Override
- public Mono close() {
- return source.close();
+ public void dispose() {
+ source.dispose();
}
- private synchronized FrameReassembler getFrameReassembler(Frame frame) {
- return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame));
- }
-
- private synchronized FrameReassembler removeFrameReassembler(int streamId) {
- return frameReassemblers.remove(streamId);
- }
-
- private synchronized boolean frameReassemblersContain(int streamId) {
- return frameReassemblers.containsKey(streamId);
+ @Override
+ public boolean isDisposed() {
+ return source.isDisposed();
}
@Override
@@ -114,4 +107,16 @@ public Mono onClose() {
}
});
}
+
+ private synchronized FrameReassembler getFrameReassembler(Frame frame) {
+ return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame));
+ }
+
+ private synchronized FrameReassembler removeFrameReassembler(int streamId) {
+ return frameReassemblers.remove(streamId);
+ }
+
+ private synchronized boolean frameReassemblersContain(int streamId) {
+ return frameReassemblers.containsKey(streamId);
+ }
}
diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java
index 861a8246e..0f9209462 100644
--- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java
+++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java
@@ -16,6 +16,7 @@
package io.rsocket.internal;
+import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.FrameType;
@@ -41,7 +42,7 @@
* even. Even IDs are for the streams initiated by server and odds are for streams initiated by the
* client.
*/
-public class ClientServerInputMultiplexer {
+public class ClientServerInputMultiplexer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
private final DuplexConnection streamZeroConnection;
@@ -112,8 +113,19 @@ public DuplexConnection asStreamZeroConnection() {
return streamZeroConnection;
}
- public Mono close() {
- return source.close();
+ @Override
+ public void dispose() {
+ source.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return source.isDisposed();
+ }
+
+ @Override
+ public Mono onClose() {
+ return source.onClose();
}
private static class InternalDuplexConnection implements DuplexConnection {
@@ -158,8 +170,13 @@ public Flux receive() {
}
@Override
- public Mono close() {
- return source.close();
+ public void dispose() {
+ source.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return source.isDisposed();
}
@Override
diff --git a/rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java b/rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java
deleted file mode 100644
index a4efce8b5..000000000
--- a/rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package io.rsocket.util;
-
-import io.rsocket.Closeable;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoProcessor;
-
-public class CloseableAdapter implements Closeable {
- private final MonoProcessor onClose = MonoProcessor.create();
- private Runnable closeFunction;
-
- public CloseableAdapter(Runnable closeFunction) {
- this.closeFunction = closeFunction;
- }
-
- @Override
- public Mono close() {
- return Mono.defer(
- () -> {
- closeFunction.run();
- onClose.onComplete();
- return onClose;
- });
- }
-
- @Override
- public Mono onClose() {
- return onClose;
- }
-}
diff --git a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java
index 29aefa742..ef757fbc6 100644
--- a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java
+++ b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java
@@ -60,8 +60,13 @@ public double availability() {
}
@Override
- public Mono close() {
- return source.close();
+ public void dispose() {
+ source.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return source.isDisposed();
}
@Override
diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java
index 3e8a0e551..46b4ee268 100644
--- a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java
+++ b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java
@@ -27,7 +27,7 @@
public class LocalDuplexConnection implements DuplexConnection {
private final DirectProcessor send;
private final DirectProcessor receive;
- private final MonoProcessor closeNotifier;
+ private final MonoProcessor onClose;
private final String name;
public LocalDuplexConnection(
@@ -35,7 +35,7 @@ public LocalDuplexConnection(
this.name = name;
this.send = send;
this.receive = receive;
- closeNotifier = MonoProcessor.create();
+ onClose = MonoProcessor.create();
}
@Override
@@ -53,21 +53,17 @@ public Flux receive() {
}
@Override
- public double availability() {
- return 1;
+ public void dispose() {
+ onClose.onComplete();
}
@Override
- public Mono close() {
- return Mono.defer(
- () -> {
- closeNotifier.onComplete();
- return Mono.empty();
- });
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
public Mono onClose() {
- return closeNotifier;
+ return onClose;
}
}
diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java
index e0ee1713c..b62d61b09 100644
--- a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java
+++ b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java
@@ -77,8 +77,13 @@ public double availability() {
}
@Override
- public Mono close() {
- return delegate.close();
+ public void dispose() {
+ delegate.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return delegate.isDisposed();
}
@Override
diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java
index 357ffda69..55b29a6d9 100644
--- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java
+++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java
@@ -41,7 +41,7 @@ public class TestDuplexConnection implements DuplexConnection {
private final LinkedBlockingQueue sent;
private final DirectProcessor sentPublisher;
private final DirectProcessor received;
- private final MonoProcessor close;
+ private final MonoProcessor onClose;
private final ConcurrentLinkedQueue> sendSubscribers;
private volatile double availability = 1;
private volatile int initialSendRequestN = Integer.MAX_VALUE;
@@ -51,7 +51,7 @@ public TestDuplexConnection() {
received = DirectProcessor.create();
sentPublisher = DirectProcessor.create();
sendSubscribers = new ConcurrentLinkedQueue<>();
- close = MonoProcessor.create();
+ onClose = MonoProcessor.create();
}
@Override
@@ -84,13 +84,18 @@ public double availability() {
}
@Override
- public Mono close() {
- return close;
+ public void dispose() {
+ onClose.onComplete();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
public Mono onClose() {
- return close();
+ return onClose;
}
public Frame awaitSend() throws InterruptedException {
diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java
index 1b6077c87..049b3c3a5 100644
--- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java
+++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java
@@ -48,7 +48,8 @@ public static void main(String[] args) {
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.take(10)
- .thenEmpty(socket.close())
+ .doFinally(signalType -> socket.dispose())
+ .then()
.block();
}
diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java
index 3282a72e5..7941c05df 100644
--- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java
+++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java
@@ -76,6 +76,6 @@ public Mono requestResponse(Payload p) {
.doOnNext(System.out::println)
.block();
- socket.close().block();
+ socket.dispose();
}
}
diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java
index b33ae5626..6090e61c2 100644
--- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java
+++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java
@@ -41,7 +41,9 @@ public static void main(String[] args) {
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.take(10)
- .thenEmpty(socket.close())
+ .then()
+ .doFinally(signalType -> socket.dispose())
+ .then()
.block();
}
diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java
index d6d0741a6..770018593 100644
--- a/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java
+++ b/rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java
@@ -147,7 +147,7 @@ public Flux requestChannel(Publisher payloads) {
@After
public void teardown() {
- server.close().block();
+ server.dispose();
}
@Test(timeout = 5_000L)
@@ -170,7 +170,7 @@ public void testStream() {
@Test(timeout = 5_000L)
public void testClose() throws InterruptedException {
- client.close().block();
+ client.dispose();
disconnectionCounter.await();
}
diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java
index 3c3dacd37..44cad1d1a 100644
--- a/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java
+++ b/rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java
@@ -64,7 +64,7 @@ private RSocket buildClient() {
@After
public void cleanup() {
- server.close().block();
+ server.dispose();
}
@Test(timeout = 5_000L)
diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java
index aa95272d1..8d30a406a 100644
--- a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java
+++ b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java
@@ -60,7 +60,7 @@ public Flux requestStream(Payload payload) {
System.out.println("here");
} finally {
- server.close().block();
+ server.dispose();
}
}
@@ -98,7 +98,7 @@ public Flux requestStream(Payload payload) {
System.out.println("here");
} finally {
- server.close().block();
+ server.dispose();
}
}
@@ -149,7 +149,7 @@ public Flux requestStream(Payload payload) {
consumer("1").blockLast();
} finally {
- server.close().block();
+ server.dispose();
}
}
diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java
index 76dd15ad2..4f5649f24 100644
--- a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java
+++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java
@@ -90,7 +90,6 @@ public abstract class LoadBalancedRSocketMono extends Mono
private final ArrayList activeSockets;
private final ArrayList activeFactories;
private final FactoriesRefresher factoryRefresher;
- private final Mono selectSocket;
private final Ewma pendings;
private volatile int targetAperture;
@@ -137,7 +136,6 @@ private LoadBalancedRSocketMono(
this.activeFactories = new ArrayList<>();
this.pendingSockets = 0;
this.factoryRefresher = new FactoriesRefresher();
- this.selectSocket = Mono.fromCallable(this::select);
this.minPendings = minPendings;
this.maxPendings = maxPendings;
@@ -154,12 +152,7 @@ private LoadBalancedRSocketMono(
factories.subscribe(factoryRefresher);
- rSocketMono =
- Mono.create(
- sink -> {
- RSocket rSocket = select();
- sink.success(rSocket);
- });
+ rSocketMono = Mono.fromCallable(this::select);
}
public static LoadBalancedRSocketMono create(
@@ -198,7 +191,7 @@ public static LoadBalancedRSocketMono create(
maxRefreshPeriodMs) {
@Override
public void subscribe(CoreSubscriber super RSocket> s) {
- started.thenMany(rSocketMono).subscribe(s);
+ started.then(rSocketMono).subscribe(s);
}
};
}
@@ -373,7 +366,7 @@ private synchronized void removeSocket(WeightedSocket socket, boolean refresh) {
logger.debug("Removing socket: -> " + socket);
activeSockets.remove(socket);
activeFactories.add(socket.getFactory());
- socket.close().subscribe();
+ socket.dispose();
if (refresh) {
refreshSockets();
}
@@ -481,50 +474,24 @@ public synchronized String toString() {
}
@Override
- public Mono onClose() {
- return onClose;
+ public void dispose() {
+ synchronized (this) {
+ factoryRefresher.close();
+ activeFactories.clear();
+ activeSockets.forEach(WeightedSocket::dispose);
+ activeSockets.clear();
+ onClose.onComplete();
+ }
}
@Override
- public Mono close() {
- return Mono.from(
- subscriber -> {
- subscriber.onSubscribe(Operators.emptySubscription());
-
- synchronized (this) {
- factoryRefresher.close();
- activeFactories.clear();
- AtomicInteger n = new AtomicInteger(activeSockets.size());
-
- activeSockets.forEach(
- rs ->
- rs.close()
- .subscribe(
- new Subscriber() {
- @Override
- public void onSubscribe(Subscription s) {
- s.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(Void aVoid) {}
-
- @Override
- public void onError(Throwable t) {
- logger.warn("Exception while closing a RSocket", t);
- onComplete();
- }
-
- @Override
- public void onComplete() {
- if (n.decrementAndGet() == 0) {
- subscriber.onComplete();
- onClose.onComplete();
- }
- }
- }));
- }
- });
+ public boolean isDisposed() {
+ return onClose.isDisposed();
+ }
+
+ @Override
+ public Mono onClose() {
+ return onClose;
}
/**
@@ -564,7 +531,7 @@ public void onNext(Collection newFactories) {
it0.remove();
try {
changed = true;
- socket.close();
+ socket.dispose();
} catch (Exception e) {
logger.warn("Exception while closing a RSocket", e);
}
@@ -712,8 +679,12 @@ public double availability() {
}
@Override
- public Mono close() {
- return Mono.empty();
+ public void dispose() {
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return true;
}
@Override
@@ -880,8 +851,13 @@ private synchronized void observe(double rtt) {
}
@Override
- public Mono close() {
- return source.close();
+ public void dispose() {
+ source.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return source.isDisposed();
}
@Override
diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java
index 745ded482..ebe4ef7c8 100644
--- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java
+++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java
@@ -88,8 +88,13 @@ public double availability() {
}
@Override
- public Mono close() {
- return child.close();
+ public void dispose() {
+ child.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return child.isDisposed();
}
@Override
diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java
index 29617188b..5fd66dc83 100644
--- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java
+++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java
@@ -87,8 +87,13 @@ public Mono get() {
}
@Override
- public Mono close() {
- return Mono.empty().doFinally(s -> onClose.onComplete()).then();
+ public void dispose() {
+ onClose.onComplete();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
@@ -100,7 +105,7 @@ private class AvailabilityAwareRSocketProxy extends RSocketProxy {
public AvailabilityAwareRSocketProxy(RSocket source) {
super(source);
- onClose.then(close()).subscribe();
+ onClose.doFinally(signalType -> source.dispose()).subscribe();
}
@Override
diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSockets.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSockets.java
index 76041bca1..7da494d26 100644
--- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSockets.java
+++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSockets.java
@@ -72,7 +72,7 @@ public Mono metadataPush(Payload payload) {
/**
* Provides a mapping function to wrap a {@code RSocket} such that a call to {@link
- * RSocket#close()} does not cancel all pending requests. Instead, it will wait for all pending
+ * RSocket#dispose()} does not cancel all pending requests. Instead, it will wait for all pending
* requests to finish and then close the socket.
*
* @return Function to transform any socket into a safe closing socket.
@@ -91,7 +91,7 @@ public Mono fireAndForget(Payload payload) {
.doFinally(
signalType -> {
if (count.decrementAndGet() == 0 && closed.get()) {
- source.close().subscribe();
+ source.dispose();
}
});
}
@@ -104,7 +104,7 @@ public Mono requestResponse(Payload payload) {
.doFinally(
signalType -> {
if (count.decrementAndGet() == 0 && closed.get()) {
- source.close().subscribe();
+ source.dispose();
}
});
}
@@ -117,7 +117,7 @@ public Flux requestStream(Payload payload) {
.doFinally(
signalType -> {
if (count.decrementAndGet() == 0 && closed.get()) {
- source.close().subscribe();
+ source.dispose();
}
});
}
@@ -130,7 +130,7 @@ public Flux requestChannel(Publisher payloads) {
.doFinally(
signalType -> {
if (count.decrementAndGet() == 0 && closed.get()) {
- source.close().subscribe();
+ source.dispose();
}
});
}
@@ -143,24 +143,18 @@ public Mono metadataPush(Payload payload) {
.doFinally(
signalType -> {
if (count.decrementAndGet() == 0 && closed.get()) {
- source.close().subscribe();
+ source.dispose();
}
});
}
@Override
- public Mono close() {
- return Mono.defer(
- () -> {
- if (closed.compareAndSet(false, true)) {
- if (count.get() == 0) {
- return source.close();
- } else {
- return source.onClose();
- }
- }
- return source.onClose();
- });
+ public void dispose() {
+ if (closed.compareAndSet(false, true)) {
+ if (count.get() == 0) {
+ source.dispose();
+ }
+ }
}
};
}
diff --git a/rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java b/rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java
index a4ca079d5..61ee84c27 100644
--- a/rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java
+++ b/rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java
@@ -29,7 +29,7 @@
public class TestingRSocket implements RSocket {
private final AtomicInteger count;
- private final MonoProcessor closeSubject = MonoProcessor.create();
+ private final MonoProcessor onClose = MonoProcessor.create();
private final BiFunction, Payload, Boolean> eachPayloadHandler;
public TestingRSocket(Function responder) {
@@ -127,16 +127,17 @@ public double availability() {
}
@Override
- public Mono close() {
- return Mono.defer(
- () -> {
- closeSubject.onComplete();
- return closeSubject;
- });
+ public void dispose() {
+ onClose.onComplete();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
public Mono onClose() {
- return closeSubject;
+ return onClose;
}
}
diff --git a/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorFrameInterceptor.java b/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorFrameInterceptor.java
index 3d93f4607..a6818f554 100644
--- a/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorFrameInterceptor.java
+++ b/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorFrameInterceptor.java
@@ -80,8 +80,13 @@ public Flux receive() {
}
@Override
- public Mono close() {
- return connection.close();
+ public void dispose() {
+ connection.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return connection.isDisposed();
}
@Override
diff --git a/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorRSocket.java b/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorRSocket.java
index 79da95698..1d4e10470 100644
--- a/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorRSocket.java
+++ b/rsocket-spectator/src/main/java/io/rsocket/spectator/SpectatorRSocket.java
@@ -223,8 +223,13 @@ public Mono metadataPush(Payload payload) {
}
@Override
- public Mono close() {
- return delegate.close();
+ public void dispose() {
+ delegate.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return delegate.isDisposed();
}
@Override
diff --git a/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java b/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java
index 03c63b791..c9f9e9105 100644
--- a/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java
+++ b/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java
@@ -69,7 +69,7 @@ public void evaluate() throws Throwable {
S server = serverInit.apply(address);
client = clientConnector.apply(address, server);
base.evaluate();
- server.close().block();
+ server.dispose();
}
};
}
diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/AeronDuplexConnection.java b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/AeronDuplexConnection.java
index 4fe2a5952..434a12427 100644
--- a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/AeronDuplexConnection.java
+++ b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/AeronDuplexConnection.java
@@ -32,12 +32,12 @@
public class AeronDuplexConnection implements DuplexConnection {
private final String name;
private final AeronChannel channel;
- private final MonoProcessor emptySubject;
+ private final MonoProcessor onClose;
public AeronDuplexConnection(String name, AeronChannel channel) {
this.name = name;
this.channel = channel;
- this.emptySubject = MonoProcessor.create();
+ this.onClose = MonoProcessor.create();
}
@Override
@@ -57,27 +57,23 @@ public Flux receive() {
}
@Override
- public double availability() {
- return channel.isActive() ? 1.0 : 0.0;
+ public void dispose() {
+ try {
+ channel.dispose();
+ onClose.onComplete();
+ } catch (Exception e) {
+ onClose.onError(e);
+ }
}
@Override
- public Mono close() {
- return Mono.defer(
- () -> {
- try {
- channel.close();
- emptySubject.onComplete();
- } catch (Exception e) {
- emptySubject.onError(e);
- }
- return emptySubject;
- });
+ public boolean isDisposed() {
+ return channel.isDisposed();
}
@Override
public Mono onClose() {
- return emptySubject;
+ return onClose;
}
@Override
@@ -88,8 +84,8 @@ public String toString() {
+ '\''
+ ", channel="
+ channel
- + ", emptySubject="
- + emptySubject
+ + ", onClose="
+ + onClose
+ '}';
}
}
diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannel.java b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannel.java
index 4dff523bd..ee4dd351b 100644
--- a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannel.java
+++ b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannel.java
@@ -18,14 +18,14 @@
import io.aeron.Publication;
import io.aeron.Subscription;
import io.rsocket.aeron.internal.EventLoop;
-import java.io.IOException;
import java.util.Objects;
import org.agrona.DirectBuffer;
+import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/** */
-public class AeronChannel implements ReactiveStreamsRemote.Channel, AutoCloseable {
+public class AeronChannel implements ReactiveStreamsRemote.Channel, Disposable {
private final String name;
private final Publication destination;
private final Subscription source;
@@ -78,22 +78,18 @@ public Flux extends DirectBuffer> receive() {
}
@Override
- public void close() throws IOException {
- try {
- destination.close();
- source.close();
- } catch (Throwable t) {
- throw new IOException(t);
- }
+ public void dispose() {
+ destination.close();
+ source.close();
}
@Override
- public String toString() {
- return "AeronChannel{" + "name='" + name + '\'' + '}';
+ public boolean isDisposed() {
+ return destination.isClosed() && source.isClosed();
}
@Override
- public boolean isActive() {
- return !destination.isClosed() && !source.isClosed();
+ public String toString() {
+ return "AeronChannel{" + "name='" + name + '\'' + '}';
}
}
diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannelServer.java b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannelServer.java
index 67ec5700b..6f51e99ab 100644
--- a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannelServer.java
+++ b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronChannelServer.java
@@ -231,8 +231,6 @@ public interface AeronChannelConsumer
public class AeronChannelStartedServer implements ReactiveStreamsRemote.StartedServer, Closeable {
private final MonoProcessor onClose = MonoProcessor.create();
- private CountDownLatch latch = new CountDownLatch(1);
-
public AeronWrapper getAeronWrapper() {
return aeronWrapper;
}
@@ -254,27 +252,29 @@ public int getServerPort() {
@Override
public void awaitShutdown(long duration, TimeUnit durationUnit) {
Duration d = Duration.ofMillis(durationUnit.toMillis(duration));
- close().block(d);
+ onClose().block(d);
}
@Override
public void awaitShutdown() {
- close().block();
+ onClose().block();
}
@Override
public void shutdown() {
- close().subscribe();
+ dispose();
}
@Override
- public Mono close() {
- return Mono.defer(
- () -> {
- running = false;
- managementSubscription.close();
- return onClose;
- });
+ public void dispose() {
+ running = false;
+ managementSubscription.close();
+ onClose.onComplete();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java
index 2d030c8c5..5314be730 100644
--- a/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java
+++ b/rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java
@@ -34,8 +34,6 @@ default Mono send(T t) {
}
Flux extends T> receive();
-
- boolean isActive();
}
interface ClientChannelConnector>
diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java
index dffdafe7e..a82f79f0b 100644
--- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java
+++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java
@@ -27,13 +27,13 @@
public class LocalDuplexConnection implements DuplexConnection {
private final Flux in;
private final Subscriber out;
- private final MonoProcessor closeNotifier;
+ private final MonoProcessor onClose;
public LocalDuplexConnection(
- Flux in, Subscriber out, MonoProcessor closeNotifier) {
+ Flux in, Subscriber out, MonoProcessor onClose) {
this.in = in;
this.out = out;
- this.closeNotifier = closeNotifier;
+ this.onClose = onClose;
}
@Override
@@ -52,22 +52,18 @@ public Flux receive() {
}
@Override
- public Mono close() {
- return Mono.defer(
- () -> {
- out.onComplete();
- closeNotifier.onComplete();
- return closeNotifier;
- });
+ public void dispose() {
+ out.onComplete();
+ onClose.onComplete();
}
@Override
- public Mono onClose() {
- return closeNotifier;
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
- public double availability() {
- return closeNotifier.isDisposed() ? 0.0 : 1.0;
+ public Mono onClose() {
+ return onClose;
}
}
diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java
index 55976b1c7..c94e1fde2 100644
--- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java
+++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java
@@ -82,7 +82,7 @@ public String getName() {
static class ServerDuplexConnectionAcceptor implements Consumer, Closeable {
private final LocalSocketAddress address;
private final ConnectionAcceptor acceptor;
- private final MonoProcessor closeNotifier = MonoProcessor.create();
+ private final MonoProcessor onClose = MonoProcessor.create();
public ServerDuplexConnectionAcceptor(String name, ConnectionAcceptor acceptor) {
this.address = new LocalSocketAddress(name);
@@ -95,21 +95,22 @@ public void accept(DuplexConnection duplexConnection) {
}
@Override
- public Mono close() {
- return Mono.defer(
- () -> {
- if (!registry.remove(address.getName(), this)) {
- throw new AssertionError();
- }
-
- closeNotifier.onComplete();
- return Mono.empty();
- });
+ public void dispose() {
+ if (!registry.remove(address.getName(), this)) {
+ throw new AssertionError();
+ }
+
+ onClose.onComplete();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
public Mono onClose() {
- return closeNotifier;
+ return onClose;
}
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java
index bed01967b..22d27d17f 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java
@@ -63,17 +63,17 @@ public Flux receive() {
}
@Override
- public Mono close() {
- return Mono.fromRunnable(onClose::onComplete);
+ public void dispose() {
+ onClose.onComplete();
}
@Override
- public Mono onClose() {
- return onClose;
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
- public double availability() {
- return onClose.isTerminated() ? 0.0 : 1.0;
+ public Mono onClose() {
+ return onClose;
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
index 838fadf93..e17567bec 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java
@@ -86,17 +86,17 @@ public Flux receive() {
}
@Override
- public Mono close() {
- return Mono.fromRunnable(onClose::onComplete);
+ public void dispose() {
+ onClose.onComplete();
}
@Override
- public Mono onClose() {
- return onClose;
+ public boolean isDisposed() {
+ return onClose.isDisposed();
}
@Override
- public double availability() {
- return onClose.isTerminated() ? 0.0 : 1.0;
+ public Mono onClose() {
+ return onClose;
}
}
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/NettyContextCloseable.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/NettyContextCloseable.java
index 620dd4c7a..0fb1b0a83 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/NettyContextCloseable.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/NettyContextCloseable.java
@@ -19,7 +19,6 @@
import io.rsocket.Closeable;
import java.net.InetSocketAddress;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoProcessor;
import reactor.ipc.netty.NettyContext;
/**
@@ -28,27 +27,23 @@
public class NettyContextCloseable implements Closeable {
private NettyContext context;
- private MonoProcessor onClose;
-
NettyContextCloseable(NettyContext context) {
- this.onClose = MonoProcessor.create();
this.context = context;
}
@Override
- public Mono close() {
- return Mono.empty()
- .doFinally(
- s -> {
- context.dispose();
- onClose.onComplete();
- })
- .then();
+ public void dispose() {
+ context.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return context.isDisposed();
}
@Override
public Mono onClose() {
- return onClose;
+ return context.onClose();
}
/**
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java
index 8d5db9bcf..277cb5f8b 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java
@@ -3,35 +3,37 @@
import io.rsocket.Closeable;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.WebsocketDuplexConnection;
-import io.rsocket.util.CloseableAdapter;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
+import reactor.ipc.netty.http.server.HttpServer;
import reactor.ipc.netty.http.server.HttpServerRoutes;
import reactor.ipc.netty.http.websocket.WebsocketInbound;
import reactor.ipc.netty.http.websocket.WebsocketOutbound;
public class WebsocketRouteTransport implements ServerTransport {
- private HttpServerRoutes routes;
+ private HttpServer server;
+ private Consumer super HttpServerRoutes> routesBuilder;
private String path;
- public WebsocketRouteTransport(HttpServerRoutes routes, String path) {
- this.routes = routes;
+ public WebsocketRouteTransport(HttpServer server,
+ Consumer super HttpServerRoutes> routesBuilder,
+ String path) {
+ this.server = server;
+ this.routesBuilder = routesBuilder;
this.path = path;
}
@Override
public Mono start(ConnectionAcceptor acceptor) {
- return Mono.defer(
- () -> {
+ return server
+ .newRouter(routes -> {
+ routesBuilder.accept(routes);
routes.ws(path, newHandler(acceptor));
-
- return Mono.just(
- new CloseableAdapter(
- () -> {
- // TODO close route somehow
- }));
- });
+ })
+ .map(NettyContextCloseable::new);
}
public static BiFunction> newHandler(
diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java
index c208a3ee5..919223359 100644
--- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java
+++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java
@@ -48,7 +48,7 @@ public static WebsocketServerTransport create(HttpServer server) {
}
@Override
- public Mono start(ServerTransport.ConnectionAcceptor acceptor) {
+ public Mono start(ConnectionAcceptor acceptor) {
return server
.newHandler(
(request, response) -> {