Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static class Input {

static final ByteBuffer HELLO = ByteBuffer.wrap("HELLO".getBytes(StandardCharsets.UTF_8));

static final Payload HELLO_PAYLOAD = new DefaultPayload(HELLO);
static final Payload HELLO_PAYLOAD = DefaultPayload.create(HELLO);

static final DirectProcessor<Frame> clientReceive = DirectProcessor.create();
static final DirectProcessor<Frame> serverReceive = DirectProcessor.create();
Expand Down Expand Up @@ -121,8 +121,7 @@ public Mono<Void> metadataPush(Payload payload) {
}

@Override
public Mono<Void> close() {
return Mono.empty();
public void dispose() {
}

@Override
Expand All @@ -140,8 +139,13 @@ public Mono<Void> onClose() {
MonoProcessor<Void> onClose = MonoProcessor.create();

@Override
public Mono<Void> close() {
return Mono.empty().doFinally(s -> onClose.onComplete()).then();
public void dispose() {
onClose.onComplete();
}

@Override
public boolean isDisposed() {
return onClose.isDisposed();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public double availability() {
}

@Override
public Mono<Void> close() {
return Mono.empty();
public void dispose() {
}

@Override
Expand Down
16 changes: 7 additions & 9 deletions rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
/**
* An abstract implementation of {@link RSocket}. All request handling methods emit {@link
* UnsupportedOperationException} and hence must be overridden to provide a valid implementation.
*
* <p>{@link #close()} returns a {@code Publisher} that immediately terminates. That same Publisher
* is returned by the {@link #onClose()} method.
*/
public abstract class AbstractRSocket implements RSocket {

Expand Down Expand Up @@ -58,12 +55,13 @@ public Mono<Void> metadataPush(Payload payload) {
}

@Override
public Mono<Void> close() {
return Mono.defer(
() -> {
onClose.onComplete();
return onClose;
});
public void dispose() {
onClose.onComplete();
}

@Override
public boolean isDisposed() {
return onClose.isDisposed();
}

@Override
Expand Down
16 changes: 3 additions & 13 deletions rsocket-core/src/main/java/io/rsocket/Closeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,14 @@

package io.rsocket;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/** */
public interface Closeable {
/**
* Close this {@code RSocket} upon subscribing to the returned {@code Publisher}
*
* <p><em>This method is idempotent and hence can be called as many times at any point with same
* outcome.</em>
*
* @return A {@code Publisher} that triggers the close when subscribed to and that completes when
* this {@code RSocket} close is complete.
*/
Mono<Void> close();

public interface Closeable extends Disposable {
/**
* Returns a {@code Publisher} that completes when this {@code RSocket} is closed. A {@code
* RSocket} can be closed by explicitly calling {@link #close()} or when the underlying transport
* RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying transport
* connection is closed.
*
* @return A {@code Publisher} that completes when this {@code RSocket} close is complete.
Expand Down
5 changes: 5 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ default Mono<Void> sendOne(Frame frame) {
* @return Stream of all {@code Frame}s received.
*/
Flux<Frame> receive();

@Override
default double availability() {
return isDisposed() ? 0.0 : 1.0;
}
}
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/RSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ public interface RSocket extends Availability, Closeable {

@Override
default double availability() {
return 0.0;
return isDisposed() ? 0.0 : 1.0;
}
}
21 changes: 13 additions & 8 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class RSocketClient implements RSocket {
.doOnError(
t -> {
errorConsumer.accept(t);
connection.close().subscribe();
connection.dispose();
})
.subscribe();
}
Expand Down Expand Up @@ -234,8 +234,13 @@ public double availability() {
}

@Override
public Mono<Void> close() {
return connection.close();
public void dispose() {
connection.dispose();
}

@Override
public boolean isDisposed() {
return connection.isDisposed();
}

@Override
Expand All @@ -260,25 +265,25 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
return receiver
.doOnRequest(
l -> {
if (first.compareAndSet(false, true) && !receiver.isTerminated()) {
if (first.compareAndSet(false, true) && !receiver.isDisposed()) {
final Frame requestFrame =
Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, l);
payload.release();
sendProcessor.onNext(requestFrame);
} else if (contains(streamId) && !receiver.isTerminated()) {
} else if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.RequestN.from(streamId, l));
}
sendProcessor.drain();
})
.doOnError(
t -> {
if (contains(streamId) && !receiver.isTerminated()) {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.Error.from(streamId, t));
}
})
.doOnCancel(
() -> {
if (contains(streamId) && !receiver.isTerminated()) {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.Cancel.from(streamId));
}
})
Expand Down Expand Up @@ -326,7 +331,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request, FrameType requestType
boolean firstRequest = true;

boolean isValidToSendFrame() {
return contains(streamId) && !receiver.isTerminated();
return contains(streamId) && !receiver.isDisposed();
}

void sendOneFrame(Frame frame) {
Expand Down
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private Mono<? extends Void> processSetupFrame(
return multiplexer
.asStreamZeroConnection()
.sendOne(Frame.Error.from(0, error))
.then(multiplexer.close());
.doFinally(signalType -> multiplexer.dispose());
}

ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
Expand Down
11 changes: 8 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,13 @@ public Mono<Void> metadataPush(Payload payload) {
}

@Override
public Mono<Void> close() {
return connection.close();
public void dispose() {
connection.dispose();
}

@Override
public boolean isDisposed() {
return connection.isDisposed();
}

@Override
Expand All @@ -213,7 +218,7 @@ private void cleanup() {
cleanUpSendingSubscriptions();
cleanUpChannelProcessors();

requestHandler.close().subscribe();
requestHandler.dispose();
}

private synchronized void cleanUpSendingSubscriptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,13 @@ public Flux<Frame> receive() {
}

@Override
public Mono<Void> close() {
return source.close();
public void dispose() {
source.dispose();
}

private synchronized FrameReassembler getFrameReassembler(Frame frame) {
return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame));
}

private synchronized FrameReassembler removeFrameReassembler(int streamId) {
return frameReassemblers.remove(streamId);
}

private synchronized boolean frameReassemblersContain(int streamId) {
return frameReassemblers.containsKey(streamId);
@Override
public boolean isDisposed() {
return source.isDisposed();
}

@Override
Expand All @@ -114,4 +107,16 @@ public Mono<Void> onClose() {
}
});
}

private synchronized FrameReassembler getFrameReassembler(Frame frame) {
return frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame));
}

private synchronized FrameReassembler removeFrameReassembler(int streamId) {
return frameReassemblers.remove(streamId);
}

private synchronized boolean frameReassemblersContain(int streamId) {
return frameReassemblers.containsKey(streamId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.rsocket.internal;

import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.FrameType;
Expand All @@ -41,7 +42,7 @@
* even. Even IDs are for the streams initiated by server and odds are for streams initiated by the
* client.
*/
public class ClientServerInputMultiplexer {
public class ClientServerInputMultiplexer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");

private final DuplexConnection streamZeroConnection;
Expand Down Expand Up @@ -112,8 +113,19 @@ public DuplexConnection asStreamZeroConnection() {
return streamZeroConnection;
}

public Mono<Void> close() {
return source.close();
@Override
public void dispose() {
source.dispose();
}

@Override
public boolean isDisposed() {
return source.isDisposed();
}

@Override
public Mono<Void> onClose() {
return source.onClose();
}

private static class InternalDuplexConnection implements DuplexConnection {
Expand Down Expand Up @@ -158,8 +170,13 @@ public Flux<Frame> receive() {
}

@Override
public Mono<Void> close() {
return source.close();
public void dispose() {
source.dispose();
}

@Override
public boolean isDisposed() {
return source.isDisposed();
}

@Override
Expand Down
29 changes: 0 additions & 29 deletions rsocket-core/src/main/java/io/rsocket/util/CloseableAdapter.java

This file was deleted.

9 changes: 7 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ public double availability() {
}

@Override
public Mono<Void> close() {
return source.close();
public void dispose() {
source.dispose();
}

@Override
public boolean isDisposed() {
return source.isDisposed();
}

@Override
Expand Down
Loading