diff --git a/src/main/java/io/reactivesocket/DefaultReactiveSocket.java b/src/main/java/io/reactivesocket/DefaultReactiveSocket.java index 73e90d20f..1ea884980 100644 --- a/src/main/java/io/reactivesocket/DefaultReactiveSocket.java +++ b/src/main/java/io/reactivesocket/DefaultReactiveSocket.java @@ -440,6 +440,10 @@ public void addOutput(Frame f, Completable callback) { connection.addOutput(f, callback); } + @Override + public double availability() { + return connection.availability(); + } }; @Override diff --git a/src/main/java/io/reactivesocket/DuplexConnection.java b/src/main/java/io/reactivesocket/DuplexConnection.java index 2a6c3f888..905e8fcd3 100644 --- a/src/main/java/io/reactivesocket/DuplexConnection.java +++ b/src/main/java/io/reactivesocket/DuplexConnection.java @@ -38,4 +38,10 @@ default void addOutput(Frame frame, Completable callback) { s.onComplete(); }, callback); } + + /** + * @return the availability of the underlying connection, a number in [0.0, 1.0] + * (higher is better). + */ + double availability(); } diff --git a/src/main/java/io/reactivesocket/ReactiveSocket.java b/src/main/java/io/reactivesocket/ReactiveSocket.java index c862b0277..be1a6883d 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -26,10 +26,10 @@ * Interface for a connection that supports sending requests and receiving responses */ public interface ReactiveSocket extends AutoCloseable { - Publisher requestResponse(final Payload payload); - Publisher fireAndForget(final Payload payload); + Publisher requestResponse(final Payload payload); + Publisher requestStream(final Payload payload); Publisher requestSubscription(final Payload payload); diff --git a/src/main/java/io/reactivesocket/ReactiveSocketConnector.java b/src/main/java/io/reactivesocket/ReactiveSocketConnector.java new file mode 100644 index 000000000..aa101bef6 --- /dev/null +++ b/src/main/java/io/reactivesocket/ReactiveSocketConnector.java @@ -0,0 +1,76 @@ +package io.reactivesocket; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.function.Function; + +@FunctionalInterface +public interface ReactiveSocketConnector { + /** + * Asynchronously connect and construct a ReactiveSocket + * @return a Publisher that will return the ReactiveSocket + */ + Publisher connect(T address); + + /** + * Transform the ReactiveSocket returned by the connector via the provided function `func` + * @param func the transformative function + * @return a new ReactiveSocketConnector + */ + default ReactiveSocketConnector chain(Function func) { + return new ReactiveSocketConnector() { + @Override + public Publisher connect(T address) { + return subscriber -> + ReactiveSocketConnector.this.connect(address).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + } + + @Override + public void onNext(ReactiveSocket reactiveSocket) { + ReactiveSocket socket = func.apply(reactiveSocket); + subscriber.onNext(socket); + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }); + } + }; + } + + /** + * Create a ReactiveSocketFactory from a ReactiveSocketConnector + * @param address the address to connect the connector to + * @return the factory + */ + default ReactiveSocketFactory toFactory(T address) { + return new ReactiveSocketFactory() { + @Override + public Publisher apply() { + return ReactiveSocketConnector.this.connect(address); + } + + @Override + public double availability() { + return 1.0; + } + + @Override + public T remote() { + return address; + } + }; + } +} diff --git a/src/main/java/io/reactivesocket/ReactiveSocketFactory.java b/src/main/java/io/reactivesocket/ReactiveSocketFactory.java index 11d67f65c..45176b411 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocketFactory.java +++ b/src/main/java/io/reactivesocket/ReactiveSocketFactory.java @@ -15,102 +15,29 @@ */ package io.reactivesocket; -import io.reactivesocket.internal.rx.EmptySubscription; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.NoSuchElementException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -@FunctionalInterface -public interface ReactiveSocketFactory { - - Publisher call(T t); +import java.net.SocketAddress; +/** + * Factory of ReactiveSocket interface + * This abstraction is useful for abstracting the creation of a ReactiveSocket + * (e.g. inside the LoadBalancer which create ReactiveSocket as needed) + */ +public interface ReactiveSocketFactory { /** - * Gets a socket in a blocking manner - * @param t configuration to create the reactive socket - * @return blocks on create the socket + * Construct the ReactiveSocket + * @return */ - default R callAndWait(T t) { - CompletableFuture future = new CompletableFuture<>(); - - call(t) - .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } - - @Override - public void onNext(R reactiveSocket) { - future.complete(reactiveSocket); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onComplete() { - future.completeExceptionally(new NoSuchElementException("Sequence contains no elements")); - } - }); - - return future.join(); - } + Publisher apply(); /** - * - * @param t the configuration used to create the reactive socket - * @param timeout timeout - * @param timeUnit timeout units - * @param executorService ScheduledExecutorService to schedule the timeout on - * @return + * @return a positive numbers representing the availability of the factory. + * Higher is better, 0.0 means not available */ - default Publisher call(T t, long timeout, TimeUnit timeUnit, ScheduledExecutorService executorService) { - Publisher reactiveSocketPublisher = subscriber -> { - AtomicBoolean complete = new AtomicBoolean(); - subscriber.onSubscribe(EmptySubscription.INSTANCE); - call(t) - .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } - - @Override - public void onNext(R reactiveSocket) { - subscriber.onNext(reactiveSocket); - } - - @Override - public void onError(Throwable t) { - subscriber.onError(t); - } - - @Override - public void onComplete() { - if (complete.compareAndSet(false, true)) { - subscriber.onComplete(); - } - } - }); - - executorService.schedule(() -> { - if (complete.compareAndSet(false, true)) { - subscriber.onError(new TimeoutException()); - } - }, timeout, timeUnit); - }; - - return reactiveSocketPublisher; - } + double availability(); + /** + * @return an identifier of the remote location + */ + T remote(); } diff --git a/src/main/java/io/reactivesocket/internal/Requester.java b/src/main/java/io/reactivesocket/internal/Requester.java index 586fff298..284d3b7a4 100644 --- a/src/main/java/io/reactivesocket/internal/Requester.java +++ b/src/main/java/io/reactivesocket/internal/Requester.java @@ -286,14 +286,14 @@ private void assertStarted() { */ public double availability() { if (!honorLease) { - return 1.0; + return connection.availability(); } final long now = System.currentTimeMillis(); double available = 0.0; if (numberOfRemainingRequests > 0 && (now < ttlExpiration)) { available = 1.0; } - return available; + return available * connection.availability(); } /* @@ -873,6 +873,7 @@ public void error(Throwable e) { Publisher keepaliveTicker = PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS); + connection.addOutput(keepaliveTicker, new Completable() { public void success() {} diff --git a/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java b/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java new file mode 100644 index 000000000..dfa2fdc59 --- /dev/null +++ b/src/main/java/io/reactivesocket/util/ReactiveSocketProxy.java @@ -0,0 +1,147 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.util; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.rx.Completable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import java.util.function.Consumer; +import java.util.function.Function; + + +/** + * Wrapper/Proxy for a ReactiveSocket. + * This is useful when we want to override a specific method. + */ +public class ReactiveSocketProxy implements ReactiveSocket { + protected final ReactiveSocket child; + private final Function, Subscriber> subscriberWrapper; + + public ReactiveSocketProxy(ReactiveSocket child, Function, Subscriber> subscriberWrapper) { + this.child = child; + this.subscriberWrapper = subscriberWrapper; + } + + public ReactiveSocketProxy(ReactiveSocket child) { + this(child, null); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return child.fireAndForget(payload); + } + + @Override + public Publisher requestResponse(Payload payload) { + if (subscriberWrapper == null) { + return child.requestResponse(payload); + } else { + return s -> { + Subscriber subscriber = subscriberWrapper.apply(s); + child.requestResponse(payload).subscribe(subscriber); + }; + } + } + + @Override + public Publisher requestStream(Payload payload) { + if (subscriberWrapper == null) { + return child.requestStream(payload); + } else { + return s -> { + Subscriber subscriber = subscriberWrapper.apply(s); + child.requestStream(payload).subscribe(subscriber); + }; + } + } + + @Override + public Publisher requestSubscription(Payload payload) { + if (subscriberWrapper == null) { + return child.requestSubscription(payload); + } else { + return s -> { + Subscriber subscriber = subscriberWrapper.apply(s); + child.requestSubscription(payload).subscribe(subscriber); + }; + } + } + + @Override + public Publisher requestChannel(Publisher payloads) { + if (subscriberWrapper == null) { + return child.requestChannel(payloads); + } else { + return s -> { + Subscriber subscriber = subscriberWrapper.apply(s); + child.requestChannel(payloads).subscribe(subscriber); + }; + } + } + + @Override + public Publisher metadataPush(Payload payload) { + return child.metadataPush(payload); + } + + @Override + public double availability() { + return child.availability(); + } + + @Override + public void start(Completable c) { + child.start(c); + } + + @Override + public void onRequestReady(Consumer c) { + child.onRequestReady(c); + } + + @Override + public void onRequestReady(Completable c) { + child.onRequestReady(c); + } + + @Override + public void onShutdown(Completable c) { + child.onShutdown(c); + } + + @Override + public void sendLease(int ttl, int numberOfRequests) { + child.sendLease(ttl, numberOfRequests); + } + + @Override + public void shutdown() { + child.shutdown(); + } + + @Override + public void close() throws Exception { + child.close(); + } + + @Override + public String toString() { + return "ReactiveSocketProxy(" + child.toString() + ")"; + } +} \ No newline at end of file diff --git a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java index 1d05f2eec..7f5747417 100644 --- a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java +++ b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java @@ -64,6 +64,11 @@ public void addOutput(Frame f, Completable callback) { callback.success(); } + @Override + public double availability() { + return 1.0; + } + @Override public Observable getInput() { return toInput; @@ -72,11 +77,9 @@ public Observable getInput() { public void connectToServerConnection(PerfTestConnection serverConnection) { writeSubject.subscribe(serverConnection.toInput); serverConnection.writeSubject.subscribe(toInput); - } @Override public void close() throws IOException { - } } \ No newline at end of file diff --git a/src/test/java/io/reactivesocket/TestConnection.java b/src/test/java/io/reactivesocket/TestConnection.java index 9f1c4e7b5..125b45280 100644 --- a/src/test/java/io/reactivesocket/TestConnection.java +++ b/src/test/java/io/reactivesocket/TestConnection.java @@ -48,6 +48,11 @@ public void addOutput(Frame f, Completable callback) { callback.success(); } + @Override + public double availability() { + return 1.0; + } + @Override public io.reactivesocket.rx.Observable getInput() { return new io.reactivesocket.rx.Observable() {