Skip to content

Commit

Permalink
ensures RSocketRequester awaits proper termination of connection and …
Browse files Browse the repository at this point in the history
…responder side

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
  • Loading branch information
OlegDokuka authored and OlegDokuka committed Apr 6, 2023
1 parent 8959385 commit 50ec5be
Show file tree
Hide file tree
Showing 28 changed files with 297 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public ClientServerInputMultiplexer(
this.source = source;
this.isClient = isClient;

this.serverReceiver = new InternalDuplexConnection(this, source);
this.clientReceiver = new InternalDuplexConnection(this, source);
this.serverReceiver = new InternalDuplexConnection(Type.SERVER, this, source);
this.clientReceiver = new InternalDuplexConnection(Type.CLIENT, this, source);
this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
}
Expand Down Expand Up @@ -195,8 +195,33 @@ int incrementAndGetCheckingState() {
}
}

@Override
public String toString() {
return "ClientServerInputMultiplexer{"
+ "serverReceiver="
+ serverReceiver
+ ", clientReceiver="
+ clientReceiver
+ ", serverConnection="
+ serverConnection
+ ", clientConnection="
+ clientConnection
+ ", source="
+ source
+ ", isClient="
+ isClient
+ ", s="
+ s
+ ", t="
+ t
+ ", state="
+ state
+ '}';
}

private static class InternalDuplexConnection extends Flux<ByteBuf>
implements Subscription, DuplexConnection {
private final Type type;
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
private final DuplexConnection source;

Expand All @@ -207,7 +232,10 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
CoreSubscriber<? super ByteBuf> actual;

public InternalDuplexConnection(
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
Type type,
ClientServerInputMultiplexer clientServerInputMultiplexer,
DuplexConnection source) {
this.type = type;
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
this.source = source;
}
Expand Down Expand Up @@ -304,5 +332,17 @@ public Mono<Void> onClose() {
public double availability() {
return source.availability();
}

@Override
public String toString() {
return "InternalDuplexConnection{"
+ "type="
+ type
+ ", source="
+ source
+ ", state="
+ state
+ '}';
}
}
}
19 changes: 17 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;
Expand Down Expand Up @@ -655,6 +656,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
requesterLeaseTracker = null;
}

final Sinks.Empty<Void> requesterOnAllClosedSink =
Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnAllClosedSink =
Sinks.unsafe().empty();

RSocket rSocketRequester =
new RSocketRequester(
multiplexer.asClientConnection(),
Expand All @@ -667,7 +673,15 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(int) keepAliveMaxLifeTime.toMillis(),
keepAliveHandler,
interceptors::initRequesterRequestInterceptor,
requesterLeaseTracker);
requesterLeaseTracker,
requesterOnAllClosedSink,
Mono.whenDelayError(
responderOnAllClosedSink
.asMono()
.log("client-responder"),
requesterOnAllClosedSink
.asMono()
.log("client-requester")));

RSocket wrappedRSocketRequester =
interceptors.initRequester(rSocketRequester);
Expand Down Expand Up @@ -715,7 +729,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(RequestInterceptor)
leases.sender)
: interceptors
::initResponderRequestInterceptor);
::initResponderRequestInterceptor,
responderOnAllClosedSink);

return wrappedRSocketRequester;
})
Expand Down
63 changes: 52 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
RSocketRequester.class, Throwable.class, "terminationError");

@Nullable private final RequesterLeaseTracker requesterLeaseTracker;

private final Sinks.Empty<Void> onThisSideClosedSink;
private final Mono<Void> onAllClosed;
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
private final Sinks.Empty<Void> onClose;

RSocketRequester(
DuplexConnection connection,
Expand All @@ -80,7 +82,9 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
int keepAliveAckTimeout,
@Nullable KeepAliveHandler keepAliveHandler,
Function<RSocket, RequestInterceptor> requestInterceptorFunction,
@Nullable RequesterLeaseTracker requesterLeaseTracker) {
@Nullable RequesterLeaseTracker requesterLeaseTracker,
Sinks.Empty<Void> onThisSideClosedSink,
Mono<Void> onAllClosed) {
super(
mtu,
maxFrameLength,
Expand All @@ -91,10 +95,11 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
requestInterceptorFunction);

this.requesterLeaseTracker = requesterLeaseTracker;
this.onClose = Sinks.empty();
this.onThisSideClosedSink = onThisSideClosedSink;
this.onAllClosed = onAllClosed;

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryShutdown);
connection.onClose().subscribe(null, this::tryShutdown, this::tryShutdown);

connection.receive().subscribe(this::handleIncomingFrames, e -> {});

Expand Down Expand Up @@ -188,7 +193,11 @@ public double availability() {

@Override
public void dispose() {
tryShutdown();
if (terminationError != null) {
return;
}

getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed"));
}

@Override
Expand All @@ -198,7 +207,7 @@ public boolean isDisposed() {

@Override
public Mono<Void> onClose() {
return onClose.asMono();
return onAllClosed;
}

private void handleIncomingFrames(ByteBuf frame) {
Expand Down Expand Up @@ -305,8 +314,25 @@ private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) {
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())));
}

private void tryTerminateOnConnectionError(Throwable e) {
tryTerminate(() -> e);
private void tryShutdown(Throwable e) {
LOGGER.info("trying to close requester " + getDuplexConnection());
if (terminationError == null) {
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
terminate(CLOSED_CHANNEL_EXCEPTION);
} else {
LOGGER.info(
"trying to close requester failed because of "
+ terminationError
+ " "
+ getDuplexConnection());
}
} else {
LOGGER.info(
"trying to close requester failed because of "
+ terminationError
+ " "
+ getDuplexConnection());
}
}

private void tryTerminateOnZeroError(ByteBuf errorFrame) {
Expand All @@ -323,18 +349,31 @@ private void tryTerminate(Supplier<Throwable> errorSupplier) {
}

private void tryShutdown() {
LOGGER.info("trying to close requester " + getDuplexConnection());
if (terminationError == null) {
if (TERMINATION_ERROR.compareAndSet(this, null, CLOSED_CHANNEL_EXCEPTION)) {
terminate(CLOSED_CHANNEL_EXCEPTION);
} else {
LOGGER.info(
"trying to close requester failed because of "
+ terminationError
+ " "
+ getDuplexConnection());
}
} else {
LOGGER.info(
"trying to close requester failed because of "
+ terminationError
+ " "
+ getDuplexConnection());
}
}

private void terminate(Throwable e) {
LOGGER.info("closing requester " + getDuplexConnection());
if (keepAliveFramesAcceptor != null) {
keepAliveFramesAcceptor.dispose();
}
getDuplexConnection().dispose();
final RequestInterceptor requestInterceptor = getRequestInterceptor();
if (requestInterceptor != null) {
requestInterceptor.dispose();
Expand All @@ -361,9 +400,11 @@ private void terminate(Throwable e) {
}

if (e == CLOSED_CHANNEL_EXCEPTION) {
onClose.tryEmitEmpty();
onThisSideClosedSink.tryEmitEmpty();
} else {
onClose.tryEmitError(e);
onThisSideClosedSink.tryEmitError(e);
}

LOGGER.info("requester closed " + getDuplexConnection());
}
}
15 changes: 12 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
Expand All @@ -54,6 +55,7 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

private final RSocket requestHandler;
private final Sinks.Empty<Void> onThisSideClosedSink;

@Nullable private final ResponderLeaseTracker leaseHandler;

Expand All @@ -70,7 +72,8 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
int mtu,
int maxFrameLength,
int maxInboundPayloadSize,
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction) {
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
Sinks.Empty<Void> onThisSideClosedSink) {
super(
mtu,
maxFrameLength,
Expand All @@ -83,19 +86,22 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
this.requestHandler = requestHandler;

this.leaseHandler = leaseHandler;

connection.receive().subscribe(this::handleFrame, e -> {});
this.onThisSideClosedSink = onThisSideClosedSink;

connection
.onClose()
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);

connection.receive().subscribe(this::handleFrame, e -> {});
}

private void tryTerminateOnConnectionError(Throwable e) {
LOGGER.info("Try terminate connection on responder side");
tryTerminate(() -> e);
}

private void tryTerminateOnConnectionClose() {
LOGGER.info("Try terminate connection on responder side");
tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
}

Expand Down Expand Up @@ -169,6 +175,7 @@ public Mono<Void> onClose() {
}

final void doOnDispose() {
LOGGER.info("closing responder " + getDuplexConnection());
cleanUpSendingSubscriptions();

getDuplexConnection().dispose();
Expand All @@ -183,6 +190,8 @@ final void doOnDispose() {
}

requestHandler.dispose();
onThisSideClosedSink.tryEmitEmpty();
LOGGER.info("responder closed " + getDuplexConnection());
}

private void cleanUpSendingSubscriptions() {
Expand Down
12 changes: 10 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
* The main class for starting an RSocket server.
Expand Down Expand Up @@ -437,6 +438,9 @@ private Mono<Void> acceptSetup(
requesterLeaseTracker = null;
}

final Sinks.Empty<Void> requesterOnAllClosedSink = Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnAllClosedSink = Sinks.unsafe().empty();

RSocket rSocketRequester =
new RSocketRequester(
multiplexer.asServerConnection(),
Expand All @@ -449,7 +453,10 @@ private Mono<Void> acceptSetup(
setupPayload.keepAliveMaxLifetime(),
keepAliveHandler,
interceptors::initRequesterRequestInterceptor,
requesterLeaseTracker);
requesterLeaseTracker,
requesterOnAllClosedSink,
Mono.whenDelayError(
responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono()));

RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);

Expand Down Expand Up @@ -481,7 +488,8 @@ private Mono<Void> acceptSetup(
? rSocket ->
interceptors.initResponderRequestInterceptor(
rSocket, (RequestInterceptor) leases.sender)
: interceptors::initResponderRequestInterceptor);
: interceptors::initResponderRequestInterceptor,
responderOnAllClosedSink);
})
.doFinally(signalType -> setupPayload.release())
.then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,9 @@ public void sendErrorAndClose(RSocketErrorException e) {
public ByteBufAllocator alloc() {
return source.alloc();
}

@Override
public String toString() {
return "SetupHandlingDuplexConnection{" + "source=" + source + ", done=" + done + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

public abstract class BaseDuplexConnection implements DuplexConnection {
protected final Sinks.Empty<Void> onClose = Sinks.empty();
protected final UnboundedProcessor sender = new UnboundedProcessor(onClose::tryEmitEmpty);
protected final UnboundedProcessor sender =
new UnboundedProcessor(() -> onClose.tryEmitEmpty(), (__) -> {});

public BaseDuplexConnection() {}

Expand Down
Loading

0 comments on commit 50ec5be

Please sign in to comment.