From 227a4c86c921271251fc17dc75be28a171905cea Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Thu, 30 Nov 2017 18:03:05 -0800 Subject: [PATCH] clear connections map after streams are cleaned up to signal subscribersa --- .../main/java/io/rsocket/RSocketClient.java | 30 ++++++++------- .../src/test/java/io/rsocket/RSocketTest.java | 37 +++++++++++++++---- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 1e6219398..4713c52d1 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -432,21 +432,25 @@ private boolean contains(int streamId) { } protected void cleanup() { - Collection> subscribers; - Collection publishers; - synchronized (RSocketClient.this) { - subscribers = receivers.values(); - publishers = senders.values(); - - senders.clear(); - receivers.clear(); - } + try { + Collection> subscribers; + Collection publishers; + synchronized (RSocketClient.this) { + subscribers = receivers.values(); + publishers = senders.values(); + } - subscribers.forEach(this::cleanUpSubscriber); - publishers.forEach(this::cleanUpLimitableRequestPublisher); + subscribers.forEach(this::cleanUpSubscriber); + publishers.forEach(this::cleanUpLimitableRequestPublisher); - if (null != keepAliveSendSub) { - keepAliveSendSub.dispose(); + if (null != keepAliveSendSub) { + keepAliveSendSub.dispose(); + } + } finally { + synchronized (this) { + senders.clear(); + receivers.clear(); + } } } diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketTest.java index 49a996df8..1e4b168f8 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketTest.java @@ -16,18 +16,11 @@ package io.rsocket; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; - import io.rsocket.exceptions.ApplicationException; import io.rsocket.test.util.LocalDuplexConnection; import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Rule; @@ -41,6 +34,15 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + public class RSocketTest { @Rule public final SocketRule rule = new SocketRule(); @@ -85,6 +87,22 @@ public void testChannel() throws Exception { latch.await(); } + + @Test(timeout = 2_000L) + public void testCleanup() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + rule.crs + .requestStream(DefaultPayload.create("hi")) + .doOnError(t -> { + Assert.assertTrue(t instanceof ClosedChannelException); + latch.countDown(); + }) + .subscribe(); + + rule.crs.cleanup(); + + latch.await(); + } public static class SocketRule extends ExternalResource { @@ -124,6 +142,11 @@ protected void init() { public Mono requestResponse(Payload payload) { return Mono.just(payload); } + + @Override + public Flux requestStream(Payload payload) { + return Flux.never(); + } @Override public Flux requestChannel(Publisher payloads) {