Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/io/reactivesocket/DefaultReactiveSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ public void addOutput(Frame f, Completable callback) {
connection.addOutput(f, callback);
}

@Override
public double availability() {
return connection.availability();
}
};

@Override
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/reactivesocket/DuplexConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,10 @@ default void addOutput(Frame frame, Completable callback) {
s.onComplete();
}, callback);
}

/**
* @return the availability of the underlying connection, a number in [0.0, 1.0]
* (higher is better).
*/
double availability();
}
4 changes: 2 additions & 2 deletions src/main/java/io/reactivesocket/ReactiveSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
* Interface for a connection that supports sending requests and receiving responses
*/
public interface ReactiveSocket extends AutoCloseable {
Publisher<Payload> requestResponse(final Payload payload);

Publisher<Void> fireAndForget(final Payload payload);

Publisher<Payload> requestResponse(final Payload payload);

Publisher<Payload> requestStream(final Payload payload);

Publisher<Payload> requestSubscription(final Payload payload);
Expand Down
76 changes: 76 additions & 0 deletions src/main/java/io/reactivesocket/ReactiveSocketConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.reactivesocket;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.function.Function;

@FunctionalInterface
public interface ReactiveSocketConnector<T> {
/**
* Asynchronously connect and construct a ReactiveSocket
* @return a Publisher that will return the ReactiveSocket
*/
Publisher<ReactiveSocket> connect(T address);

/**
* Transform the ReactiveSocket returned by the connector via the provided function `func`
* @param func the transformative function
* @return a new ReactiveSocketConnector
*/
default ReactiveSocketConnector<T> chain(Function<ReactiveSocket, ReactiveSocket> func) {
return new ReactiveSocketConnector<T>() {
@Override
public Publisher<ReactiveSocket> connect(T address) {
return subscriber ->
ReactiveSocketConnector.this.connect(address).subscribe(new Subscriber<ReactiveSocket>() {
@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
}

@Override
public void onNext(ReactiveSocket reactiveSocket) {
ReactiveSocket socket = func.apply(reactiveSocket);
subscriber.onNext(socket);
}

@Override
public void onError(Throwable t) {
subscriber.onError(t);
}

@Override
public void onComplete() {
subscriber.onComplete();
}
});
}
};
}

/**
* Create a ReactiveSocketFactory from a ReactiveSocketConnector
* @param address the address to connect the connector to
* @return the factory
*/
default ReactiveSocketFactory<T> toFactory(T address) {
return new ReactiveSocketFactory<T>() {
@Override
public Publisher<ReactiveSocket> apply() {
return ReactiveSocketConnector.this.connect(address);
}

@Override
public double availability() {
return 1.0;
}

@Override
public T remote() {
return address;
}
};
}
}
107 changes: 17 additions & 90 deletions src/main/java/io/reactivesocket/ReactiveSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,102 +15,29 @@
*/
package io.reactivesocket;

import io.reactivesocket.internal.rx.EmptySubscription;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

@FunctionalInterface
public interface ReactiveSocketFactory<T, R extends ReactiveSocket> {

Publisher<R> call(T t);
import java.net.SocketAddress;

/**
* Factory of ReactiveSocket interface
* This abstraction is useful for abstracting the creation of a ReactiveSocket
* (e.g. inside the LoadBalancer which create ReactiveSocket as needed)
*/
public interface ReactiveSocketFactory<T> {
/**
* Gets a socket in a blocking manner
* @param t configuration to create the reactive socket
* @return blocks on create the socket
* Construct the ReactiveSocket
* @return
*/
default R callAndWait(T t) {
CompletableFuture<R> future = new CompletableFuture<>();

call(t)
.subscribe(new Subscriber<R>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}

@Override
public void onNext(R reactiveSocket) {
future.complete(reactiveSocket);
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}

@Override
public void onComplete() {
future.completeExceptionally(new NoSuchElementException("Sequence contains no elements"));
}
});

return future.join();
}
Publisher<ReactiveSocket> apply();

/**
*
* @param t the configuration used to create the reactive socket
* @param timeout timeout
* @param timeUnit timeout units
* @param executorService ScheduledExecutorService to schedule the timeout on
* @return
* @return a positive numbers representing the availability of the factory.
* Higher is better, 0.0 means not available
*/
default Publisher<R> call(T t, long timeout, TimeUnit timeUnit, ScheduledExecutorService executorService) {
Publisher<R> reactiveSocketPublisher = subscriber -> {
AtomicBoolean complete = new AtomicBoolean();
subscriber.onSubscribe(EmptySubscription.INSTANCE);
call(t)
.subscribe(new Subscriber<R>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}

@Override
public void onNext(R reactiveSocket) {
subscriber.onNext(reactiveSocket);
}

@Override
public void onError(Throwable t) {
subscriber.onError(t);
}

@Override
public void onComplete() {
if (complete.compareAndSet(false, true)) {
subscriber.onComplete();
}
}
});

executorService.schedule(() -> {
if (complete.compareAndSet(false, true)) {
subscriber.onError(new TimeoutException());
}
}, timeout, timeUnit);
};

return reactiveSocketPublisher;
}
double availability();

/**
* @return an identifier of the remote location
*/
T remote();
}
5 changes: 3 additions & 2 deletions src/main/java/io/reactivesocket/internal/Requester.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,14 @@ private void assertStarted() {
*/
public double availability() {
if (!honorLease) {
return 1.0;
return connection.availability();
}
final long now = System.currentTimeMillis();
double available = 0.0;
if (numberOfRemainingRequests > 0 && (now < ttlExpiration)) {
available = 1.0;
}
return available;
return available * connection.availability();
}

/*
Expand Down Expand Up @@ -873,6 +873,7 @@ public void error(Throwable e) {

Publisher<Frame> keepaliveTicker =
PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS);

connection.addOutput(keepaliveTicker,
new Completable() {
public void success() {}
Expand Down
Loading