Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions reactivesocket-client/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
compile project(':reactivesocket-core')
testCompile project(':reactivesocket-test')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket.client;

import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.ReactiveSocketConnector;
import io.reactivesocket.ReactiveSocketFactory;
import io.reactivesocket.client.filter.*;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

public class Builder {
private static AtomicInteger counter = new AtomicInteger(0);
private final String name;

private final ScheduledExecutorService executor;

private final long requestTimeout;
private final TimeUnit requestTimeoutUnit;

private final long connectTimeout;
private final TimeUnit connectTimeoutUnit;

private final double backupQuantile;

private final int retries;

private final ReactiveSocketConnector<SocketAddress> connector;
private final Function<Throwable, Boolean> retryThisException;

private final Publisher<List<SocketAddress>> source;

private Builder(
String name,
ScheduledExecutorService executor,
long requestTimeout, TimeUnit requestTimeoutUnit,
long connectTimeout, TimeUnit connectTimeoutUnit,
double backupQuantile,
int retries, Function<Throwable, Boolean> retryThisException,
ReactiveSocketConnector<SocketAddress> connector,
Publisher<List<SocketAddress>> source
) {
this.name = name;
this.executor = executor;
this.requestTimeout = requestTimeout;
this.requestTimeoutUnit = requestTimeoutUnit;
this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
this.backupQuantile = backupQuantile;
this.retries = retries;
this.connector = connector;
this.retryThisException = retryThisException;
this.source = source;
}

public Builder withRequestTimeout(long timeout, TimeUnit unit) {
return new Builder(
name,
executor,
timeout, unit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
retries, retryThisException,
connector,
source
);
}

public Builder withConnectTimeout(long timeout, TimeUnit unit) {
return new Builder(
name,
executor,
requestTimeout, requestTimeoutUnit,
timeout, unit,
backupQuantile,
retries, retryThisException,
connector,
source
);
}

public Builder withBackupRequest(double quantile) {
return new Builder(
name,
executor,
requestTimeout, requestTimeoutUnit,
connectTimeout, connectTimeoutUnit,
quantile,
retries, retryThisException,
connector,
source
);
}

public Builder withExecutor(ScheduledExecutorService executor) {
return new Builder(
name,
executor,
requestTimeout, requestTimeoutUnit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
retries, retryThisException,
connector,
source
);
}

public Builder withConnector(ReactiveSocketConnector<SocketAddress> connector) {
return new Builder(
name,
executor,
requestTimeout, requestTimeoutUnit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
retries, retryThisException,
connector,
source
);
}

public Builder withSource(Publisher<List<SocketAddress>> source) {
return new Builder(
name,
executor,
requestTimeout, requestTimeoutUnit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
retries, retryThisException,
connector,
source
);
}

public Builder withRetries(int nbOfRetries, Function<Throwable, Boolean> retryThisException) {
return new Builder(
name,
executor,
requestTimeout, requestTimeoutUnit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
nbOfRetries, retryThisException,
connector,
source
);
}

public ReactiveSocket build() {
if (source == null) {
throw new IllegalStateException("Please configure the source!");
}
if (connector == null) {
throw new IllegalStateException("Please configure the connector!");
}

ReactiveSocketConnector<SocketAddress> filterConnector = connector
.chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor))
.chain(DrainingSocket::new);

Publisher<List<ReactiveSocketFactory<SocketAddress>>> factories =
sourceToFactory(source, filterConnector);

ReactiveSocket socket = new LoadBalancer(factories);
if (0.0 < backupQuantile && backupQuantile < 1.0) {
socket = new BackupRequestSocket(socket, backupQuantile, executor);
}
if (retries > 0) {
socket = new RetrySocket(socket, retries, t -> true);
}
return socket;
}

private Publisher<List<ReactiveSocketFactory<SocketAddress>>> sourceToFactory(
Publisher<List<SocketAddress>> source,
ReactiveSocketConnector<SocketAddress> connector
) {
return subscriber ->
source.subscribe(new Subscriber<List<SocketAddress>>() {
private Map<SocketAddress, ReactiveSocketFactory<SocketAddress>> current;

@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
current = new HashMap<>();
}

@Override
public void onNext(List<SocketAddress> socketAddresses) {
socketAddresses.stream()
.filter(sa -> !current.containsKey(sa))
.map(connector::toFactory)
.map(factory -> new TimeoutFactory<>(factory, connectTimeout, connectTimeoutUnit, executor))
.map(FailureAwareFactory::new)
.forEach(factory -> current.put(factory.remote(), factory));

Set<SocketAddress> addresses = new HashSet<>(socketAddresses);
Iterator<Map.Entry<SocketAddress, ReactiveSocketFactory<SocketAddress>>> it =
current.entrySet().iterator();
while (it.hasNext()) {
SocketAddress sa = it.next().getKey();
if (! addresses.contains(sa)) {
it.remove();
}
}

List<ReactiveSocketFactory<SocketAddress>> factories =
current.values().stream().collect(Collectors.toList());
subscriber.onNext(factories);
}

@Override
public void onError(Throwable t) { subscriber.onError(t); }

@Override
public void onComplete() { subscriber.onComplete(); }
});
}

public static Builder instance() {
return new Builder(
"rs-loadbalancer-" + counter.incrementAndGet(),
Executors.newScheduledThreadPool(4, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("reactivesocket-scheduler-thread");
thread.setDaemon(true);
return thread;
}
}),
1, TimeUnit.SECONDS,
10, TimeUnit.SECONDS,
0.99,
3, t -> true,
null,
null
);
}
}

Loading