From aa05464e939db58b6a3a5cf4472c5d39309ad140 Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 31 May 2016 08:34:53 -0700 Subject: [PATCH 1/3] Introduce Connector and availability for Connection Problem The DuplexConnection class doesn't expose any way to probe its state, this is somewhat problematic and doesn't fit well in the current model where the availability is the composable way of chaining state. The ReactiveSocketFactory create a ReactiveSocket from an `address` (currently SocketAddress). There are places in the code where the concept of address is not needed and it leads to implicit dependency on it. Solution Add a `double availability()` method to the DuplexConnection interface, most implementation of this method will be straightforward (just returning 0.0 if the underlying resource is unavailable, 1.0 otherwise). Split the concept of ReactiveSocketFactory in two, the Factory and the Connector. The Connector is responsible for creating the ReactiveSocket from the address, and the factory can create a ReactiveSocket without any argument. --- .../reactivesocket/DefaultReactiveSocket.java | 4 + .../io/reactivesocket/DuplexConnection.java | 6 + .../io/reactivesocket/ReactiveSocket.java | 4 +- .../reactivesocket/ReactiveSocketFactory.java | 107 +++--------------- .../io/reactivesocket/internal/Requester.java | 5 +- .../perfutil/PerfTestConnection.java | 7 +- .../io/reactivesocket/TestConnection.java | 5 + 7 files changed, 42 insertions(+), 96 deletions(-) diff --git a/src/main/java/io/reactivesocket/DefaultReactiveSocket.java b/src/main/java/io/reactivesocket/DefaultReactiveSocket.java index 73e90d20f..1ea884980 100644 --- a/src/main/java/io/reactivesocket/DefaultReactiveSocket.java +++ b/src/main/java/io/reactivesocket/DefaultReactiveSocket.java @@ -440,6 +440,10 @@ public void addOutput(Frame f, Completable callback) { connection.addOutput(f, callback); } + @Override + public double availability() { + return connection.availability(); + } }; @Override diff --git a/src/main/java/io/reactivesocket/DuplexConnection.java b/src/main/java/io/reactivesocket/DuplexConnection.java index 2a6c3f888..905e8fcd3 100644 --- a/src/main/java/io/reactivesocket/DuplexConnection.java +++ b/src/main/java/io/reactivesocket/DuplexConnection.java @@ -38,4 +38,10 @@ default void addOutput(Frame frame, Completable callback) { s.onComplete(); }, callback); } + + /** + * @return the availability of the underlying connection, a number in [0.0, 1.0] + * (higher is better). + */ + double availability(); } diff --git a/src/main/java/io/reactivesocket/ReactiveSocket.java b/src/main/java/io/reactivesocket/ReactiveSocket.java index c862b0277..be1a6883d 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -26,10 +26,10 @@ * Interface for a connection that supports sending requests and receiving responses */ public interface ReactiveSocket extends AutoCloseable { - Publisher requestResponse(final Payload payload); - Publisher fireAndForget(final Payload payload); + Publisher requestResponse(final Payload payload); + Publisher requestStream(final Payload payload); Publisher requestSubscription(final Payload payload); diff --git a/src/main/java/io/reactivesocket/ReactiveSocketFactory.java b/src/main/java/io/reactivesocket/ReactiveSocketFactory.java index 11d67f65c..45176b411 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocketFactory.java +++ b/src/main/java/io/reactivesocket/ReactiveSocketFactory.java @@ -15,102 +15,29 @@ */ package io.reactivesocket; -import io.reactivesocket.internal.rx.EmptySubscription; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.NoSuchElementException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -@FunctionalInterface -public interface ReactiveSocketFactory { - - Publisher call(T t); +import java.net.SocketAddress; +/** + * Factory of ReactiveSocket interface + * This abstraction is useful for abstracting the creation of a ReactiveSocket + * (e.g. inside the LoadBalancer which create ReactiveSocket as needed) + */ +public interface ReactiveSocketFactory { /** - * Gets a socket in a blocking manner - * @param t configuration to create the reactive socket - * @return blocks on create the socket + * Construct the ReactiveSocket + * @return */ - default R callAndWait(T t) { - CompletableFuture future = new CompletableFuture<>(); - - call(t) - .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } - - @Override - public void onNext(R reactiveSocket) { - future.complete(reactiveSocket); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onComplete() { - future.completeExceptionally(new NoSuchElementException("Sequence contains no elements")); - } - }); - - return future.join(); - } + Publisher apply(); /** - * - * @param t the configuration used to create the reactive socket - * @param timeout timeout - * @param timeUnit timeout units - * @param executorService ScheduledExecutorService to schedule the timeout on - * @return + * @return a positive numbers representing the availability of the factory. + * Higher is better, 0.0 means not available */ - default Publisher call(T t, long timeout, TimeUnit timeUnit, ScheduledExecutorService executorService) { - Publisher reactiveSocketPublisher = subscriber -> { - AtomicBoolean complete = new AtomicBoolean(); - subscriber.onSubscribe(EmptySubscription.INSTANCE); - call(t) - .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } - - @Override - public void onNext(R reactiveSocket) { - subscriber.onNext(reactiveSocket); - } - - @Override - public void onError(Throwable t) { - subscriber.onError(t); - } - - @Override - public void onComplete() { - if (complete.compareAndSet(false, true)) { - subscriber.onComplete(); - } - } - }); - - executorService.schedule(() -> { - if (complete.compareAndSet(false, true)) { - subscriber.onError(new TimeoutException()); - } - }, timeout, timeUnit); - }; - - return reactiveSocketPublisher; - } + double availability(); + /** + * @return an identifier of the remote location + */ + T remote(); } diff --git a/src/main/java/io/reactivesocket/internal/Requester.java b/src/main/java/io/reactivesocket/internal/Requester.java index 586fff298..284d3b7a4 100644 --- a/src/main/java/io/reactivesocket/internal/Requester.java +++ b/src/main/java/io/reactivesocket/internal/Requester.java @@ -286,14 +286,14 @@ private void assertStarted() { */ public double availability() { if (!honorLease) { - return 1.0; + return connection.availability(); } final long now = System.currentTimeMillis(); double available = 0.0; if (numberOfRemainingRequests > 0 && (now < ttlExpiration)) { available = 1.0; } - return available; + return available * connection.availability(); } /* @@ -873,6 +873,7 @@ public void error(Throwable e) { Publisher keepaliveTicker = PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS); + connection.addOutput(keepaliveTicker, new Completable() { public void success() {} diff --git a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java index 1d05f2eec..7f5747417 100644 --- a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java +++ b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java @@ -64,6 +64,11 @@ public void addOutput(Frame f, Completable callback) { callback.success(); } + @Override + public double availability() { + return 1.0; + } + @Override public Observable getInput() { return toInput; @@ -72,11 +77,9 @@ public Observable getInput() { public void connectToServerConnection(PerfTestConnection serverConnection) { writeSubject.subscribe(serverConnection.toInput); serverConnection.writeSubject.subscribe(toInput); - } @Override public void close() throws IOException { - } } \ No newline at end of file diff --git a/src/test/java/io/reactivesocket/TestConnection.java b/src/test/java/io/reactivesocket/TestConnection.java index 9f1c4e7b5..125b45280 100644 --- a/src/test/java/io/reactivesocket/TestConnection.java +++ b/src/test/java/io/reactivesocket/TestConnection.java @@ -48,6 +48,11 @@ public void addOutput(Frame f, Completable callback) { callback.success(); } + @Override + public double availability() { + return 1.0; + } + @Override public io.reactivesocket.rx.Observable getInput() { return new io.reactivesocket.rx.Observable() { From 28cc17e0c5735c3a37e901ce6cabe7525a68b172 Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 31 May 2016 09:46:55 -0700 Subject: [PATCH 2/3] Add the missing Connector class + the helper ReactiveSocketProxy --- .../ReactiveSocketConnector.java | 76 +++++++++ .../util/ReactiveSocketProxy.java | 150 ++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 src/main/java/io/reactivesocket/ReactiveSocketConnector.java create mode 100644 src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java diff --git a/src/main/java/io/reactivesocket/ReactiveSocketConnector.java b/src/main/java/io/reactivesocket/ReactiveSocketConnector.java new file mode 100644 index 000000000..aa101bef6 --- /dev/null +++ b/src/main/java/io/reactivesocket/ReactiveSocketConnector.java @@ -0,0 +1,76 @@ +package io.reactivesocket; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.function.Function; + +@FunctionalInterface +public interface ReactiveSocketConnector { + /** + * Asynchronously connect and construct a ReactiveSocket + * @return a Publisher that will return the ReactiveSocket + */ + Publisher connect(T address); + + /** + * Transform the ReactiveSocket returned by the connector via the provided function `func` + * @param func the transformative function + * @return a new ReactiveSocketConnector + */ + default ReactiveSocketConnector chain(Function func) { + return new ReactiveSocketConnector() { + @Override + public Publisher connect(T address) { + return subscriber -> + ReactiveSocketConnector.this.connect(address).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + } + + @Override + public void onNext(ReactiveSocket reactiveSocket) { + ReactiveSocket socket = func.apply(reactiveSocket); + subscriber.onNext(socket); + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }); + } + }; + } + + /** + * Create a ReactiveSocketFactory from a ReactiveSocketConnector + * @param address the address to connect the connector to + * @return the factory + */ + default ReactiveSocketFactory toFactory(T address) { + return new ReactiveSocketFactory() { + @Override + public Publisher apply() { + return ReactiveSocketConnector.this.connect(address); + } + + @Override + public double availability() { + return 1.0; + } + + @Override + public T remote() { + return address; + } + }; + } +} diff --git a/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java b/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java new file mode 100644 index 000000000..168fa7081 --- /dev/null +++ b/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java @@ -0,0 +1,150 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.util; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.rx.Completable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import java.util.function.Consumer; +import java.util.function.Function; + + +/** + * Wrapper/Proxy for a ReactiveSocket. + * This is useful when we want to override a specific method. + */ +public class ReactiveSocketProxy implements ReactiveSocket { + protected final ReactiveSocket child; + private final Function, Subscriber> subscriberWrapper; + + public ReactiveSocketProxy(ReactiveSocket child, Function, Subscriber> subscriberWrapper) { + this.child = child; + this.subscriberWrapper = subscriberWrapper; + } + + public ReactiveSocketProxy(ReactiveSocket child) { + this(child, null); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return child.fireAndForget(payload); + } + + @Override + public Publisher requestResponse(Payload payload) { + if (subscriberWrapper == null) { + return child.requestResponse(payload); + } else { + return s -> { + Subscriber subscriber = subscriberWrapper.apply(s); + child.requestResponse(payload).subscribe(subscriber); + }; + } + } + + @Override + public Publisher requestStream(Payload payload) { + if (subscriberWrapper == null) { + return child.requestStream(payload); + } else { + return s -> { + Subscriber subscriber = subscriberWrapper.apply(s); + child.requestStream(payload).subscribe(subscriber); + }; + } + + } + + @Override + public Publisher requestSubscription(Payload payload) { + if (subscriberWrapper == null) { + return child.requestSubscription(payload); + } else { + return s -> { + Subscriber subscriber = subscriberWrapper.apply(s); + child.requestSubscription(payload).subscribe(subscriber); + }; + } + + } + + @Override + public Publisher requestChannel(Publisher payloads) { + if (subscriberWrapper == null) { + return child.requestChannel(payloads); + } else { + return s -> { + Subscriber subscriber = subscriberWrapper.apply(s); + child.requestChannel(payloads).subscribe(subscriber); + }; + } + + } + + @Override + public Publisher metadataPush(Payload payload) { + return child.metadataPush(payload); + } + + @Override + public double availability() { + return child.availability(); + } + + @Override + public void start(Completable c) { + child.start(c); + } + + @Override + public void onRequestReady(Consumer c) { + child.onRequestReady(c); + } + + @Override + public void onRequestReady(Completable c) { + child.onRequestReady(c); + } + + @Override + public void onShutdown(Completable c) { + child.onShutdown(c); + } + + @Override + public void sendLease(int ttl, int numberOfRequests) { + child.sendLease(ttl, numberOfRequests); + } + + @Override + public void shutdown() { + child.shutdown(); + } + + @Override + public void close() throws Exception { + child.close(); + } + + @Override + public String toString() { + return "ReactiveSocketProxy(" + child.toString() + ")"; + } +} \ No newline at end of file From a7cb3d274637be065ee0cfcc9cf7418dcf1699ee Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Fri, 3 Jun 2016 17:36:50 -0700 Subject: [PATCH 3/3] WIP --- src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java b/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java index 168fa7081..dfa2fdc59 100644 --- a/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java +++ b/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java @@ -69,7 +69,6 @@ public Publisher requestStream(Payload payload) { child.requestStream(payload).subscribe(subscriber); }; } - } @Override @@ -82,7 +81,6 @@ public Publisher requestSubscription(Payload payload) { child.requestSubscription(payload).subscribe(subscriber); }; } - } @Override @@ -95,7 +93,6 @@ public Publisher requestChannel(Publisher payloads) { child.requestChannel(payloads).subscribe(subscriber); }; } - } @Override