From 587bdaf876c031f2bed8cc4e3904dd8b05f88cd8 Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 14 Jun 2016 14:49:17 -0700 Subject: [PATCH] Migrate reactivesocket-load-balancer into reactivesocket-java **Problem** The multi-repo structure is difficult to use in a context of rapid short iterations. **Solution** Migrate the reactivesocket-load-balancer repo inside reactivesocket-java. The submodules have been renamed like this: - reactivesocket-load-balancer-builder -> reactivesocket-client - reactivesocket-load-balancer-core -> reactivesocket-client - reactivesocket-load-balancer-eureka -> reactivesocket-discovery-eureka - reactivesocket-load-balancer-servo -> reactivesocket-stats-servo I also added a simple module `reactivesocket-examples`, which only contains one file so far. The goal of this module is to demonstrate simple examples. --- reactivesocket-client/build.gradle | 4 + .../io/reactivesocket/client/Builder.java | 263 ++++++++ .../reactivesocket/client/LoadBalancer.java | 621 ++++++++++++++++++ .../reactivesocket/client/WeightedSocket.java | 264 ++++++++ .../NoAvailableReactiveSocketException.java | 23 + .../client/exception/TimeoutException.java | 23 + .../client/filter/BackupRequestSocket.java | 250 +++++++ .../client/filter/DrainingSocket.java | 176 +++++ .../client/filter/FailureAwareFactory.java | 218 ++++++ .../client/filter/RetrySocket.java | 116 ++++ .../client/filter/TimeoutFactory.java | 68 ++ .../client/filter/TimeoutSocket.java | 78 +++ .../client/filter/TimeoutSubscriber.java | 87 +++ .../io/reactivesocket/client/stat/Ewma.java | 60 ++ .../client/stat/FrugalQuantile.java | 107 +++ .../io/reactivesocket/client/stat/Median.java | 79 +++ .../reactivesocket/client/stat/Quantile.java | 30 + .../io/reactivesocket/client/util/Clock.java | 33 + .../client/FailureReactiveSocketTest.java | 163 +++++ .../client/LoadBalancerTest.java | 157 +++++ .../client/TestingReactiveSocket.java | 135 ++++ .../client/TimeoutFactoryTest.java | 59 ++ .../client/stat/MedianTest.java | 52 ++ reactivesocket-discovery-eureka/build.gradle | 4 + .../discovery/eureka/Eureka.java | 50 ++ reactivesocket-examples/build.gradle | 9 + .../reactivesocket/examples/EchoClient.java | 60 ++ reactivesocket-stats-servo/build.gradle | 7 + .../AvailabilityMetricReactiveSocket.java | 115 ++++ .../servo/ServoMetricsReactiveSocket.java | 149 +++++ .../servo/internal/HdrHistogramGauge.java | 27 + .../servo/internal/HdrHistogramMaxGauge.java | 25 + .../servo/internal/HdrHistogramMinGauge.java | 25 + .../internal/HdrHistogramServoTimer.java | 123 ++++ .../servo/internal/ThreadLocalAdder.java | 105 +++ .../internal/ThreadLocalAdderCounter.java | 113 ++++ .../servo/ServoMetricsReactiveSocketTest.java | 307 +++++++++ settings.gradle | 4 + 38 files changed, 4189 insertions(+) create mode 100644 reactivesocket-client/build.gradle create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/Builder.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/WeightedSocket.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/exception/NoAvailableReactiveSocketException.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/exception/TimeoutException.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/filter/BackupRequestSocket.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/filter/DrainingSocket.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareFactory.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/filter/RetrySocket.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutFactory.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutSocket.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutSubscriber.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Ewma.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/stat/FrugalQuantile.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Median.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Quantile.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/util/Clock.java create mode 100644 reactivesocket-client/src/test/java/io/reactivesocket/client/FailureReactiveSocketTest.java create mode 100644 reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java create mode 100644 reactivesocket-client/src/test/java/io/reactivesocket/client/TestingReactiveSocket.java create mode 100644 reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutFactoryTest.java create mode 100644 reactivesocket-client/src/test/java/io/reactivesocket/client/stat/MedianTest.java create mode 100644 reactivesocket-discovery-eureka/build.gradle create mode 100644 reactivesocket-discovery-eureka/src/main/java/io/reactivesocket/discovery/eureka/Eureka.java create mode 100644 reactivesocket-examples/build.gradle create mode 100644 reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java create mode 100644 reactivesocket-stats-servo/build.gradle create mode 100644 reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java create mode 100644 reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/ServoMetricsReactiveSocket.java create mode 100644 reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramGauge.java create mode 100644 reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramMaxGauge.java create mode 100644 reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramMinGauge.java create mode 100644 reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramServoTimer.java create mode 100644 reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/ThreadLocalAdder.java create mode 100644 reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/ThreadLocalAdderCounter.java create mode 100644 reactivesocket-stats-servo/src/test/java/io/reactivesocket/loadbalancer/servo/ServoMetricsReactiveSocketTest.java diff --git a/reactivesocket-client/build.gradle b/reactivesocket-client/build.gradle new file mode 100644 index 000000000..887584cdc --- /dev/null +++ b/reactivesocket-client/build.gradle @@ -0,0 +1,4 @@ +dependencies { + compile project(':reactivesocket-core') + testCompile project(':reactivesocket-test') +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/Builder.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/Builder.java new file mode 100644 index 000000000..67c8d3f0c --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/Builder.java @@ -0,0 +1,263 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client; + +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketConnector; +import io.reactivesocket.ReactiveSocketFactory; +import io.reactivesocket.client.filter.*; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.net.SocketAddress; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class Builder { + private static AtomicInteger counter = new AtomicInteger(0); + private final String name; + + private final ScheduledExecutorService executor; + + private final long requestTimeout; + private final TimeUnit requestTimeoutUnit; + + private final long connectTimeout; + private final TimeUnit connectTimeoutUnit; + + private final double backupQuantile; + + private final int retries; + + private final ReactiveSocketConnector connector; + private final Function retryThisException; + + private final Publisher> source; + + private Builder( + String name, + ScheduledExecutorService executor, + long requestTimeout, TimeUnit requestTimeoutUnit, + long connectTimeout, TimeUnit connectTimeoutUnit, + double backupQuantile, + int retries, Function retryThisException, + ReactiveSocketConnector connector, + Publisher> source + ) { + this.name = name; + this.executor = executor; + this.requestTimeout = requestTimeout; + this.requestTimeoutUnit = requestTimeoutUnit; + this.connectTimeout = connectTimeout; + this.connectTimeoutUnit = connectTimeoutUnit; + this.backupQuantile = backupQuantile; + this.retries = retries; + this.connector = connector; + this.retryThisException = retryThisException; + this.source = source; + } + + public Builder withRequestTimeout(long timeout, TimeUnit unit) { + return new Builder( + name, + executor, + timeout, unit, + connectTimeout, connectTimeoutUnit, + backupQuantile, + retries, retryThisException, + connector, + source + ); + } + + public Builder withConnectTimeout(long timeout, TimeUnit unit) { + return new Builder( + name, + executor, + requestTimeout, requestTimeoutUnit, + timeout, unit, + backupQuantile, + retries, retryThisException, + connector, + source + ); + } + + public Builder withBackupRequest(double quantile) { + return new Builder( + name, + executor, + requestTimeout, requestTimeoutUnit, + connectTimeout, connectTimeoutUnit, + quantile, + retries, retryThisException, + connector, + source + ); + } + + public Builder withExecutor(ScheduledExecutorService executor) { + return new Builder( + name, + executor, + requestTimeout, requestTimeoutUnit, + connectTimeout, connectTimeoutUnit, + backupQuantile, + retries, retryThisException, + connector, + source + ); + } + + public Builder withConnector(ReactiveSocketConnector connector) { + return new Builder( + name, + executor, + requestTimeout, requestTimeoutUnit, + connectTimeout, connectTimeoutUnit, + backupQuantile, + retries, retryThisException, + connector, + source + ); + } + + public Builder withSource(Publisher> source) { + return new Builder( + name, + executor, + requestTimeout, requestTimeoutUnit, + connectTimeout, connectTimeoutUnit, + backupQuantile, + retries, retryThisException, + connector, + source + ); + } + + public Builder withRetries(int nbOfRetries, Function retryThisException) { + return new Builder( + name, + executor, + requestTimeout, requestTimeoutUnit, + connectTimeout, connectTimeoutUnit, + backupQuantile, + nbOfRetries, retryThisException, + connector, + source + ); + } + + public ReactiveSocket build() { + if (source == null) { + throw new IllegalStateException("Please configure the source!"); + } + if (connector == null) { + throw new IllegalStateException("Please configure the connector!"); + } + + ReactiveSocketConnector filterConnector = connector + .chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor)) + .chain(DrainingSocket::new); + + Publisher>> factories = + sourceToFactory(source, filterConnector); + + ReactiveSocket socket = new LoadBalancer(factories); + if (0.0 < backupQuantile && backupQuantile < 1.0) { + socket = new BackupRequestSocket(socket, backupQuantile, executor); + } + if (retries > 0) { + socket = new RetrySocket(socket, retries, t -> true); + } + return socket; + } + + private Publisher>> sourceToFactory( + Publisher> source, + ReactiveSocketConnector connector + ) { + return subscriber -> + source.subscribe(new Subscriber>() { + private Map> current; + + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + current = new HashMap<>(); + } + + @Override + public void onNext(List socketAddresses) { + socketAddresses.stream() + .filter(sa -> !current.containsKey(sa)) + .map(connector::toFactory) + .map(factory -> new TimeoutFactory<>(factory, connectTimeout, connectTimeoutUnit, executor)) + .map(FailureAwareFactory::new) + .forEach(factory -> current.put(factory.remote(), factory)); + + Set addresses = new HashSet<>(socketAddresses); + Iterator>> it = + current.entrySet().iterator(); + while (it.hasNext()) { + SocketAddress sa = it.next().getKey(); + if (! addresses.contains(sa)) { + it.remove(); + } + } + + List> factories = + current.values().stream().collect(Collectors.toList()); + subscriber.onNext(factories); + } + + @Override + public void onError(Throwable t) { subscriber.onError(t); } + + @Override + public void onComplete() { subscriber.onComplete(); } + }); + } + + public static Builder instance() { + return new Builder( + "rs-loadbalancer-" + counter.incrementAndGet(), + Executors.newScheduledThreadPool(4, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName("reactivesocket-scheduler-thread"); + thread.setDaemon(true); + return thread; + } + }), + 1, TimeUnit.SECONDS, + 10, TimeUnit.SECONDS, + 0.99, + 3, t -> true, + null, + null + ); + } +} + diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java new file mode 100644 index 000000000..22e201d45 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java @@ -0,0 +1,621 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketFactory; +import io.reactivesocket.exceptions.TransportException; +import io.reactivesocket.client.util.Clock; +import io.reactivesocket.client.exception.NoAvailableReactiveSocketException; +import io.reactivesocket.client.stat.Ewma; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.client.stat.FrugalQuantile; +import io.reactivesocket.client.stat.Quantile; +import io.reactivesocket.util.ReactiveSocketProxy; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.SocketAddress; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This {@link ReactiveSocket} implementation will load balance the request across a + * pool of children ReactiveSockets. + * It estimates the load of each ReactiveSocket based on statistics collected. + */ +public class LoadBalancer implements ReactiveSocket { + private static Logger logger = LoggerFactory.getLogger(LoadBalancer .class); + + private static final double MIN_PENDINGS = 1.0; + private static final double MAX_PENDINGS = 2.0; + private static final int MIN_APERTURE = 3; + private static final int MAX_APERTURE = 100; + private static final long APERTURE_REFRESH_PERIOD = Clock.unit().convert(15, TimeUnit.SECONDS); + private static final long MAX_REFRESH_PERIOD = Clock.unit().convert(5, TimeUnit.MINUTES); + private static final int EFFORT = 5; + + private final double expFactor; + private final Quantile lowerQuantile; + private final Quantile higherQuantile; + + private int pendingSockets; + private final Map activeSockets; + private final Map> activeFactories; + private final FactoriesRefresher factoryRefresher; + + private Ewma pendings; + private volatile int targetAperture; + private long lastApertureRefresh; + private long refreshPeriod; + private volatile long lastRefresh; + + public LoadBalancer(Publisher>> factories) { + this.expFactor = 4.0; + this.lowerQuantile = new FrugalQuantile(0.2); + this.higherQuantile = new FrugalQuantile(0.8); + + this.activeSockets = new HashMap<>(); + this.activeFactories = new HashMap<>(); + this.pendingSockets = 0; + this.factoryRefresher = new FactoriesRefresher(); + + this.pendings = new Ewma(15, TimeUnit.SECONDS, (MIN_PENDINGS + MAX_PENDINGS) / 2); + this.targetAperture = MIN_APERTURE; + this.lastApertureRefresh = 0L; + this.refreshPeriod = Clock.unit().convert(15, TimeUnit.SECONDS); + this.lastRefresh = Clock.now(); + + factories.subscribe(factoryRefresher); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> select().fireAndForget(payload).subscribe(subscriber); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> select().requestResponse(payload).subscribe(subscriber); + } + + @Override + public Publisher requestSubscription(Payload payload) { + // TODO: deal with subscription & cie + return subscriber -> select().requestSubscription(payload).subscribe(subscriber); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> select().requestStream(payload).subscribe(subscriber); + } + + @Override + public Publisher metadataPush(Payload payload) { + return subscriber -> select().metadataPush(payload).subscribe(subscriber); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> select().requestChannel(payloads).subscribe(subscriber); + } + + private synchronized void addSockets(int numberOfNewSocket) { + activeFactories.entrySet() + .stream() + // available factories that don't map to an already established socket + .filter(e -> !activeSockets.containsKey(e.getKey())) + .map(e -> e.getValue()) + .filter(factory -> factory.availability() > 0.0) + .sorted((a, b) -> -Double.compare(a.availability(), b.availability())) + .limit(numberOfNewSocket) + .forEach(factory -> { + pendingSockets += 1; + factory.apply().subscribe(new SocketAdder(factory.remote())); + }); + } + + private synchronized void refreshAperture() { + int n = activeSockets.size(); + if (n == 0) { + return; + } + + double p = 0.0; + for (WeightedSocket wrs: activeSockets.values()) { + p += wrs.getPending(); + } + p /= (n + pendingSockets); + pendings.insert(p); + double avgPending = pendings.value(); + + long now = Clock.now(); + boolean underRateLimit = now - lastApertureRefresh > APERTURE_REFRESH_PERIOD; + int previous = targetAperture; + if (avgPending < 1.0 && underRateLimit) { + targetAperture--; + lastApertureRefresh = now; + pendings.reset((MIN_PENDINGS + MAX_PENDINGS)/2); + } else if (2.0 < avgPending && underRateLimit) { + targetAperture++; + lastApertureRefresh = now; + pendings.reset((MIN_PENDINGS + MAX_PENDINGS)/2); + } + targetAperture = Math.max(MIN_APERTURE, targetAperture); + int maxAperture = Math.min(MAX_APERTURE, activeFactories.size()); + targetAperture = Math.min(maxAperture, targetAperture); + + if (targetAperture != previous) { + logger.info("Current pending=" + avgPending + + ", new target=" + targetAperture + + ", previous target=" + previous); + } + } + + /** + * Responsible for: + * - refreshing the aperture + * - asynchronously adding/removing reactive sockets to match targetAperture + * - periodically add a new connection + */ + private synchronized void refreshSockets() { + refreshAperture(); + + int n = pendingSockets + activeSockets.size(); + if (n < targetAperture) { + logger.info("aperture " + n + + " is below target " + targetAperture + + ", adding " + (targetAperture - n) + " sockets"); + addSockets(targetAperture - n); + } else if (targetAperture < n) { + logger.info("aperture " + n + + " is above target " + targetAperture + + ", quicking 1 socket"); + quickSlowestRS(); + } + + long now = Clock.now(); + if (now - lastRefresh < refreshPeriod) { + return; + } else { + long prev = refreshPeriod; + refreshPeriod = (long) Math.min(refreshPeriod * 1.5, MAX_REFRESH_PERIOD); + logger.info("Bumping refresh period, " + (prev/1000) + "->" + (refreshPeriod/1000)); + } + lastRefresh = now; + addSockets(1); + } + + private synchronized void quickSlowestRS() { + if (activeSockets.size() <= 1) { + return; + } + + activeSockets.entrySet().forEach(e -> { + SocketAddress key = e.getKey(); + WeightedSocket value = e.getValue(); + logger.info("> " + key + " -> " + value); + }); + + activeSockets.entrySet() + .stream() + .sorted((a,b) -> { + WeightedSocket socket1 = a.getValue(); + WeightedSocket socket2 = b.getValue(); + double load1 = 1.0/socket1.getPredictedLatency() * socket1.availability(); + double load2 = 1.0/socket2.getPredictedLatency() * socket2.availability(); + return Double.compare(load1, load2); + }) + .limit(1) + .forEach(entry -> { + SocketAddress key = entry.getKey(); + WeightedSocket slowest = entry.getValue(); + try { + logger.info("quicking slowest: " + key + " -> " + slowest); + activeSockets.remove(key); + slowest.close(); + } catch (Exception e) { + logger.warn("Exception while closing a ReactiveSocket", e); + } + }); + } + + @Override + public synchronized double availability() { + double currentAvailability = 0.0; + if (!activeSockets.isEmpty()) { + for (WeightedSocket rs : activeSockets.values()) { + currentAvailability += rs.availability(); + } + currentAvailability /= activeSockets.size(); + } + + return currentAvailability; + } + + @Override + public void start(Completable c) { + c.success(); // automatically started in the constructor + } + + @Override + public void onRequestReady(Consumer c) { + throw new RuntimeException("onRequestReady not implemented"); + } + + @Override + public void onRequestReady(Completable c) { + throw new RuntimeException("onRequestReady not implemented"); + } + + @Override + public void onShutdown(Completable c) { + throw new RuntimeException("onShutdown not implemented"); + } + + @Override + public synchronized void sendLease(int ttl, int numberOfRequests) { + activeSockets.values().forEach(socket -> + socket.sendLease(ttl, numberOfRequests) + ); + } + + @Override + public void shutdown() { + try { + close(); + } catch (Exception e) { + logger.warn("Exception while calling `shutdown` on a ReactiveSocket", e); + } + } + + private synchronized ReactiveSocket select() { + if (activeSockets.isEmpty()) { + return FAILING_REACTIVE_SOCKET; + } + refreshSockets(); + + int size = activeSockets.size(); + List buffer = activeSockets.values().stream().collect(Collectors.toList()); + if (size == 1) { + return buffer.get(0); + } + + WeightedSocket rsc1 = null; + WeightedSocket rsc2 = null; + + Random rng = ThreadLocalRandom.current(); + for (int i = 0; i < EFFORT; i++) { + int i1 = rng.nextInt(size); + int i2 = rng.nextInt(size - 1); + if (i2 >= i1) { + i2++; + } + rsc1 = buffer.get(i1); + rsc2 = buffer.get(i2); + if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) + break; + } + + double w1 = algorithmicWeight(rsc1); + double w2 = algorithmicWeight(rsc2); + if (w1 < w2) { + return rsc2; + } else { + return rsc1; + } + } + + private double algorithmicWeight(WeightedSocket socket) { + if (socket == null || socket.availability() == 0.0) { + return 0.0; + } + + int pendings = socket.getPending(); + double latency = socket.getPredictedLatency(); + + double low = lowerQuantile.estimation(); + double high = Math.max(higherQuantile.estimation(), low * 1.001); // ensure higherQuantile > lowerQuantile + .1% + double bandWidth = Math.max(high - low, 1); + + if (latency < low) { + double alpha = (low - latency) / bandWidth; + double bonusFactor = Math.pow(1 + alpha, expFactor); + latency /= bonusFactor; + } else if (latency > high) { + double alpha = (latency - high) / bandWidth; + double penaltyFactor = Math.pow(1 + alpha, expFactor); + latency *= penaltyFactor; + } + + return socket.availability() * 1.0 / (1.0 + latency * (pendings + 1)); + } + + @Override + public synchronized String toString() { + return "LoadBalancer(a:" + activeSockets.size()+ ", f: " + + activeFactories.size() + + ", avgPendings=" + pendings.value() + + ", targetAperture=" + targetAperture + + ", band=[" + lowerQuantile.estimation() + + ", " + higherQuantile.estimation() + + "])"; + } + + @Override + public synchronized void close() throws Exception { + // TODO: have a `closed` flag? + factoryRefresher.close(); + activeFactories.clear(); + activeSockets.values().forEach(rs -> { + try { + rs.close(); + } catch (Exception e) { + logger.warn("Exception while closing a ReactiveSocket", e); + } + }); + } + + private class RemoveItselfSubscriber implements Subscriber { + private Subscriber child; + private SocketAddress key; + + private RemoveItselfSubscriber(Subscriber child, SocketAddress key) { + this.child = child; + this.key = key; + } + + @Override + public void onSubscribe(Subscription s) { + child.onSubscribe(s); + } + + @Override + public void onNext(Payload payload) { + child.onNext(payload); + } + + @Override + public void onError(Throwable t) { + child.onError(t); + if (t instanceof TransportException) { + System.out.println(t + " removing socket " + child); + synchronized (LoadBalancer.this) { + activeSockets.remove(key); + } + } + } + + @Override + public void onComplete() { + child.onComplete(); + } + } + + /** + * This subscriber role is to subscribe to the list of server identifier, and update the + * factory list. + */ + private class FactoriesRefresher implements Subscriber>> { + private Subscription subscription; + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(List> newFactories) { + List> removed = computeRemoved(newFactories); + synchronized (LoadBalancer.this) { + boolean changed = false; + for (ReactiveSocketFactory factory : removed) { + SocketAddress key = factory.remote(); + activeFactories.remove(key); + WeightedSocket removedSocket = activeSockets.remove(key); + try { + if (removedSocket != null) { + changed = true; + removedSocket.close(); + } + } catch (Exception e) { + logger.warn("Exception while closing a ReactiveSocket", e); + } + } + + for (ReactiveSocketFactory factory : newFactories) { + if (!activeFactories.containsKey(factory.remote())) { + activeFactories.put(factory.remote(), factory); + changed = true; + } + } + + if (changed && logger.isInfoEnabled()) { + String msg = "UPDATING ACTIVE FACTORIES"; + for (Map.Entry> e : activeFactories.entrySet()) { + msg += " + " + e.getKey() + ": " + e.getValue() + "\n"; + } + logger.info(msg); + } + } + refreshSockets(); + } + + @Override + public void onError(Throwable t) { + // TODO: retry + } + + @Override + public void onComplete() { + // TODO: retry + } + + void close() { + subscription.cancel(); + } + + private List> computeRemoved( + List> newFactories) { + ArrayList> removed = new ArrayList<>(); + + synchronized (LoadBalancer.this) { + for (Map.Entry> e : activeFactories.entrySet()) { + SocketAddress key = e.getKey(); + ReactiveSocketFactory factory = e.getValue(); + + boolean isRemoved = true; + for (ReactiveSocketFactory f : newFactories) { + if (f.remote() == key) { + isRemoved = false; + break; + } + } + if (isRemoved) { + removed.add(factory); + } + } + } + return removed; + } + } + + private class SocketAdder implements Subscriber { + private final SocketAddress remote; + + private SocketAdder(SocketAddress remote) { + this.remote = remote; + } + + @Override + public void onSubscribe(Subscription s) { + s.request(1L); + } + + @Override + public void onNext(ReactiveSocket rs) { + synchronized (LoadBalancer.this) { + if (activeSockets.size() >= targetAperture) { + quickSlowestRS(); + } + + ReactiveSocket proxy = new ReactiveSocketProxy(rs, + s -> new RemoveItselfSubscriber(s, remote)); + WeightedSocket weightedSocket = new WeightedSocket(proxy, lowerQuantile, higherQuantile); + logger.info("Adding new WeightedSocket " + + weightedSocket + " connected to " + remote); + activeSockets.put(remote, weightedSocket); + pendingSockets -= 1; + } + } + + @Override + public void onError(Throwable t) { + logger.warn("Exception while subscribing to the ReactiveSocket source", t); + synchronized (LoadBalancer.this) { + pendingSockets -= 1; + } + } + + @Override + public void onComplete() {} + } + + private static final FailingReactiveSocket FAILING_REACTIVE_SOCKET = new FailingReactiveSocket(); + + /** + * (Null Object Pattern) + * This failing ReactiveSocket never succeed, it is useful for simplifying the code + * when dealing with edge cases. + */ + private static class FailingReactiveSocket implements ReactiveSocket { + @SuppressWarnings("ThrowableInstanceNeverThrown") + private static final NoAvailableReactiveSocketException NO_AVAILABLE_RS_EXCEPTION = + new NoAvailableReactiveSocketException(); + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public Publisher metadataPush(Payload payload) { + return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public double availability() { + return 0; + } + + @Override + public void start(Completable c) { + c.error(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public void onRequestReady(Consumer c) { + c.accept(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public void onRequestReady(Completable c) { + c.error(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public void onShutdown(Completable c) { + c.error(NO_AVAILABLE_RS_EXCEPTION); + } + + @Override + public void sendLease(int ttl, int numberOfRequests) {} + + @Override + public void shutdown() {} + + @Override + public void close() throws Exception {} + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/WeightedSocket.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/WeightedSocket.java new file mode 100644 index 000000000..a05340f86 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/WeightedSocket.java @@ -0,0 +1,264 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.client.util.Clock; +import io.reactivesocket.client.stat.Ewma; +import io.reactivesocket.client.stat.Median; +import io.reactivesocket.client.stat.Quantile; +import io.reactivesocket.util.ReactiveSocketProxy; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Wrapper of a ReactiveSocket, it computes statistics about the req/resp calls and + * update availability accordingly. + */ +public class WeightedSocket extends ReactiveSocketProxy { + private static final double STARTUP_PENALTY = Long.MAX_VALUE >> 12; + + private final ReactiveSocket child; + private final Quantile lowerQuantile; + private final Quantile higherQuantile; + private final long inactivityFactor; + + private volatile int pending; // instantaneous rate + private long stamp; // last timestamp we sent a request + private long stamp0; // last timestamp we sent a request or receive a response + private long duration; // instantaneous cumulative duration + + private Median median; + private Ewma interArrivalTime; + + private AtomicLong pendingStreams; // number of active streams + + public WeightedSocket(ReactiveSocket child, Quantile lowerQuantile, Quantile higherQuantile, int inactivityFactor) { + super(child); + this.child = child; + this.lowerQuantile = lowerQuantile; + this.higherQuantile = higherQuantile; + this.inactivityFactor = inactivityFactor; + long now = Clock.now(); + this.stamp = now; + this.stamp0 = now; + this.duration = 0L; + this.pending = 0; + this.median = new Median(); + this.interArrivalTime = new Ewma(1, TimeUnit.MINUTES, 1000); + this.pendingStreams = new AtomicLong(); + } + + public WeightedSocket(ReactiveSocket child, Quantile lowerQuantile, Quantile higherQuantile) { + this(child, lowerQuantile, higherQuantile, 100); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> + child.requestResponse(payload).subscribe(new LatencySubscriber<>(subscriber)); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> + child.requestStream(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return subscriber -> + child.requestSubscription(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> + child.fireAndForget(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher metadataPush(Payload payload) { + return subscriber -> + child.metadataPush(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> + child.requestChannel(payloads).subscribe(new CountingSubscriber<>(subscriber)); + } + + public synchronized double getPredictedLatency() { + long now = Clock.now(); + long elapsed = Math.max(now - stamp, 1L); + + double weight; + double prediction = median.estimation(); + + if (prediction == 0.0) { + if (pending == 0) { + weight = 0.0; // first request + } else { + // subsequent requests while we don't have any history + weight = STARTUP_PENALTY + pending; + } + } else if (pending == 0 && elapsed > inactivityFactor * interArrivalTime.value()) { + // if we did't see any data for a while, we decay the prediction by inserting + // artificial 0.0 into the median + median.insert(0.0); + weight = median.estimation(); + } else { + double predicted = prediction * pending; + double instant = instantaneous(now); + + if (predicted < instant) { // NB: (0.0 < 0.0) == false + weight = instant / pending; // NB: pending never equal 0 here + } else { + // we are under the predictions + weight = prediction; + } + } + + return weight; + } + + public int getPending() { + return pending; + } + + private synchronized long instantaneous(long now) { + return duration + (now - stamp0) * pending; + } + + private synchronized long incr() { + long now = Clock.now(); + interArrivalTime.insert(now - stamp); + duration += Math.max(0, now - stamp0) * pending; + pending += 1; + stamp = now; + stamp0 = now; + return now; + } + + private synchronized long decr(long timestamp) { + long now = Clock.now(); + duration += Math.max(0, now - stamp0) * pending - (now - timestamp); + pending -= 1; + stamp0 = now; + return now; + } + + private synchronized void observe(double rtt) { + median.insert(rtt); + lowerQuantile.insert(rtt); + higherQuantile.insert(rtt); + } + + @Override + public void close() throws Exception { + child.close(); + } + + @Override + public String toString() { + return "WeightedSocket@" + hashCode() + + " [median:" + median.estimation() + + " quantile-low:" + lowerQuantile.estimation() + + " quantile-high:" + higherQuantile.estimation() + + " inter-arrival:" + interArrivalTime.value() + + " duration/pending:" + (pending == 0 ? 0 : (double)duration / pending) + + " availability: " + availability() + + "]->" + child.toString(); + } + + /** + * Subscriber wrapper used for request/response interaction model, measure and collect + * latency information. + */ + private class LatencySubscriber implements Subscriber { + private final Subscriber child; + private long start; + + LatencySubscriber(Subscriber child) { + this.child = child; + } + + @Override + public void onSubscribe(Subscription s) { + child.onSubscribe(s); + start = incr(); + } + + @Override + public void onNext(T t) { + child.onNext(t); + } + + @Override + public void onError(Throwable t) { + child.onError(t); + decr(start); + } + + @Override + public void onComplete() { + long now = decr(start); + observe(now - start); + child.onComplete(); + } + } + + /** + * Subscriber wrapper used for stream like interaction model, it only counts the number of + * active streams + */ + private class CountingSubscriber implements Subscriber { + private final Subscriber child; + + CountingSubscriber(Subscriber child) { + this.child = child; + } + + @Override + public void onSubscribe(Subscription s) { + pendingStreams.incrementAndGet(); + child.onSubscribe(s); + } + + @Override + public void onNext(T t) { + child.onNext(t); + } + + @Override + public void onError(Throwable t) { + pendingStreams.decrementAndGet(); + child.onError(t); + } + + @Override + public void onComplete() { + pendingStreams.decrementAndGet(); + child.onComplete(); + } + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/exception/NoAvailableReactiveSocketException.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/exception/NoAvailableReactiveSocketException.java new file mode 100644 index 000000000..6941140fb --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/exception/NoAvailableReactiveSocketException.java @@ -0,0 +1,23 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.exception; + +public class NoAvailableReactiveSocketException extends Exception { + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/exception/TimeoutException.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/exception/TimeoutException.java new file mode 100644 index 000000000..bda372254 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/exception/TimeoutException.java @@ -0,0 +1,23 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.exception; + +public class TimeoutException extends Exception { + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/BackupRequestSocket.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/BackupRequestSocket.java new file mode 100644 index 000000000..b98ccee3d --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/BackupRequestSocket.java @@ -0,0 +1,250 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.filter; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.client.util.Clock; +import io.reactivesocket.client.stat.FrugalQuantile; +import io.reactivesocket.client.stat.Quantile; +import io.reactivesocket.rx.Completable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public class BackupRequestSocket implements ReactiveSocket { + private final ScheduledExecutorService executor; + private final ReactiveSocket child; + private final Quantile q; + + public BackupRequestSocket(ReactiveSocket child, double quantile, ScheduledExecutorService executor) { + this.child = child; + this.executor = executor; + q = new FrugalQuantile(quantile); + } + + public BackupRequestSocket(ReactiveSocket child, double quantile) { + this(child, quantile, Executors.newScheduledThreadPool(2)); + } + + public BackupRequestSocket(ReactiveSocket child) { + this(child, 0.99); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return child.fireAndForget(payload); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> { + Subscriber oneSubscriber = new OneSubscriber<>(subscriber); + Subscriber backupRequest = + new FirstRequestSubscriber(oneSubscriber, () -> child.requestResponse(payload)); + child.requestResponse(payload).subscribe(backupRequest); + }; + } + + @Override + public Publisher requestStream(Payload payload) { + return child.requestStream(payload); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return child.requestSubscription(payload); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return child.requestChannel(payloads); + } + + @Override + public Publisher metadataPush(Payload payload) { + return child.metadataPush(payload); + } + + @Override + public double availability() { + return child.availability(); + } + + @Override + public void start(Completable c) { + child.start(c); + } + + @Override + public void onRequestReady(Consumer c) { + child.onRequestReady(c); + } + + @Override + public void onRequestReady(Completable c) { + child.onRequestReady(c); + } + + @Override + public void onShutdown(Completable c) { + child.onShutdown(c); + } + + @Override + public void sendLease(int ttl, int numberOfRequests) { + child.sendLease(ttl, numberOfRequests); + } + + @Override + public void shutdown() { + child.shutdown(); + } + + @Override + public void close() throws Exception { + child.close(); + } + + @Override + public String toString() { + return "BackupRequest(q=" + q + ")->" + child; + } + + private class OneSubscriber implements Subscriber { + private final Subscriber subscriber; + private final AtomicBoolean firstEvent; + private final AtomicBoolean firstTerminal; + + private OneSubscriber(Subscriber subscriber) { + this.subscriber = subscriber; + this.firstEvent = new AtomicBoolean(false); + this.firstTerminal = new AtomicBoolean(false); + } + + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + } + + @Override + public void onNext(T t) { + if (firstEvent.compareAndSet(false, true)) { + subscriber.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + if (firstTerminal.compareAndSet(false, true)) { + subscriber.onError(t); + } + } + + @Override + public void onComplete() { + if (firstTerminal.compareAndSet(false, true)) { + subscriber.onComplete(); + } + } + } + + private class FirstRequestSubscriber implements Subscriber { + private final Subscriber oneSubscriber; + private Supplier> action; + private long start; + private ScheduledFuture future; + + private FirstRequestSubscriber(Subscriber oneSubscriber, Supplier> action) { + this.oneSubscriber = oneSubscriber; + this.action = action; + } + + @Override + public void onSubscribe(Subscription s) { + start = Clock.now(); + if (q.estimation() > 0) { + future = executor.schedule(() -> { + action.get().subscribe(new BackupRequestSubscriber<>(oneSubscriber, s)); + }, (long) q.estimation(), TimeUnit.MICROSECONDS); + } + oneSubscriber.onSubscribe(s); + } + + @Override + public void onNext(Payload t) { + if (future != null) { + future.cancel(true); + } + oneSubscriber.onNext(t); + long latency = Clock.now() - start; + q.insert(latency); + } + + @Override + public void onError(Throwable t) { + oneSubscriber.onError(t); + } + + @Override + public void onComplete() { + oneSubscriber.onComplete(); + } + } + + private class BackupRequestSubscriber implements Subscriber { + private final Subscriber oneSubscriber; + private Subscription firstRequestSubscription; + private long start; + + private BackupRequestSubscriber(Subscriber oneSubscriber, Subscription firstRequestSubscription) { + this.oneSubscriber = oneSubscriber; + this.firstRequestSubscription = firstRequestSubscription; + } + + @Override + public void onSubscribe(Subscription s) { + start = Clock.now(); + s.request(1); + } + + @Override + public void onNext(T t) { + firstRequestSubscription.cancel(); + oneSubscriber.onNext(t); + long latency = Clock.now() - start; + q.insert(latency); + } + + @Override + public void onError(Throwable t) { + oneSubscriber.onError(t); + } + + @Override + public void onComplete() { + oneSubscriber.onComplete(); + } + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/DrainingSocket.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/DrainingSocket.java new file mode 100644 index 000000000..b73842969 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/DrainingSocket.java @@ -0,0 +1,176 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.filter; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.rx.Completable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public class DrainingSocket implements ReactiveSocket { + private final ReactiveSocket child; + private final AtomicInteger count; + private volatile boolean closed; + + private class CountingSubscriber implements Subscriber { + private final Subscriber child; + + private CountingSubscriber(Subscriber child) { + this.child = child; + } + + @Override + public void onSubscribe(Subscription s) { + child.onSubscribe(s); + incr(); + } + + @Override + public void onNext(T t) { + child.onNext(t); + } + + @Override + public void onError(Throwable t) { + child.onError(t); + decr(); + } + + @Override + public void onComplete() { + child.onComplete(); + decr(); + } + } + + public DrainingSocket(ReactiveSocket child) { + this.child = child; + count = new AtomicInteger(0); + closed = false; + } + + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> + child.fireAndForget(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> + child.requestResponse(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> + child.requestStream(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return subscriber -> + child.requestSubscription(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> + child.requestChannel(payloads).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public Publisher metadataPush(Payload payload) { + return subscriber -> + child.metadataPush(payload).subscribe(new CountingSubscriber<>(subscriber)); + } + + @Override + public double availability() { + if (closed) { + return 0.0; + } else { + return child.availability(); + } + } + + @Override + public void start(Completable c) { + child.start(c); + } + + @Override + public void onRequestReady(Consumer c) { + child.onRequestReady(c); + } + + @Override + public void onRequestReady(Completable c) { + child.onRequestReady(c); + } + + @Override + public void onShutdown(Completable c) { + child.onShutdown(c); + } + + @Override + public void sendLease(int ttl, int numberOfRequests) { + child.sendLease(ttl, numberOfRequests); + } + + @Override + public void shutdown() { + closed = true; + if (count.get() == 0) { + child.shutdown(); + } + } + + @Override + public void close() throws Exception { + closed = true; + if (count.get() == 0) { + child.close(); + } + } + + private void incr() { + count.incrementAndGet(); + } + + private void decr() { + int n = count.decrementAndGet(); + if (closed && n == 0) { + try { + child.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + @Override + public String toString() { + return "DrainingSocket(closed=" + closed + ")->" + child.toString(); + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareFactory.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareFactory.java new file mode 100644 index 000000000..a275faa87 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareFactory.java @@ -0,0 +1,218 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.filter; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketFactory; +import io.reactivesocket.exceptions.TransportException; +import io.reactivesocket.client.util.Clock; +import io.reactivesocket.client.stat.Ewma; +import io.reactivesocket.util.ReactiveSocketProxy; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * This child compute the error rate of a particular remote location and adapt the availability + * of the ReactiveSocketFactory but also of the ReactiveSocket. + * + * It means that if a remote host doesn't generate lots of errors when connecting to it, but a + * lot of them when sending messages, we will still decrease the availability of the child + * reducing the probability of connecting to it. + * + * @param the identifier for the remote server (most likely SocketAddress) + */ +public class FailureAwareFactory implements ReactiveSocketFactory { + private static final double EPSILON = 1e-4; + + private final ReactiveSocketFactory child; + private final long tau; + private long stamp; + private Ewma errorPercentage; + + public FailureAwareFactory(ReactiveSocketFactory child, long halfLife, TimeUnit unit) { + this.child = child; + this.tau = Clock.unit().convert((long)(halfLife / Math.log(2)), unit); + this.stamp = Clock.now(); + errorPercentage = new Ewma(halfLife, unit, 1.0); + } + + public FailureAwareFactory(ReactiveSocketFactory child) { + this(child, 5, TimeUnit.SECONDS); + } + + @Override + public Publisher apply() { + return subscriber -> child.apply().subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + } + + @Override + public void onNext(ReactiveSocket reactiveSocket) { + updateErrorPercentage(1.0); + ReactiveSocket wrapped = new FailureAwareReactiveSocket(reactiveSocket); + subscriber.onNext(wrapped); + } + + @Override + public void onError(Throwable t) { + updateErrorPercentage(0.0); + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }); + } + + public double availability() { + double e = errorPercentage.value(); + if ((Clock.now() - stamp) > tau) { + // If the window is expired artificially increase the availability + double a = Math.min(1.0, e + 0.5); + errorPercentage.reset(a); + } + if (e < EPSILON) { + e = 0.0; + } else if (1.0 - EPSILON < e) { + e = 1.0; + } + + return e; + } + + @Override + public T remote() { + return child.remote(); + } + + private synchronized void updateErrorPercentage(double value) { + errorPercentage.insert(value); + stamp = Clock.now(); + } + + @Override + public String toString() { + return "FailureAwareFactory(" + errorPercentage.value() + ") ~> " + child.toString(); + } + + /** + * ReactiveSocket wrapper that update the statistics associated with a remote server + */ + private class FailureAwareReactiveSocket extends ReactiveSocketProxy { + private class InnerSubscriber implements Subscriber { + private final Subscriber child; + + InnerSubscriber(Subscriber child) { + this.child = child; + } + + @Override + public void onSubscribe(Subscription s) { + child.onSubscribe(s); + } + + @Override + public void onNext(U u) { + child.onNext(u); + } + + @Override + public void onError(Throwable t) { + if (t instanceof TransportException) { + errorPercentage.reset(0.0); + } else { + errorPercentage.insert(0.0); + } + child.onError(t); + } + + @Override + public void onComplete() { + updateErrorPercentage(1.0); + child.onComplete(); + } + } + + FailureAwareReactiveSocket(ReactiveSocket child) { + super(child); + } + + @Override + public double availability() { + double childAvailability = child.availability(); + // If the window is expired set success and failure to zero and return + // the child availability + if ((Clock.now() - stamp) > tau) { + updateErrorPercentage(1.0); + } + return childAvailability * errorPercentage.value(); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> + child.requestResponse(payload).subscribe(new InnerSubscriber<>(subscriber)); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return subscriber -> + child.requestSubscription(payload).subscribe(new InnerSubscriber<>(subscriber)); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> + child.requestStream(payload).subscribe(new InnerSubscriber<>(subscriber)); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> + child.fireAndForget(payload).subscribe(new InnerSubscriber<>(subscriber)); + } + + @Override + public Publisher metadataPush(Payload payload) { + return child.metadataPush(payload); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> + child.requestChannel(payloads).subscribe(new InnerSubscriber<>(subscriber)); + } + + @Override + public String toString() { + return "FailureAwareReactiveSocket(" + errorPercentage.value() + ") ~> " + child.toString(); + } + } + + public static + Function, ReactiveSocketFactory> filter(long tau, TimeUnit unit) { + return f -> new FailureAwareFactory<>(f, tau, unit); + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/RetrySocket.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/RetrySocket.java new file mode 100644 index 000000000..b5d358083 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/RetrySocket.java @@ -0,0 +1,116 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.filter; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.util.ReactiveSocketProxy; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Supplier; + +public class RetrySocket extends ReactiveSocketProxy { + private final int retry; + private final Function retryThisException; + + public RetrySocket(ReactiveSocket child, int retry, Function retryThisException) { + super(child); + this.retry = retry; + this.retryThisException = retryThisException; + } + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> child.fireAndForget(payload).subscribe( + new RetrySubscriber<>(subscriber, () -> child.fireAndForget(payload)) + ); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> child.requestResponse(payload).subscribe( + new RetrySubscriber<>(subscriber, () -> child.requestResponse(payload)) + ); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> child.requestStream(payload).subscribe( + new RetrySubscriber<>(subscriber, () -> child.requestStream(payload)) + ); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return subscriber -> child.requestSubscription(payload).subscribe( + new RetrySubscriber<>(subscriber, () -> child.requestSubscription(payload)) + ); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> child.requestChannel(payloads).subscribe( + new RetrySubscriber<>(subscriber, () -> child.requestChannel(payloads)) + ); + } + + @Override + public Publisher metadataPush(Payload payload) { + return subscriber -> child.metadataPush(payload).subscribe( + new RetrySubscriber<>(subscriber, () -> child.metadataPush(payload)) + ); + } + + private class RetrySubscriber implements Subscriber { + private final Subscriber child; + private Supplier> action; + private AtomicInteger budget; + + private RetrySubscriber(Subscriber child, Supplier> action) { + this.child = child; + this.action = action; + this.budget = new AtomicInteger(retry); + } + + @Override + public void onSubscribe(Subscription s) { + child.onSubscribe(s); + } + + @Override + public void onNext(T t) { + child.onNext(t); + } + + @Override + public void onError(Throwable t) { + if (budget.decrementAndGet() > 0 && retryThisException.apply(t)) { + action.get().subscribe(this); + } else { + child.onError(t); + } + } + + @Override + public void onComplete() { + child.onComplete(); + } + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutFactory.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutFactory.java new file mode 100644 index 000000000..9b883c2c9 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutFactory.java @@ -0,0 +1,68 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.filter; + +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketFactory; +import org.reactivestreams.Publisher; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +public class TimeoutFactory implements ReactiveSocketFactory { + private final ReactiveSocketFactory child; + private final ScheduledExecutorService executor; + private final long timeout; + private final TimeUnit unit; + + public TimeoutFactory(ReactiveSocketFactory child, long timeout, TimeUnit unit, ScheduledExecutorService executor) { + this.child = child; + this.timeout = timeout; + this.unit = unit; + this.executor = executor; + } + + public TimeoutFactory(ReactiveSocketFactory child, long timeout, TimeUnit unit) { + this(child, timeout, unit, Executors.newScheduledThreadPool(2)); + } + + @Override + public Publisher apply() { + return subscriber -> + child.apply().subscribe(new TimeoutSubscriber<>(subscriber, executor, timeout, unit)); + } + + @Override + public double availability() { + return child.availability(); + } + + @Override + public T remote() { + return child.remote(); + } + + @Override + public String toString() { + return "TimeoutFactory(" + timeout + " " + unit.toString() + ")->" + child.toString(); + } + + public static Function, ReactiveSocketFactory> filter(long timeout, TimeUnit unit) { + return f -> new TimeoutFactory<>(f, timeout, unit); + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutSocket.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutSocket.java new file mode 100644 index 000000000..74220cefc --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutSocket.java @@ -0,0 +1,78 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.filter; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.util.ReactiveSocketProxy; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class TimeoutSocket extends ReactiveSocketProxy { + private final ScheduledExecutorService executor; + private final ReactiveSocket child; + private final long timeout; + private final TimeUnit unit; + + public TimeoutSocket(ReactiveSocket child, long timeout, TimeUnit unit, ScheduledExecutorService executor) { + super(child); + this.child = child; + this.timeout = timeout; + this.unit = unit; + this.executor = executor; + } + + public TimeoutSocket(ReactiveSocket child, long timeout, TimeUnit unit) { + this(child, timeout, unit, Executors.newScheduledThreadPool(2)); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return child.fireAndForget(payload); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> + child.requestResponse(payload).subscribe(wrap(subscriber)); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> + child.requestStream(payload).subscribe(wrap(subscriber)); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return subscriber -> + child.requestSubscription(payload).subscribe(wrap(subscriber)); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> + child.requestChannel(payloads).subscribe(wrap(subscriber)); + } + + private Subscriber wrap(Subscriber subscriber) { + return new TimeoutSubscriber<>(subscriber, executor, timeout, unit); + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutSubscriber.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutSubscriber.java new file mode 100644 index 000000000..deaf0a3a7 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/TimeoutSubscriber.java @@ -0,0 +1,87 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.filter; + +import io.reactivesocket.client.exception.TimeoutException; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +class TimeoutSubscriber implements Subscriber { + private final Subscriber child; + private final ScheduledExecutorService executor; + private final long timeout; + private final TimeUnit unit; + private Subscription subscription; + private ScheduledFuture future; + private boolean finished; + + TimeoutSubscriber(Subscriber child, ScheduledExecutorService executor, long timeout, TimeUnit unit) { + this.child = child; + this.subscription = null; + this.finished = false; + this.timeout = timeout; + this.unit = unit; + this.executor = executor; + } + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + future = executor.schedule(this::cancel, timeout, unit); + child.onSubscribe(s); + } + + @Override + public synchronized void onNext(T t) { + if (! finished) { + child.onNext(t); + } + } + + @Override + public synchronized void onError(Throwable t) { + if (! finished) { + finished = true; + child.onError(t); + if (future != null) { + future.cancel(true); + } + } + } + + @Override + public synchronized void onComplete() { + if (! finished) { + finished = true; + child.onComplete(); + if (future != null) { + future.cancel(true); + } + } + } + + private synchronized void cancel() { + if (! finished) { + subscription.cancel(); + child.onError(new TimeoutException()); + finished = true; + } + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Ewma.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Ewma.java new file mode 100644 index 000000000..49e3421ac --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Ewma.java @@ -0,0 +1,60 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.stat; + +import io.reactivesocket.client.util.Clock; +import java.util.concurrent.TimeUnit; + +/** + * Compute the exponential weighted moving average of a series of values. + * The time at which you insert the value into `Ewma` is used to compute a weight (recent + * points are weighted higher). + * The parameter for defining the convergence speed (like most decay process) is + * the half-life. + * + * e.g. with a half-life of 10 unit, if you insert 100 at t=0 and 200 at t=10 + * the ewma will be equal to (200 - 100)/2 = 150 (half of the distance between the + * new and the old value) + */ +public class Ewma { + private final long tau; + private volatile long stamp; + private volatile double ewma; + + public Ewma(long halfLife, TimeUnit unit, double initialValue) { + this.tau = Clock.unit().convert((long)(halfLife / Math.log(2)), unit); + stamp = 0L; + ewma = initialValue; + } + + public synchronized void insert(double x) { + long now = Clock.now(); + double elapsed = Math.max(0, now - stamp); + stamp = now; + + double w = Math.exp(-elapsed / tau); + ewma = w * ewma + (1.0 - w) * x; + } + + public synchronized void reset(double value) { + stamp = 0L; + ewma = value; + } + + public double value() { + return ewma; + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/FrugalQuantile.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/FrugalQuantile.java new file mode 100644 index 000000000..dc9a018c1 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/FrugalQuantile.java @@ -0,0 +1,107 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.stat; + +import java.util.Random; + +/** + * Reference: + * Ma, Qiang, S. Muthukrishnan, and Mark Sandler. "Frugal Streaming for + * Estimating Quantiles." Space-Efficient Data Structures, Streams, and + * Algorithms. Springer Berlin Heidelberg, 2013. 77-96. + * + * More info: http://blog.aggregateknowledge.com/2013/09/16/sketch-of-the-day-frugal-streaming/ + */ +public class FrugalQuantile implements Quantile { + private final double increment; + private double quantile; + private Random rng; + + volatile double estimate; + int step; + int sign; + + public FrugalQuantile(double quantile, double increment, Random rng) { + this.increment = increment; + this.quantile = quantile; + this.estimate = 0.0; + this.step = 1; + this.sign = 0; + this.rng = rng; + } + + public FrugalQuantile(double quantile) { + this(quantile, 1.0, new Random()); + } + + public double estimation() { + return estimate; + } + + @Override + public synchronized void insert(double x) { + if (sign == 0) { + estimate = x; + sign = 1; + return; + } + + if (x > estimate && rng.nextDouble() > (1 - quantile)) { + step += sign * increment; + + if (step > 0) { + estimate += step; + } else { + estimate += 1; + } + + if (estimate > x) { + step += (x - estimate); + estimate = x; + } + + if (sign < 0) { + step = 1; + } + + sign = 1; + } else if (x < estimate && rng.nextDouble() > quantile) { + step -= sign * increment; + + if (step > 0) { + estimate -= step; + } else { + estimate--; + } + + if (estimate < x) { + step += (estimate - x); + estimate = x; + } + + if (sign > 0) { + step = 1; + } + + sign = -1; + } + } + + @Override + public String toString() { + return "FrugalQuantile(q=" + quantile + ", v=" + estimate + ")"; + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Median.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Median.java new file mode 100644 index 000000000..fb86ba1d0 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Median.java @@ -0,0 +1,79 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.stat; + +/** + * This implementation gives better results because it considers more data-point. + */ +public class Median extends FrugalQuantile { + public Median() { + super(0.5, 1.0, null); + } + + @Override + public synchronized void insert(double x) { + if (sign == 0) { + estimate = x; + sign = 1; + return; + } + + if (x > estimate) { + step += sign; + + if (step > 0) { + estimate += step; + } else { + estimate += 1; + } + + if (estimate > x) { + step += (x - estimate); + estimate = x; + } + + if (sign < 0) { + step = 1; + } + + sign = 1; + } else if (x < estimate) { + step -= sign; + + if (step > 0) { + estimate -= step; + } else { + estimate--; + } + + if (estimate < x) { + step += (estimate - x); + estimate = x; + } + + if (sign > 0) { + step = 1; + } + + sign = -1; + } + } + + @Override + public String toString() { + return "Median(v=" + estimate + ")"; + } +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Quantile.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Quantile.java new file mode 100644 index 000000000..459c0bde2 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/stat/Quantile.java @@ -0,0 +1,30 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.stat; + +public interface Quantile { + /** + * + * @return the estimation of the current value of the quantile + */ + public double estimation(); + + /** + * Insert a data point `x` in the quantile estimator. + * @param x + */ + public void insert(double x); +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/util/Clock.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/util/Clock.java new file mode 100644 index 000000000..66c5ca37a --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/util/Clock.java @@ -0,0 +1,33 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client.util; + +import java.util.concurrent.TimeUnit; + +public class Clock { + public static long now() { + return System.nanoTime() / 1000; + } + + public static long elapsedSince(long timestamp) { + long t = now(); + return Math.max(0L, t - timestamp); + } + + public static TimeUnit unit() { + return TimeUnit.MICROSECONDS; + } +} diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/FailureReactiveSocketTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/FailureReactiveSocketTest.java new file mode 100644 index 000000000..56eb01c37 --- /dev/null +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/FailureReactiveSocketTest.java @@ -0,0 +1,163 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.client; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketFactory; +import io.reactivesocket.client.filter.FailureAwareFactory; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import rx.RxReactiveStreams; +import rx.observers.TestSubscriber; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import static org.junit.Assert.assertTrue; + +public class FailureReactiveSocketTest { + private Payload dummyPayload = new Payload() { + @Override + public ByteBuffer getData() { + return null; + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }; + + @Test + public void testError() throws InterruptedException { + testReactiveSocket((latch, socket) -> { + assertTrue(1.0 == socket.availability()); + Publisher payloadPublisher = socket.requestResponse(dummyPayload); + + TestSubscriber subscriber = new TestSubscriber<>(); + RxReactiveStreams.toObservable(payloadPublisher).subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertCompleted(); + double good = socket.availability(); + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + subscriber = new TestSubscriber<>(); + RxReactiveStreams.toObservable(payloadPublisher).subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertError(RuntimeException.class); + double bad = socket.availability(); + assertTrue(good > bad); + latch.countDown(); + }); + } + + @Test + public void testWidowReset() throws InterruptedException { + testReactiveSocket((latch, socket) -> { + assertTrue(1.0 == socket.availability()); + Publisher payloadPublisher = socket.requestResponse(dummyPayload); + + TestSubscriber subscriber = new TestSubscriber<>(); + RxReactiveStreams.toObservable(payloadPublisher).subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertCompleted(); + double good = socket.availability(); + + subscriber = new TestSubscriber<>(); + RxReactiveStreams.toObservable(payloadPublisher).subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertError(RuntimeException.class); + double bad = socket.availability(); + assertTrue(good > bad); + + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + double reset = socket.availability(); + assertTrue(reset > bad); + latch.countDown(); + }); + } + + private void testReactiveSocket(BiConsumer f) throws InterruptedException { + AtomicInteger count = new AtomicInteger(0); + TestingReactiveSocket socket = new TestingReactiveSocket(input -> { + if (count.getAndIncrement() < 1) { + return dummyPayload; + } else { + throw new RuntimeException(); + } + }); + ReactiveSocketFactory factory = new ReactiveSocketFactory() { + @Override + public Publisher apply() { + return subscriber -> { + subscriber.onNext(socket); + subscriber.onComplete(); + }; + } + + @Override + public double availability() { + return 1.0; + } + + @Override + public String remote() { + return "Testing"; + } + }; + + FailureAwareFactory failureFactory = new FailureAwareFactory<>(factory, 100, TimeUnit.MILLISECONDS); + + CountDownLatch latch = new CountDownLatch(1); + failureFactory.apply().subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(ReactiveSocket socket) { + f.accept(latch, socket); + } + + @Override + public void onError(Throwable t) { + assertTrue(false); + } + + @Override + public void onComplete() {} + }); + + latch.await(30, TimeUnit.SECONDS); + } +} \ No newline at end of file diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java new file mode 100644 index 000000000..b220b9f2e --- /dev/null +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java @@ -0,0 +1,157 @@ +package io.reactivesocket.client; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketFactory; +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.Function; + +public class LoadBalancerTest { + + private Payload dummy = new Payload() { + @Override + public ByteBuffer getData() { + return null; + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }; + + @Test(timeout = 10_000L) + public void testNeverSelectFailingFactories() throws InterruptedException { + InetSocketAddress local0 = InetSocketAddress.createUnresolved("localhost", 7000); + InetSocketAddress local1 = InetSocketAddress.createUnresolved("localhost", 7001); + + TestingReactiveSocket socket = new TestingReactiveSocket(Function.identity()); + ReactiveSocketFactory failing = failingFactory(local0); + ReactiveSocketFactory succeeding = succeedingFactory(local1, socket); + List> factories = Arrays.asList(failing, succeeding); + + testBalancer(factories); + } + + @Test(timeout = 10_000L) + public void testNeverSelectFailingSocket() throws InterruptedException { + InetSocketAddress local0 = InetSocketAddress.createUnresolved("localhost", 7000); + InetSocketAddress local1 = InetSocketAddress.createUnresolved("localhost", 7001); + + TestingReactiveSocket socket = new TestingReactiveSocket(Function.identity()); + TestingReactiveSocket failingSocket = new TestingReactiveSocket(Function.identity()) { + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> + subscriber.onError(new RuntimeException("You shouldn't be here")); + } + + public double availability() { + return 0.0; + } + }; + + ReactiveSocketFactory failing = succeedingFactory(local0, failingSocket); + ReactiveSocketFactory succeeding = succeedingFactory(local1, socket); + List> factories = Arrays.asList(failing, succeeding); + + testBalancer(factories); + } + + private void testBalancer(List> factories) throws InterruptedException { + Publisher>> src = s -> { + s.onNext(factories); + s.onComplete(); + }; + + LoadBalancer balancer = new LoadBalancer(src); + + while (balancer.availability() == 0.0) { + Thread.sleep(1); + } + + for (int i = 0; i < 100; i++) { + makeAcall(balancer); + } + } + + private void makeAcall(ReactiveSocket balancer) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + balancer.requestResponse(dummy).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1L); + } + + @Override + public void onNext(Payload payload) { + System.out.println("Successfully receiving a response"); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + Assert.assertTrue(false); + latch.countDown(); + } + + @Override + public void onComplete() { + latch.countDown(); + } + }); + + latch.await(); + } + + private ReactiveSocketFactory succeedingFactory(SocketAddress sa, ReactiveSocket socket) { + return new ReactiveSocketFactory() { + @Override + public Publisher apply() { + return s -> s.onNext(socket); + } + + @Override + public double availability() { + return 1.0; + } + + @Override + public SocketAddress remote() { + return sa; + } + }; + } + + private ReactiveSocketFactory failingFactory(SocketAddress sa) { + return new ReactiveSocketFactory() { + @Override + public Publisher apply() { + Assert.assertTrue(false); + return null; + } + + @Override + public double availability() { + return 0.0; + } + + @Override + public SocketAddress remote() { + return sa; + } + }; + } +} diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/TestingReactiveSocket.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/TestingReactiveSocket.java new file mode 100644 index 000000000..6edd1f6fc --- /dev/null +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/TestingReactiveSocket.java @@ -0,0 +1,135 @@ +package io.reactivesocket.client; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.rx.Completable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; + +public class TestingReactiveSocket implements ReactiveSocket { + private final Function responder; + private final AtomicInteger count; + + public TestingReactiveSocket(Function responder) { + this.responder = responder; + this.count = new AtomicInteger(0); + } + + public int countMessageReceived() { + return count.get(); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> { + subscriber.onSubscribe(EmptySubscription.INSTANCE); + subscriber.onNext(null); + }; + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> + subscriber.onSubscribe(new Subscription() { + boolean cancelled = false; + + @Override + public void request(long n) { + if (cancelled) { + return; + } + try { + count.incrementAndGet(); + Payload response = responder.apply(payload); + subscriber.onNext(response); + subscriber.onComplete(); + } catch (Throwable t) { + subscriber.onError(t); + } + } + + @Override + public void cancel() {} + }); + } + + @Override + public Publisher requestStream(Payload payload) { + return requestResponse(payload); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return requestResponse(payload); + } + + @Override + public Publisher requestChannel(Publisher inputs) { + return subscriber -> + inputs.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + } + + @Override + public void onNext(Payload input) { + Payload response = responder.apply(input); + subscriber.onNext(response); + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }); + } + + @Override + public Publisher metadataPush(Payload payload) { + return fireAndForget(payload); + } + + @Override + public double availability() { + return 1.0; + } + + @Override + public void start(Completable c) { + c.success(); + } + + @Override + public void onRequestReady(Consumer c) {} + + @Override + public void onRequestReady(Completable c) { + c.success(); + } + + @Override + public void onShutdown(Completable c) {} + + @Override + public void sendLease(int ttl, int numberOfRequests) { + throw new RuntimeException("Not Implemented"); + } + + @Override + public void shutdown() {} + + @Override + public void close() throws Exception {} +} diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutFactoryTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutFactoryTest.java new file mode 100644 index 000000000..1dbf5e342 --- /dev/null +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutFactoryTest.java @@ -0,0 +1,59 @@ +package io.reactivesocket.client; + +import io.reactivesocket.Payload; +import io.reactivesocket.client.exception.TimeoutException; +import io.reactivesocket.client.filter.TimeoutSocket; +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +public class TimeoutFactoryTest { + @Test + public void testTimeoutSocket() { + TestingReactiveSocket socket = new TestingReactiveSocket(payload -> { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return payload; + }); + TimeoutSocket timeout = new TimeoutSocket(socket, 50, TimeUnit.MILLISECONDS); + + timeout.requestResponse(new Payload() { + @Override + public ByteBuffer getData() { + return null; + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(Payload payload) { + Assert.assertTrue(false); + } + + @Override + public void onError(Throwable t) { + Assert.assertTrue(t instanceof TimeoutException); + } + + @Override + public void onComplete() { + Assert.assertTrue(false); + } + }); + } +} \ No newline at end of file diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/stat/MedianTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/stat/MedianTest.java new file mode 100644 index 000000000..b0e164000 --- /dev/null +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/stat/MedianTest.java @@ -0,0 +1,52 @@ +package io.reactivesocket.client.stat; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Random; + +public class MedianTest { + private double errorSum = 0; + private double maxError = 0; + private double minError = 0; + + @Test + public void testMedian() { + Random rng = new Random("Repeatable tests".hashCode()); + int n = 1; + for (int i = 0; i < n; i++) { + testMedian(rng); + } + System.out.println("Error avg = " + (errorSum/n) + + " in range [" + minError + ", " + maxError + "]"); + } + + /** + * Test Median estimation with normal random data + */ + private void testMedian(Random rng) { + int n = 100 * 1024; + int range = Integer.MAX_VALUE >> 16; + Median m = new Median(); + + int[] data = new int[n]; + for (int i = 0; i < data.length; i++) { + int x = Math.max(0, range/2 + (int) (range/5 * rng.nextGaussian())); + data[i] = x; + m.insert(x); + } + Arrays.sort(data); + + int expected = data[data.length / 2]; + double estimation = m.estimation(); + double error = Math.abs(expected - estimation) / expected; + + errorSum += error; + maxError = Math.max(maxError, error); + minError = Math.min(minError, error); + + Assert.assertTrue("p50=" + estimation + ", real=" + expected + + ", error=" + error, error < 0.02); + } +} \ No newline at end of file diff --git a/reactivesocket-discovery-eureka/build.gradle b/reactivesocket-discovery-eureka/build.gradle new file mode 100644 index 000000000..a11104438 --- /dev/null +++ b/reactivesocket-discovery-eureka/build.gradle @@ -0,0 +1,4 @@ +dependencies { + compile project (':reactivesocket-core') + compile 'com.netflix.eureka:eureka-client:latest.release' +} diff --git a/reactivesocket-discovery-eureka/src/main/java/io/reactivesocket/discovery/eureka/Eureka.java b/reactivesocket-discovery-eureka/src/main/java/io/reactivesocket/discovery/eureka/Eureka.java new file mode 100644 index 000000000..3af22652f --- /dev/null +++ b/reactivesocket-discovery-eureka/src/main/java/io/reactivesocket/discovery/eureka/Eureka.java @@ -0,0 +1,50 @@ +package io.reactivesocket.discovery.eureka; + +import com.netflix.appinfo.InstanceInfo; +import com.netflix.discovery.CacheRefreshedEvent; +import com.netflix.discovery.EurekaClient; +import io.reactivesocket.internal.rx.EmptySubscription; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.stream.Collectors; + +public class Eureka { + private EurekaClient client; + + public Eureka(EurekaClient client) { + this.client = client; + } + + public Publisher> subscribeToAsg(String vip, boolean secure) { + return new Publisher>() { + @Override + public void subscribe(Subscriber> subscriber) { + // TODO: backpressure + subscriber.onSubscribe(EmptySubscription.INSTANCE); + pushChanges(subscriber); + + client.registerEventListener(event -> { + if (event instanceof CacheRefreshedEvent) { + pushChanges(subscriber); + } + }); + } + + private synchronized void pushChanges(Subscriber> subscriber) { + List infos = client.getInstancesByVipAddress(vip, secure); + List socketAddresses = infos.stream() + .map(info -> { + String ip = info.getIPAddr(); + int port = secure ? info.getSecurePort() : info.getPort(); + return InetSocketAddress.createUnresolved(ip, port); + }) + .collect(Collectors.toList()); + subscriber.onNext(socketAddresses); + } + }; + } +} diff --git a/reactivesocket-examples/build.gradle b/reactivesocket-examples/build.gradle new file mode 100644 index 000000000..65cc300f2 --- /dev/null +++ b/reactivesocket-examples/build.gradle @@ -0,0 +1,9 @@ +dependencies { + compile project(':reactivesocket-core') + compile project(':reactivesocket-client') + compile project(':reactivesocket-discovery-eureka') + compile project(':reactivesocket-stats-servo') + compile project(':reactivesocket-transport-tcp') + + compile project(':reactivesocket-test') +} diff --git a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java new file mode 100644 index 000000000..f402ab00c --- /dev/null +++ b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java @@ -0,0 +1,60 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.examples; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.client.Builder; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import io.reactivesocket.util.Unsafe; +import io.reactivesocket.test.TestUtil; +import org.reactivestreams.Publisher; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class EchoClient { + + private static Publisher> source(SocketAddress sa) { + return sub -> sub.onNext(Arrays.asList(sa)); + } + + public static void main(String... args) throws Exception { + InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8888); + ConnectionSetupPayload setupPayload = + ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + + TcpReactiveSocketConnector tcp = + new TcpReactiveSocketConnector(new NioEventLoopGroup(8), setupPayload, System.err::println); + + ReactiveSocket client = Builder.instance() + .withSource(source(address)) + .withConnector(tcp) + .build(); + + Unsafe.awaitAvailability(client); + + Payload request = TestUtil.utf8EncodedPayload("Hello", "META"); + Payload response = Unsafe.blockingSingleWait(client.requestResponse(request), 1, TimeUnit.SECONDS); + + System.out.println(response); + } +} diff --git a/reactivesocket-stats-servo/build.gradle b/reactivesocket-stats-servo/build.gradle new file mode 100644 index 000000000..4f6ef7cd3 --- /dev/null +++ b/reactivesocket-stats-servo/build.gradle @@ -0,0 +1,7 @@ +dependencies { + compile project(':reactivesocket-core') + compile 'com.netflix.servo:servo-core:latest.release' + compile 'org.hdrhistogram:HdrHistogram:latest.release' + + testCompile project(':reactivesocket-test') +} diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java new file mode 100644 index 000000000..6257c3dad --- /dev/null +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java @@ -0,0 +1,115 @@ +package io.reactivesocket.loadbalancer.servo; + +import com.google.common.util.concurrent.AtomicDouble; +import com.netflix.servo.DefaultMonitorRegistry; +import com.netflix.servo.monitor.DoubleGauge; +import com.netflix.servo.monitor.MonitorConfig; +import com.netflix.servo.tag.TagList; +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.rx.Completable; +import org.reactivestreams.Publisher; + +import java.util.function.Consumer; + +/** + * ReactiveSocket that delegates all calls to child reactice socket, and records the current availability as a servo metric + */ +public class AvailabilityMetricReactiveSocket implements ReactiveSocket { + private final ReactiveSocket child; + + private final DoubleGauge availabilityGauge; + + private final AtomicDouble atomicDouble; + + public AvailabilityMetricReactiveSocket(ReactiveSocket child, String name, TagList tags) { + this.child = child; + MonitorConfig.Builder builder = MonitorConfig.builder(name); + + if (tags != null) { + builder.withTags(tags); + } + MonitorConfig config = builder.build(); + availabilityGauge = new DoubleGauge(config); + DefaultMonitorRegistry.getInstance().register(availabilityGauge); + atomicDouble = availabilityGauge.getNumber(); + } + + + @Override + public Publisher requestResponse(Payload payload) { + return child.requestResponse(payload); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return child.fireAndForget(payload); + } + + @Override + public Publisher requestStream(Payload payload) { + return child.requestStream(payload); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return child.requestSubscription(payload); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return child.requestChannel(payloads); + } + + @Override + public Publisher metadataPush(Payload payload) { + return child.metadataPush(payload); + } + + @Override + public double availability() { + double availability = child.availability(); + atomicDouble.set(availability); + return availability; + } + + @Override + public void start(Completable c) { + child.start(c); + } + + @Override + public void startAndWait() { + child.startAndWait(); + } + + @Override + public void onRequestReady(Consumer c) { + child.onRequestReady(c); + } + + @Override + public void onRequestReady(Completable c) { + child.onRequestReady(c); + } + + @Override + public void sendLease(int ttl, int numberOfRequests) { + child.sendLease(ttl, numberOfRequests); + } + + @Override + public void shutdown() { + child.shutdown(); + } + + @Override + public void close() throws Exception { + child.close(); + } + + @Override + public void onShutdown(Completable c) { + child.onShutdown(c); + } +} diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/ServoMetricsReactiveSocket.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/ServoMetricsReactiveSocket.java new file mode 100644 index 000000000..c7593e57e --- /dev/null +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/ServoMetricsReactiveSocket.java @@ -0,0 +1,149 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.loadbalancer.servo; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.loadbalancer.servo.internal.HdrHistogramServoTimer; +import io.reactivesocket.loadbalancer.servo.internal.ThreadLocalAdderCounter; +import io.reactivesocket.util.ReactiveSocketProxy; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * An implementation of {@link ReactiveSocket} that sends metrics to Servo + */ +public class ServoMetricsReactiveSocket extends ReactiveSocketProxy { + final ThreadLocalAdderCounter success; + final ThreadLocalAdderCounter failure; + final HdrHistogramServoTimer timer; + + private class RecordingSubscriber implements Subscriber { + private final Subscriber child; + private long start; + + RecordingSubscriber(Subscriber child) { + this.child = child; + } + + @Override + public void onSubscribe(Subscription s) { + child.onSubscribe(s); + start = recordStart(); + } + + @Override + public void onNext(T t) { + child.onNext(t); + } + + @Override + public void onError(Throwable t) { + child.onError(t); + recordFailure(start); + } + + @Override + public void onComplete() { + child.onComplete(); + recordSuccess(start); + } + } + + public ServoMetricsReactiveSocket(ReactiveSocket child, String prefix) { + super(child); + this.success = ThreadLocalAdderCounter.newThreadLocalAdderCounter(prefix + "_success"); + this.failure = ThreadLocalAdderCounter.newThreadLocalAdderCounter(prefix + "_failure"); + this.timer = HdrHistogramServoTimer.newInstance(prefix + "_timer"); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> + child.requestResponse(payload).subscribe(new RecordingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> + child.requestStream(payload).subscribe(new RecordingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return subscriber -> + child.requestSubscription(payload).subscribe(new RecordingSubscriber<>(subscriber)); + + } + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> + child.fireAndForget(payload).subscribe(new RecordingSubscriber<>(subscriber)); + + } + + @Override + public Publisher metadataPush(Payload payload) { + return subscriber -> + child.metadataPush(payload).subscribe(new RecordingSubscriber<>(subscriber)); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> + child.requestChannel(payloads).subscribe(new RecordingSubscriber<>(subscriber)); + } + + public String histrogramToString() { + long successCount = success.get(); + long failureCount = failure.get(); + long totalCount = successCount + failureCount; + + StringBuilder s = new StringBuilder(); + s.append(String.format("%-12s%-12s\n","Percentile","Latency")); + s.append(String.format("=========================\n")); + s.append(String.format("%-12s%dms\n","50%",NANOSECONDS.toMillis(timer.getP50()))); + s.append(String.format("%-12s%dms\n","90%",NANOSECONDS.toMillis(timer.getP90()))); + s.append(String.format("%-12s%dms\n","99%",NANOSECONDS.toMillis(timer.getP99()))); + s.append(String.format("%-12s%dms\n","99.9%",NANOSECONDS.toMillis(timer.getP99_9()))); + s.append(String.format("%-12s%dms\n","99.99%",NANOSECONDS.toMillis(timer.getP99_99()))); + s.append(String.format("-------------------------\n")); + s.append(String.format("%-12s%dms\n","min",NANOSECONDS.toMillis(timer.getMin()))); + s.append(String.format("%-12s%dms\n","max",NANOSECONDS.toMillis(timer.getMax()))); + s.append(String.format("%-12s%d (%.0f%%)\n","success",successCount,100.0*successCount/totalCount)); + s.append(String.format("%-12s%d (%.0f%%)\n","failure",failureCount,100.0*failureCount/totalCount)); + s.append(String.format("%-12s%d\n","count",totalCount)); + return s.toString(); + } + + private long recordStart() { + return System.nanoTime(); + } + + private void recordFailure(long start) { + failure.increment(); + timer.record(System.nanoTime() - start); + } + + private void recordSuccess(long start) { + success.increment(); + timer.record(System.nanoTime() - start); + } +} diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramGauge.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramGauge.java new file mode 100644 index 000000000..34e8f2e39 --- /dev/null +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramGauge.java @@ -0,0 +1,27 @@ +package io.reactivesocket.loadbalancer.servo.internal; + +import com.netflix.servo.DefaultMonitorRegistry; +import com.netflix.servo.monitor.MonitorConfig; +import com.netflix.servo.monitor.NumberGauge; +import org.HdrHistogram.Histogram; + +/** + * Gauge that wraps a {@link Histogram} and when it's polled returns a particular percentage + */ +public class HdrHistogramGauge extends NumberGauge { + private final Histogram histogram; + private final double percentile; + + public HdrHistogramGauge(MonitorConfig monitorConfig, Histogram histogram, double percentile) { + super(monitorConfig); + this.histogram = histogram; + this.percentile = percentile; + + DefaultMonitorRegistry.getInstance().register(this); + } + + @Override + public Long getValue() { + return histogram.getValueAtPercentile(percentile); + } +} diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramMaxGauge.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramMaxGauge.java new file mode 100644 index 000000000..a7492e52e --- /dev/null +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramMaxGauge.java @@ -0,0 +1,25 @@ +package io.reactivesocket.loadbalancer.servo.internal; + +import com.netflix.servo.DefaultMonitorRegistry; +import com.netflix.servo.monitor.MonitorConfig; +import com.netflix.servo.monitor.NumberGauge; +import org.HdrHistogram.Histogram; + +/** + * Gauge that wraps a {@link Histogram} and when its polled returns it's max + */ +public class HdrHistogramMaxGauge extends NumberGauge { + private final Histogram histogram; + + public HdrHistogramMaxGauge(MonitorConfig monitorConfig, Histogram histogram) { + super(monitorConfig); + this.histogram = histogram; + + DefaultMonitorRegistry.getInstance().register(this); + } + + @Override + public Long getValue() { + return histogram.getMaxValue(); + } +} diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramMinGauge.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramMinGauge.java new file mode 100644 index 000000000..2c6cb147a --- /dev/null +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramMinGauge.java @@ -0,0 +1,25 @@ +package io.reactivesocket.loadbalancer.servo.internal; + +import com.netflix.servo.DefaultMonitorRegistry; +import com.netflix.servo.monitor.MonitorConfig; +import com.netflix.servo.monitor.NumberGauge; +import org.HdrHistogram.Histogram; + +/** + * Gauge that wraps a {@link Histogram} and when its polled returns it's min + */ +public class HdrHistogramMinGauge extends NumberGauge { + private final Histogram histogram; + + public HdrHistogramMinGauge(MonitorConfig monitorConfig, Histogram histogram) { + super(monitorConfig); + this.histogram = histogram; + + DefaultMonitorRegistry.getInstance().register(this); + } + + @Override + public Long getValue() { + return histogram.getMinValue(); + } +} diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramServoTimer.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramServoTimer.java new file mode 100644 index 000000000..e26b353e1 --- /dev/null +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/HdrHistogramServoTimer.java @@ -0,0 +1,123 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.loadbalancer.servo.internal; + +import com.netflix.servo.monitor.MonitorConfig; +import com.netflix.servo.tag.Tag; +import org.HdrHistogram.ConcurrentHistogram; +import org.HdrHistogram.Histogram; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Captures a HdrHistogram and sends it to pre-defined Server Counters. + * The buckets are min, max, 50%, 90%, 99%, 99.9%, and 99.99% + */ +public class HdrHistogramServoTimer { + private final Histogram histogram = new ConcurrentHistogram(TimeUnit.MINUTES.toNanos(1), 2); + + private HdrHistogramMinGauge min; + + private HdrHistogramMaxGauge max; + + private HdrHistogramGauge p50; + + private HdrHistogramGauge p90; + + private HdrHistogramGauge p99; + + private HdrHistogramGauge p99_9; + + private HdrHistogramGauge p99_99; + + private HdrHistogramServoTimer(String label) { + histogram.setAutoResize(true); + + min = new HdrHistogramMinGauge(MonitorConfig.builder(label + "_min").build(), histogram); + max = new HdrHistogramMaxGauge(MonitorConfig.builder(label + "_max").build(), histogram); + + p50 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p50").build(), histogram, 50); + p90 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p90").build(), histogram, 90); + p99 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p99").build(), histogram, 99); + p99_9 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p99_9").build(), histogram, 99.9); + p99_99 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p99_99").build(), histogram, 99.99); + } + + + private HdrHistogramServoTimer(String label, List tags) { + histogram.setAutoResize(true); + + + min = new HdrHistogramMinGauge(MonitorConfig.builder(label + "_min").withTags(tags).build(), histogram); + max = new HdrHistogramMaxGauge(MonitorConfig.builder(label + "_max").withTags(tags).build(), histogram); + + p50 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p50").withTags(tags).build(), histogram, 50); + p90 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p90").withTags(tags).build(), histogram, 90); + p99 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p99").withTags(tags).build(), histogram, 99); + p99_9 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p99_9").withTags(tags).build(), histogram, 99.9); + p99_99 = new HdrHistogramGauge(MonitorConfig.builder(label + "_p99_99").withTags(tags).build(), histogram, 99.99); + } + + public static HdrHistogramServoTimer newInstance(String label) { + return new HdrHistogramServoTimer(label); + } + + public static HdrHistogramServoTimer newInstance(String label, Tag... tags) { + return newInstance(label, Arrays.asList(tags)); + } + + public static HdrHistogramServoTimer newInstance(String label, List tags) { + return new HdrHistogramServoTimer(label, tags); + } + + /** + * Records a value for to the histogram and updates the Servo counter buckets + * @param value the value to update + */ + public void record(long value) { + histogram.recordValue(value); + } + + public Long getMin() { + return min.getValue(); + } + + public Long getMax() { + return max.getValue(); + } + + public Long getP50() { + return p50.getValue(); + } + + public Long getP90() { + return p90.getValue(); + } + + public Long getP99() { + return p99.getValue(); + } + + public Long getP99_9() { + return p99_9.getValue(); + } + + public Long getP99_99() { + return p99_99.getValue(); + } +} \ No newline at end of file diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/ThreadLocalAdder.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/ThreadLocalAdder.java new file mode 100644 index 000000000..c789e7ce2 --- /dev/null +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/ThreadLocalAdder.java @@ -0,0 +1,105 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.loadbalancer.servo.internal; + +import org.agrona.UnsafeAccess; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Fast adder based on http://psy-lob-saw.blogspot.com/2013/06/java-concurrent-counters-by-numbers.html + */ +public class ThreadLocalAdder { + private final AtomicLong deadThreadSum = new AtomicLong(); + + static class PaddedLong1 { + long p1, p2, p3, p4, p6, p7; + } + + static class PaddedLong2 extends PaddedLong1 { + private static final long VALUE_OFFSET; + + static { + try { + VALUE_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(PaddedLong2.class.getDeclaredField("value")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + volatile long value; + + public long get() { + return value; + } + + public long plainGet() { + return UnsafeAccess.UNSAFE.getLong(this, VALUE_OFFSET); + } + + public void lazySet(long v) { + UnsafeAccess.UNSAFE.putOrderedLong(this, VALUE_OFFSET, v); + } + + } + + static class PaddedLong3 extends PaddedLong2 { + long p9, p10, p11, p12, p13, p14; + } + + final class ThreadAtomicLong extends PaddedLong3 { + final Thread t = Thread.currentThread(); + + public ThreadAtomicLong() { + counters.add(this); + counters + .forEach(tal -> { + if (!tal.t.isAlive()) { + deadThreadSum.addAndGet(tal.get()); + counters.remove(tal); + } + }); + } + } + + private final CopyOnWriteArrayList counters = new CopyOnWriteArrayList<>(); + + private final ThreadLocal threadLocalAtomicLong = ThreadLocal.withInitial(ThreadAtomicLong::new); + + public void increment() { + increment(1); + } + + public void increment(long amount) { + ThreadAtomicLong lc = threadLocalAtomicLong.get(); + lc.lazySet(lc.plainGet() + amount); + } + + public long get() { + long currentDeadThreadSum; + long sum; + do { + currentDeadThreadSum = deadThreadSum.get(); + sum = 0; + for (ThreadAtomicLong threadAtomicLong : counters) { + sum += threadAtomicLong.get(); + } + } while (currentDeadThreadSum != deadThreadSum.get()); + return sum + currentDeadThreadSum; + } + +} \ No newline at end of file diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/ThreadLocalAdderCounter.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/ThreadLocalAdderCounter.java new file mode 100644 index 000000000..bb8c95c0f --- /dev/null +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/internal/ThreadLocalAdderCounter.java @@ -0,0 +1,113 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.loadbalancer.servo.internal; + +import com.netflix.servo.DefaultMonitorRegistry; +import com.netflix.servo.annotations.DataSourceType; +import com.netflix.servo.monitor.AbstractMonitor; +import com.netflix.servo.monitor.Counter; +import com.netflix.servo.monitor.MonitorConfig; +import com.netflix.servo.tag.Tag; + +import java.util.List; + +/** + * A {@link Counter} implementation that uses {@link ThreadLocalAdderCounter} + */ +public class ThreadLocalAdderCounter extends AbstractMonitor implements Counter { + private ThreadLocalAdder adder = new ThreadLocalAdder(); + + public static ThreadLocalAdderCounter newThreadLocalAdderCounter(String name) { + MonitorConfig.Builder builder = MonitorConfig.builder(name); + MonitorConfig config = builder.build(); + + ThreadLocalAdderCounter threadLocalAdderCounter = new ThreadLocalAdderCounter(config); + DefaultMonitorRegistry.getInstance().register(threadLocalAdderCounter); + + return threadLocalAdderCounter; + } + + public static ThreadLocalAdderCounter newThreadLocalAdderCounter(String name, List tags) { + MonitorConfig.Builder builder = MonitorConfig.builder(name); + builder.withTags(tags); + MonitorConfig config = builder.build(); + + ThreadLocalAdderCounter threadLocalAdderCounter = new ThreadLocalAdderCounter(config); + DefaultMonitorRegistry.getInstance().register(threadLocalAdderCounter); + + return threadLocalAdderCounter; + } + + + /** + * Creates a new instance of the counter. + */ + public ThreadLocalAdderCounter(MonitorConfig config) { + super(config.withAdditionalTag(DataSourceType.COUNTER)); + } + + /** + * {@inheritDoc} + */ + @Override + public void increment() { + adder.increment(); + } + + /** + * {@inheritDoc} + */ + @Override + public void increment(long amount) { + adder.increment(amount); + } + + /** + * {@inheritDoc} + */ + @Override + public Number getValue(int pollerIdx) { + return adder.get(); + } + + public long get() { + return adder.get(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof ThreadLocalAdderCounter)) { + return false; + } + ThreadLocalAdderCounter m = (ThreadLocalAdderCounter) obj; + return config.equals(m.getConfig()) && adder.get() == m.adder.get(); + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + int result = config.hashCode(); + long n = adder.get(); + result = 31 * result + (int) (n ^ (n >>> 32)); + return result; + } + +} \ No newline at end of file diff --git a/reactivesocket-stats-servo/src/test/java/io/reactivesocket/loadbalancer/servo/ServoMetricsReactiveSocketTest.java b/reactivesocket-stats-servo/src/test/java/io/reactivesocket/loadbalancer/servo/ServoMetricsReactiveSocketTest.java new file mode 100644 index 000000000..3c6b2de94 --- /dev/null +++ b/reactivesocket-stats-servo/src/test/java/io/reactivesocket/loadbalancer/servo/ServoMetricsReactiveSocketTest.java @@ -0,0 +1,307 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.loadbalancer.servo; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.rx.Completable; +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import rx.RxReactiveStreams; +import rx.observers.TestSubscriber; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; + +/** + * Created by rroeser on 3/7/16. + */ +public class ServoMetricsReactiveSocketTest { + @Test + public void testCountSuccess() { + ServoMetricsReactiveSocket client = new ServoMetricsReactiveSocket(new ReactiveSocket() { + @Override + public Publisher metadataPush(Payload payload) { + return null; + } + + @Override + public Publisher fireAndForget(Payload payload) { + return null; + } + + @Override + public Publisher requestSubscription(Payload payload) { + return null; + } + + @Override + public Publisher requestStream(Payload payload) { + return null; + } + + @Override + public Publisher requestResponse(Payload payload) { + return s -> { + s.onNext(new Payload() { + @Override + public ByteBuffer getData() { + return null; + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }); + + s.onComplete(); + }; + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return null; + } + + @Override + public double availability() { + return 1.0; + } + + @Override + public void close() throws Exception {} + @Override + public void start(Completable completable) {} + @Override + public void onRequestReady(Consumer consumer) {} + @Override + public void onRequestReady(Completable completable) {} + @Override + public void onShutdown(Completable completable) {} + @Override + public void sendLease(int i, int i1) {} + @Override + public void shutdown() {} + }, "test"); + + Publisher payloadPublisher = client.requestResponse(new Payload() { + @Override + public ByteBuffer getData() { + return null; + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }); + + TestSubscriber subscriber = new TestSubscriber<>(); + RxReactiveStreams.toObservable(payloadPublisher).subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertNoErrors(); + + Assert.assertEquals(1, client.success.get()); + } + + @Test + public void testCountFailure() { + ServoMetricsReactiveSocket client = new ServoMetricsReactiveSocket(new ReactiveSocket() { + @Override + public Publisher metadataPush(Payload payload) { + return null; + } + + @Override + public Publisher fireAndForget(Payload payload) { + return null; + } + + @Override + public Publisher requestSubscription(Payload payload) { + return null; + } + + @Override + public Publisher requestStream(Payload payload) { + return null; + } + + @Override + public Publisher requestResponse(Payload payload) { + return new Publisher() { + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(EmptySubscription.INSTANCE); + s.onError(new RuntimeException()); + } + }; + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return null; + } + + @Override + public double availability() { + return 1.0; + } + + @Override + public void close() throws Exception {} + @Override + public void start(Completable completable) {} + @Override + public void onRequestReady(Consumer consumer) {} + @Override + public void onRequestReady(Completable completable) {} + @Override + public void onShutdown(Completable completable) {} + @Override + public void sendLease(int i, int i1) {} + @Override + public void shutdown() {} + }, "test"); + + Publisher payloadPublisher = client.requestResponse(new Payload() { + @Override + public ByteBuffer getData() { + return null; + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }); + + TestSubscriber subscriber = new TestSubscriber<>(); + RxReactiveStreams.toObservable(payloadPublisher).subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertError(RuntimeException.class); + + Assert.assertEquals(1, client.failure.get()); + + } + + @Test + public void testHistogram() throws Exception { + ServoMetricsReactiveSocket client = new ServoMetricsReactiveSocket(new ReactiveSocket() { + @Override + public Publisher metadataPush(Payload payload) { + return null; + } + + @Override + public Publisher fireAndForget(Payload payload) { + return null; + } + + @Override + public Publisher requestSubscription(Payload payload) { + return null; + } + + @Override + public Publisher requestStream(Payload payload) { + return null; + } + + @Override + public Publisher requestResponse(Payload payload) { + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(10, 50)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return s -> { + s.onSubscribe(EmptySubscription.INSTANCE); + s.onNext(new Payload() { + @Override + public ByteBuffer getData() { + return null; + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }); + + s.onComplete(); + }; + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return null; + } + + @Override + public double availability() { + return 1.0; + } + + @Override + public void close() throws Exception {} + @Override + public void start(Completable completable) {} + @Override + public void onRequestReady(Consumer consumer) {} + @Override + public void onRequestReady(Completable completable) {} + @Override + public void onShutdown(Completable completable) {} + @Override + public void sendLease(int i, int i1) {} + @Override + public void shutdown() {} + }, "test"); + + for (int i = 0; i < 10; i ++) { + Publisher payloadPublisher = client.requestResponse(new Payload() { + @Override + public ByteBuffer getData() { + return null; + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }); + + TestSubscriber subscriber = new TestSubscriber<>(); + RxReactiveStreams.toObservable(payloadPublisher).subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertNoErrors(); + } + + Thread.sleep(3_000); + + System.out.println(client.histrogramToString()); + + Assert.assertEquals(10, client.success.get()); + Assert.assertEquals(0, client.failure.get()); + Assert.assertNotEquals(client.timer.getMax(), client.timer.getMin()); + } +} diff --git a/settings.gradle b/settings.gradle index 2d5fafebf..14e459afc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,6 +1,10 @@ rootProject.name='reactivesocket' +include 'reactivesocket-client' include 'reactivesocket-core' +include 'reactivesocket-discovery-eureka' +include 'reactivesocket-examples' include 'reactivesocket-mime-types' +include 'reactivesocket-stats-servo' include 'reactivesocket-test' include 'reactivesocket-transport-aeron' include 'reactivesocket-transport-local'