diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Ewma.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/Ewma.java similarity index 97% rename from rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Ewma.java rename to rsocket-core/src/main/java/io/rsocket/loadbalance/Ewma.java index efdc20da0..4812114dd 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Ewma.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/Ewma.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.rsocket.loadbalance.stat; +package io.rsocket.loadbalance; import io.rsocket.util.Clock; import java.util.concurrent.TimeUnit; @@ -27,7 +27,7 @@ *

e.g. with a half-life of 10 unit, if you insert 100 at t=0 and 200 at t=10 the ewma will be * equal to (200 - 100)/2 = 150 (half of the distance between the new and the old value) */ -public class Ewma { +class Ewma { private final long tau; private volatile long stamp; private volatile double ewma; diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/FrugalQuantile.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/FrugalQuantile.java similarity index 96% rename from rsocket-core/src/main/java/io/rsocket/loadbalance/stat/FrugalQuantile.java rename to rsocket-core/src/main/java/io/rsocket/loadbalance/FrugalQuantile.java index 7dcebb424..efa32ff83 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/FrugalQuantile.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/FrugalQuantile.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.rsocket.loadbalance.stat; +package io.rsocket.loadbalance; import java.util.SplittableRandom; @@ -25,7 +25,7 @@ * *

More info: http://blog.aggregateknowledge.com/2013/09/16/sketch-of-the-day-frugal-streaming/ */ -public class FrugalQuantile implements Quantile { +class FrugalQuantile implements Quantile { private final double increment; volatile double estimate; int step; diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java index e865afb29..4b19550bd 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java @@ -79,8 +79,38 @@ public static LoadbalanceRSocketClient create( new RSocketPool(rSocketSuppliersPublisher, loadbalanceStrategy)); } - public static RSocketClient create( + public static LoadbalanceRSocketClient create( Publisher> rSocketSuppliersPublisher) { return create(new RoundRobinLoadbalanceStrategy(), rSocketSuppliersPublisher); } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + LoadbalanceStrategy loadbalanceStrategy; + + Builder() {} + + public Builder withWeightedLoadbalanceStrategy() { + return withCustomLoadbalanceStrategy(new WeightedLoadbalanceStrategy()); + } + + public Builder withRoundRobinLoadbalanceStrategy() { + return withCustomLoadbalanceStrategy(new RoundRobinLoadbalanceStrategy()); + } + + public Builder withCustomLoadbalanceStrategy(LoadbalanceStrategy strategy) { + this.loadbalanceStrategy = strategy; + return this; + } + + public LoadbalanceRSocketClient build( + Publisher> rSocketSuppliersPublisher) { + return new LoadbalanceRSocketClient( + new RSocketPool(rSocketSuppliersPublisher, this.loadbalanceStrategy)); + } + } } diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java index 2bcf4455b..2a333959b 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java @@ -15,15 +15,11 @@ */ package io.rsocket.loadbalance; +import io.rsocket.RSocket; import java.util.List; -import java.util.function.Supplier; @FunctionalInterface public interface LoadbalanceStrategy { - WeightedRSocket select(List availableRSockets); - - default Supplier statsSupplier() { - return Stats::noOps; - } + RSocket select(List availableRSockets); } diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Median.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/Median.java similarity index 95% rename from rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Median.java rename to rsocket-core/src/main/java/io/rsocket/loadbalance/Median.java index 5d7c7d034..833bd5380 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Median.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/Median.java @@ -14,10 +14,10 @@ * limitations under the License. */ -package io.rsocket.loadbalance.stat; +package io.rsocket.loadbalance; /** This implementation gives better results because it considers more data-point. */ -public class Median extends FrugalQuantile { +class Median extends FrugalQuantile { public Median() { super(0.5, 1.0); } diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Quantile.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/Quantile.java similarity index 92% rename from rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Quantile.java rename to rsocket-core/src/main/java/io/rsocket/loadbalance/Quantile.java index bfaad9e62..84c699197 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Quantile.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/Quantile.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.rsocket.loadbalance.stat; +package io.rsocket.loadbalance; -public interface Quantile { +interface Quantile { /** @return the estimation of the current value of the quantile */ double estimation(); diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java index 35a38f3b4..dc776852c 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java @@ -37,7 +37,7 @@ import reactor.util.annotation.Nullable; class RSocketPool extends ResolvingOperator - implements CoreSubscriber>, List { + implements CoreSubscriber>, List { final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this); final LoadbalanceStrategy loadbalanceStrategy; @@ -59,7 +59,11 @@ class RSocketPool extends ResolvingOperator RSocketPool( Publisher> source, LoadbalanceStrategy loadbalanceStrategy) { this.loadbalanceStrategy = loadbalanceStrategy; - this.statsSupplier = loadbalanceStrategy.statsSupplier(); + if (loadbalanceStrategy instanceof WeightedLoadbalanceStrategy) { + this.statsSupplier = Stats::create; + } else { + this.statsSupplier = Stats::noOps; + } ACTIVE_SOCKETS.lazySet(this, EMPTY); @@ -361,12 +365,12 @@ public boolean contains(Object o) { } @Override - public Iterator iterator() { + public Iterator iterator() { throw new UnsupportedOperationException(); } @Override - public boolean add(WeightedRSocket weightedRSocket) { + public boolean add(RSocket weightedRSocket) { throw new UnsupportedOperationException(); } @@ -381,12 +385,12 @@ public boolean containsAll(Collection c) { } @Override - public boolean addAll(Collection c) { + public boolean addAll(Collection c) { throw new UnsupportedOperationException(); } @Override - public boolean addAll(int index, Collection c) { + public boolean addAll(int index, Collection c) { throw new UnsupportedOperationException(); } @@ -406,12 +410,12 @@ public void clear() { } @Override - public WeightedRSocket set(int index, WeightedRSocket element) { + public WeightedRSocket set(int index, RSocket element) { throw new UnsupportedOperationException(); } @Override - public void add(int index, WeightedRSocket element) { + public void add(int index, RSocket element) { throw new UnsupportedOperationException(); } @@ -431,17 +435,17 @@ public int lastIndexOf(Object o) { } @Override - public ListIterator listIterator() { + public ListIterator listIterator() { throw new UnsupportedOperationException(); } @Override - public ListIterator listIterator(int index) { + public ListIterator listIterator(int index) { throw new UnsupportedOperationException(); } @Override - public List subList(int fromIndex, int toIndex) { + public List subList(int fromIndex, int toIndex) { throw new UnsupportedOperationException(); } } diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java index 60227f9ac..0e1c541f2 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java @@ -15,10 +15,11 @@ */ package io.rsocket.loadbalance; +import io.rsocket.RSocket; import java.util.List; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy { +class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy { volatile int nextIndex; @@ -26,7 +27,7 @@ public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy { AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadbalanceStrategy.class, "nextIndex"); @Override - public WeightedRSocket select(List sockets) { + public RSocket select(List sockets) { int length = sockets.size(); int indexToUse = Math.abs(NEXT_INDEX.getAndIncrement(this) % length); diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/Stats.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/Stats.java index 12f1d1c3e..2e9828938 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/Stats.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/Stats.java @@ -1,15 +1,11 @@ package io.rsocket.loadbalance; import io.rsocket.Availability; -import io.rsocket.loadbalance.stat.Ewma; -import io.rsocket.loadbalance.stat.FrugalQuantile; -import io.rsocket.loadbalance.stat.Median; -import io.rsocket.loadbalance.stat.Quantile; import io.rsocket.util.Clock; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -public class Stats implements Availability { +class Stats implements Availability { private static final double DEFAULT_LOWER_QUANTILE = 0.5; private static final double DEFAULT_HIGHER_QUANTILE = 0.8; diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java index 590da3ded..44ffbd6f5 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java @@ -16,13 +16,14 @@ package io.rsocket.loadbalance; +import io.rsocket.RSocket; import java.util.List; import java.util.SplittableRandom; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; import reactor.util.annotation.Nullable; -public class WeightedLoadbalanceStrategy implements LoadbalanceStrategy { +class WeightedLoadbalanceStrategy implements LoadbalanceStrategy { private static final double EXP_FACTOR = 4.0; @@ -52,24 +53,19 @@ public WeightedLoadbalanceStrategy( } @Override - public Supplier statsSupplier() { - return this.statsSupplier; - } - - @Override - public WeightedRSocket select(List sockets) { + public RSocket select(List sockets) { final int effort = this.effort; final int size = sockets.size(); WeightedRSocket weightedRSocket; switch (size) { case 1: - weightedRSocket = sockets.get(0); + weightedRSocket = (WeightedRSocket) sockets.get(0); break; case 2: { - WeightedRSocket rsc1 = sockets.get(0); - WeightedRSocket rsc2 = sockets.get(1); + WeightedRSocket rsc1 = (WeightedRSocket) sockets.get(0); + WeightedRSocket rsc2 = (WeightedRSocket) sockets.get(1); double w1 = algorithmicWeight(rsc1); double w2 = algorithmicWeight(rsc2); @@ -92,8 +88,8 @@ public WeightedRSocket select(List sockets) { if (i2 >= i1) { i2++; } - rsc1 = sockets.get(i1); - rsc2 = sockets.get(i2); + rsc1 = (WeightedRSocket) sockets.get(i1); + rsc2 = (WeightedRSocket) sockets.get(i2); if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) { break; } diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedRSocket.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedRSocket.java index de9e56fa4..488a7134d 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedRSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedRSocket.java @@ -17,7 +17,7 @@ import io.rsocket.RSocket; -public interface WeightedRSocket extends RSocket { +interface WeightedRSocket extends RSocket { Stats stats(); } diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/package-info.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/package-info.java deleted file mode 100644 index 20c2c9d73..000000000 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/stat/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -@NonNullApi -package io.rsocket.loadbalance.stat; - -import reactor.util.annotation.NonNullApi; diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/loadbalancer/RoundRobinRSocketLoadbalancerExample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/loadbalancer/RoundRobinRSocketLoadbalancerExample.java index 7a55c8274..2c3fd831d 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/loadbalancer/RoundRobinRSocketLoadbalancerExample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/loadbalancer/RoundRobinRSocketLoadbalancerExample.java @@ -6,7 +6,6 @@ import io.rsocket.core.RSocketServer; import io.rsocket.loadbalance.LoadbalanceRSocketClient; import io.rsocket.loadbalance.LoadbalanceRSocketSource; -import io.rsocket.loadbalance.RoundRobinLoadbalanceStrategy; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; @@ -120,7 +119,7 @@ public static void main(String[] args) { }); RSocketClient loadBalancedRSocketClient = - LoadbalanceRSocketClient.create(new RoundRobinLoadbalanceStrategy(), producer); + LoadbalanceRSocketClient.builder().withRoundRobinLoadbalanceStrategy().build(producer); for (int i = 0; i < 10000; i++) { try { diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java index 9b544fb24..55b40fe39 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java @@ -22,7 +22,6 @@ import io.rsocket.client.filter.RSocketSupplier; import io.rsocket.loadbalance.LoadbalanceRSocketClient; import io.rsocket.loadbalance.LoadbalanceRSocketSource; -import io.rsocket.loadbalance.WeightedLoadbalanceStrategy; import java.util.Collection; import java.util.stream.Collectors; import org.reactivestreams.Publisher; @@ -79,7 +78,9 @@ public static LoadBalancedRSocketMono create( rsl.stream() .map(rs -> LoadbalanceRSocketSource.from(rs.toString(), rs.get())) .collect(Collectors.toList())) - .as(f -> LoadbalanceRSocketClient.create(new WeightedLoadbalanceStrategy(), f))); + .as( + f -> + LoadbalanceRSocketClient.builder().withWeightedLoadbalanceStrategy().build(f))); } public static LoadBalancedRSocketMono fromClient(LoadbalanceRSocketClient rSocketClient) {