diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 90d0778c5..a1017e62c 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -16,14 +16,18 @@ package io.rsocket; -import static io.rsocket.util.ExceptionUtil.noStacktrace; - import io.netty.buffer.Unpooled; import io.netty.util.collection.IntObjectHashMap; import io.rsocket.exceptions.ConnectionException; import io.rsocket.exceptions.Exceptions; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.util.PayloadImpl; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.Disposable; +import reactor.core.publisher.*; + +import javax.annotation.Nullable; import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.Collection; @@ -32,11 +36,8 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import javax.annotation.Nullable; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import reactor.core.Disposable; -import reactor.core.publisher.*; + +import static io.rsocket.util.ExceptionUtil.noStacktrace; /** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */ class RSocketClient implements RSocket { @@ -419,17 +420,19 @@ private boolean contains(int streamId) { } protected void cleanup() { - senders.forEach( - (integer, limitableRequestPublisher) -> - cleanUpLimitableRequestPublisher(limitableRequestPublisher)); - - receivers.forEach((integer, subscriber) -> cleanUpSubscriber(subscriber)); - - synchronized (this) { + Collection> subscribers; + Collection publishers; + synchronized (RSocketClient.this) { + subscribers = receivers.values(); + publishers = senders.values(); + senders.clear(); receivers.clear(); } + subscribers.forEach(this::cleanUpSubscriber); + publishers.forEach(this::cleanUpLimitableRequestPublisher); + if (null != keepAliveSendSub) { keepAliveSendSub.dispose(); } @@ -437,11 +440,19 @@ protected void cleanup() { private synchronized void cleanUpLimitableRequestPublisher( LimitableRequestPublisher limitableRequestPublisher) { - limitableRequestPublisher.cancel(); + try { + limitableRequestPublisher.cancel(); + } catch (Throwable t) { + errorConsumer.accept(t); + } } private synchronized void cleanUpSubscriber(Subscriber subscriber) { - subscriber.onError(CLOSED_CHANNEL_EXCEPTION); + try { + subscriber.onError(CLOSED_CHANNEL_EXCEPTION); + } catch (Throwable t) { + errorConsumer.accept(t); + } } private void handleIncomingFrames(Frame frame) {