From 379db4457b4125fdbc97f025bc7b342b6046b89d Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Thu, 14 Jul 2016 15:26:37 -0700 Subject: [PATCH 1/2] Loadbalancer: closing doesn't subscribe to the underlying ***Problem*** Closing the loadbalancer doesn't properly subscribe to the `Publisher`s returned by the `close()` methods of the underlying `ReactiveSocket`. Thus, the close event is lost at the LoadBalancer level. ***Solution*** Properly subscribe to the close `Publisher`s and propagate the `onComplete` events when all `ReactiveSocket` are closed. (I choose to ignore any exception that happened during the close, i.e. only propagate 1 `onComplete`). --- .../reactivesocket/client/LoadBalancer.java | 53 +++++-- .../internal/rx/EmptySubscriber.java | 22 +++ .../integration/IntegrationTest.java | 134 ++++++++++++++++++ 3 files changed, 195 insertions(+), 14 deletions(-) create mode 100644 reactivesocket-core/src/main/java/io/reactivesocket/internal/rx/EmptySubscriber.java create mode 100644 reactivesocket-examples/src/test/java/io/reactivesocket/integration/IntegrationTest.java diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java index 8c91a8ce7..ea060c1e1 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java @@ -27,6 +27,8 @@ import io.reactivesocket.internal.EmptySubject; import io.reactivesocket.internal.Publishers; import io.reactivesocket.internal.Publishers; +import io.reactivesocket.internal.rx.EmptySubscriber; +import io.reactivesocket.internal.rx.EmptySubscription; import io.reactivesocket.rx.Completable; import io.reactivesocket.client.stat.FrugalQuantile; import io.reactivesocket.client.stat.Quantile; @@ -42,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -468,22 +471,44 @@ public synchronized String toString() { @Override public Publisher close() { - return s -> { - Publishers.afterTerminate(onClose(), () -> { - synchronized (this) { - factoryRefresher.close(); - activeFactories.clear(); - activeSockets.forEach(rs -> { - try { - rs.close(); - } catch (Exception e) { - logger.warn("Exception while closing a ReactiveSocket", e); + return subscriber -> { + subscriber.onSubscribe(EmptySubscription.INSTANCE); + + synchronized (this) { + factoryRefresher.close(); + activeFactories.clear(); + AtomicInteger n = new AtomicInteger(activeSockets.size()); + + activeSockets.forEach(rs -> { + rs.close().subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Void aVoid) {} + + @Override + public void onError(Throwable t) { + if (n.decrementAndGet() == 0) { + subscriber.onComplete(); + closeSubject.subscribe(EmptySubscriber.INSTANCE); + closeSubject.onComplete(); + } + } + + @Override + public void onComplete() { + if (n.decrementAndGet() == 0) { + subscriber.onComplete(); + closeSubject.subscribe(EmptySubscriber.INSTANCE); + closeSubject.onComplete(); + } } }); - } - }); - closeSubject.subscribe(s); - closeSubject.onComplete(); + }); + } }; } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/rx/EmptySubscriber.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/rx/EmptySubscriber.java new file mode 100644 index 000000000..dbcd61ce5 --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/rx/EmptySubscriber.java @@ -0,0 +1,22 @@ +package io.reactivesocket.internal.rx; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public enum EmptySubscriber implements Subscriber { + INSTANCE(); + + @Override + public void onSubscribe(Subscription s) { + + } + + @Override + public void onNext(Object t) {} + + @Override + public void onError(Throwable t) {} + + @Override + public void onComplete() {} +} diff --git a/reactivesocket-examples/src/test/java/io/reactivesocket/integration/IntegrationTest.java b/reactivesocket-examples/src/test/java/io/reactivesocket/integration/IntegrationTest.java new file mode 100644 index 000000000..671ef698e --- /dev/null +++ b/reactivesocket-examples/src/test/java/io/reactivesocket/integration/IntegrationTest.java @@ -0,0 +1,134 @@ +package io.reactivesocket.integration; + +import io.reactivesocket.*; +import io.reactivesocket.client.ClientBuilder; +import io.reactivesocket.internal.Publishers; +import io.reactivesocket.test.TestUtil; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; +import io.reactivesocket.util.Unsafe; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; +import static rx.RxReactiveStreams.toObservable; + +public class IntegrationTest { + + private interface TestingServer { + int requestCount(); + int disconnectionCount(); + SocketAddress getListeningAddress(); + } + + private TestingServer createServer() { + AtomicInteger requestCounter = new AtomicInteger(); + AtomicInteger disconnectionCounter = new AtomicInteger(); + + ConnectionSetupHandler setupHandler = (setupPayload, reactiveSocket) -> { + reactiveSocket.onClose().subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Void aVoid) {} + + @Override + public void onError(Throwable t) {} + + @Override + public void onComplete() { + disconnectionCounter.incrementAndGet(); + } + }); + return new RequestHandler.Builder() + .withRequestResponse( + payload -> subscriber -> subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + requestCounter.incrementAndGet(); + subscriber.onNext(TestUtil.utf8EncodedPayload("RESPONSE", "NO_META")); + subscriber.onComplete(); + } + + @Override + public void cancel() {} + }) + ) + .build(); + }; + + SocketAddress addr = new InetSocketAddress("127.0.0.1", 0); + TcpReactiveSocketServer.StartedServer server = + TcpReactiveSocketServer.create(addr).start(setupHandler); + + return new TestingServer() { + @Override + public int requestCount() { + return requestCounter.get(); + } + + @Override + public int disconnectionCount() { + return disconnectionCounter.get(); + } + + @Override + public SocketAddress getListeningAddress() { + return server.getServerAddress(); + } + }; + } + + private ReactiveSocket createClient(SocketAddress addr) throws InterruptedException, ExecutionException, TimeoutException { + List addrs = Collections.singletonList(addr); + Publisher> src = Publishers.just(addrs); + + ConnectionSetupPayload setupPayload = + ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.HONOR_LEASE); + TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace); + + Publisher socketPublisher = + ClientBuilder.instance() + .withSource(src) + .withConnector(tcp) + .build(); + + return Unsafe.blockingSingleWait(socketPublisher, 5, TimeUnit.SECONDS); + } + + @Test(timeout = 2_000L) + public void testRequest() throws ExecutionException, InterruptedException, TimeoutException { + TestingServer server = createServer(); + ReactiveSocket client = createClient(server.getListeningAddress()); + + toObservable(client.requestResponse(TestUtil.utf8EncodedPayload("RESPONSE", "NO_META"))) + .toBlocking() + .subscribe(); + assertTrue("Server see the request", server.requestCount() > 0); + } + + @Test(timeout = 2_000L) + public void testClose() throws ExecutionException, InterruptedException, TimeoutException { + TestingServer server = createServer(); + ReactiveSocket client = createClient(server.getListeningAddress()); + + toObservable(client.close()).toBlocking().subscribe(); + + Thread.sleep(100); + assertTrue("Server see disconnection", server.disconnectionCount() > 0); + } +} From 9cefcb4b344a76934b813c46240a55d885cf96ba Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Thu, 14 Jul 2016 23:17:18 -0700 Subject: [PATCH 2/2] Address comments --- .../main/java/io/reactivesocket/client/LoadBalancer.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java index ea060c1e1..a4407209f 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java @@ -491,11 +491,8 @@ public void onNext(Void aVoid) {} @Override public void onError(Throwable t) { - if (n.decrementAndGet() == 0) { - subscriber.onComplete(); - closeSubject.subscribe(EmptySubscriber.INSTANCE); - closeSubject.onComplete(); - } + logger.warn("Exception while closing a ReactiveSocket", t); + onComplete(); } @Override