Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.rsocket.loadbalance.stat;
package io.rsocket.loadbalance;

import io.rsocket.util.Clock;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,7 +27,7 @@
* <p>e.g. with a half-life of 10 unit, if you insert 100 at t=0 and 200 at t=10 the ewma will be
* equal to (200 - 100)/2 = 150 (half of the distance between the new and the old value)
*/
public class Ewma {
class Ewma {
private final long tau;
private volatile long stamp;
private volatile double ewma;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.rsocket.loadbalance.stat;
package io.rsocket.loadbalance;

import java.util.SplittableRandom;

Expand All @@ -25,7 +25,7 @@
*
* <p>More info: http://blog.aggregateknowledge.com/2013/09/16/sketch-of-the-day-frugal-streaming/
*/
public class FrugalQuantile implements Quantile {
class FrugalQuantile implements Quantile {
private final double increment;
volatile double estimate;
int step;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,38 @@ public static LoadbalanceRSocketClient create(
new RSocketPool(rSocketSuppliersPublisher, loadbalanceStrategy));
}

public static RSocketClient create(
public static LoadbalanceRSocketClient create(
Publisher<List<LoadbalanceRSocketSource>> rSocketSuppliersPublisher) {
return create(new RoundRobinLoadbalanceStrategy(), rSocketSuppliersPublisher);
}

public static Builder builder() {
return new Builder();
}

public static class Builder {

LoadbalanceStrategy loadbalanceStrategy;

Builder() {}

public Builder withWeightedLoadbalanceStrategy() {
return withCustomLoadbalanceStrategy(new WeightedLoadbalanceStrategy());
}

public Builder withRoundRobinLoadbalanceStrategy() {
return withCustomLoadbalanceStrategy(new RoundRobinLoadbalanceStrategy());
}

public Builder withCustomLoadbalanceStrategy(LoadbalanceStrategy strategy) {
this.loadbalanceStrategy = strategy;
return this;
}

public LoadbalanceRSocketClient build(
Publisher<List<LoadbalanceRSocketSource>> rSocketSuppliersPublisher) {
return new LoadbalanceRSocketClient(
new RSocketPool(rSocketSuppliersPublisher, this.loadbalanceStrategy));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@
*/
package io.rsocket.loadbalance;

import io.rsocket.RSocket;
import java.util.List;
import java.util.function.Supplier;

@FunctionalInterface
public interface LoadbalanceStrategy {

WeightedRSocket select(List<WeightedRSocket> availableRSockets);

default Supplier<Stats> statsSupplier() {
return Stats::noOps;
}
RSocket select(List<RSocket> availableRSockets);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package io.rsocket.loadbalance.stat;
package io.rsocket.loadbalance;

/** This implementation gives better results because it considers more data-point. */
public class Median extends FrugalQuantile {
class Median extends FrugalQuantile {
public Median() {
super(0.5, 1.0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.loadbalance.stat;
package io.rsocket.loadbalance;

public interface Quantile {
interface Quantile {
/** @return the estimation of the current value of the quantile */
double estimation();

Expand Down
26 changes: 15 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import reactor.util.annotation.Nullable;

class RSocketPool extends ResolvingOperator<Void>
implements CoreSubscriber<List<LoadbalanceRSocketSource>>, List<WeightedRSocket> {
implements CoreSubscriber<List<LoadbalanceRSocketSource>>, List<RSocket> {

final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this);
final LoadbalanceStrategy loadbalanceStrategy;
Expand All @@ -59,7 +59,11 @@ class RSocketPool extends ResolvingOperator<Void>
RSocketPool(
Publisher<List<LoadbalanceRSocketSource>> source, LoadbalanceStrategy loadbalanceStrategy) {
this.loadbalanceStrategy = loadbalanceStrategy;
this.statsSupplier = loadbalanceStrategy.statsSupplier();
if (loadbalanceStrategy instanceof WeightedLoadbalanceStrategy) {
this.statsSupplier = Stats::create;
} else {
this.statsSupplier = Stats::noOps;
}

ACTIVE_SOCKETS.lazySet(this, EMPTY);

Expand Down Expand Up @@ -361,12 +365,12 @@ public boolean contains(Object o) {
}

@Override
public Iterator<WeightedRSocket> iterator() {
public Iterator<RSocket> iterator() {
throw new UnsupportedOperationException();
}

@Override
public boolean add(WeightedRSocket weightedRSocket) {
public boolean add(RSocket weightedRSocket) {
throw new UnsupportedOperationException();
}

Expand All @@ -381,12 +385,12 @@ public boolean containsAll(Collection<?> c) {
}

@Override
public boolean addAll(Collection<? extends WeightedRSocket> c) {
public boolean addAll(Collection<? extends RSocket> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(int index, Collection<? extends WeightedRSocket> c) {
public boolean addAll(int index, Collection<? extends RSocket> c) {
throw new UnsupportedOperationException();
}

Expand All @@ -406,12 +410,12 @@ public void clear() {
}

@Override
public WeightedRSocket set(int index, WeightedRSocket element) {
public WeightedRSocket set(int index, RSocket element) {
throw new UnsupportedOperationException();
}

@Override
public void add(int index, WeightedRSocket element) {
public void add(int index, RSocket element) {
throw new UnsupportedOperationException();
}

Expand All @@ -431,17 +435,17 @@ public int lastIndexOf(Object o) {
}

@Override
public ListIterator<WeightedRSocket> listIterator() {
public ListIterator<RSocket> listIterator() {
throw new UnsupportedOperationException();
}

@Override
public ListIterator<WeightedRSocket> listIterator(int index) {
public ListIterator<RSocket> listIterator(int index) {
throw new UnsupportedOperationException();
}

@Override
public List<WeightedRSocket> subList(int fromIndex, int toIndex) {
public List<RSocket> subList(int fromIndex, int toIndex) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
*/
package io.rsocket.loadbalance;

import io.rsocket.RSocket;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy {
class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy {

volatile int nextIndex;

static final AtomicIntegerFieldUpdater<RoundRobinLoadbalanceStrategy> NEXT_INDEX =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadbalanceStrategy.class, "nextIndex");

@Override
public WeightedRSocket select(List<WeightedRSocket> sockets) {
public RSocket select(List<RSocket> sockets) {
int length = sockets.size();

int indexToUse = Math.abs(NEXT_INDEX.getAndIncrement(this) % length);
Expand Down
6 changes: 1 addition & 5 deletions rsocket-core/src/main/java/io/rsocket/loadbalance/Stats.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package io.rsocket.loadbalance;

import io.rsocket.Availability;
import io.rsocket.loadbalance.stat.Ewma;
import io.rsocket.loadbalance.stat.FrugalQuantile;
import io.rsocket.loadbalance.stat.Median;
import io.rsocket.loadbalance.stat.Quantile;
import io.rsocket.util.Clock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public class Stats implements Availability {
class Stats implements Availability {

private static final double DEFAULT_LOWER_QUANTILE = 0.5;
private static final double DEFAULT_HIGHER_QUANTILE = 0.8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package io.rsocket.loadbalance;

import io.rsocket.RSocket;
import java.util.List;
import java.util.SplittableRandom;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import reactor.util.annotation.Nullable;

public class WeightedLoadbalanceStrategy implements LoadbalanceStrategy {
class WeightedLoadbalanceStrategy implements LoadbalanceStrategy {

private static final double EXP_FACTOR = 4.0;

Expand Down Expand Up @@ -52,24 +53,19 @@ public WeightedLoadbalanceStrategy(
}

@Override
public Supplier<Stats> statsSupplier() {
return this.statsSupplier;
}

@Override
public WeightedRSocket select(List<WeightedRSocket> sockets) {
public RSocket select(List<RSocket> sockets) {
final int effort = this.effort;
final int size = sockets.size();

WeightedRSocket weightedRSocket;
switch (size) {
case 1:
weightedRSocket = sockets.get(0);
weightedRSocket = (WeightedRSocket) sockets.get(0);
break;
case 2:
{
WeightedRSocket rsc1 = sockets.get(0);
WeightedRSocket rsc2 = sockets.get(1);
WeightedRSocket rsc1 = (WeightedRSocket) sockets.get(0);
WeightedRSocket rsc2 = (WeightedRSocket) sockets.get(1);

double w1 = algorithmicWeight(rsc1);
double w2 = algorithmicWeight(rsc2);
Expand All @@ -92,8 +88,8 @@ public WeightedRSocket select(List<WeightedRSocket> sockets) {
if (i2 >= i1) {
i2++;
}
rsc1 = sockets.get(i1);
rsc2 = sockets.get(i2);
rsc1 = (WeightedRSocket) sockets.get(i1);
rsc2 = (WeightedRSocket) sockets.get(i2);
if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.rsocket.RSocket;

public interface WeightedRSocket extends RSocket {
interface WeightedRSocket extends RSocket {

Stats stats();
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.rsocket.core.RSocketServer;
import io.rsocket.loadbalance.LoadbalanceRSocketClient;
import io.rsocket.loadbalance.LoadbalanceRSocketSource;
import io.rsocket.loadbalance.RoundRobinLoadbalanceStrategy;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
Expand Down Expand Up @@ -120,7 +119,7 @@ public static void main(String[] args) {
});

RSocketClient loadBalancedRSocketClient =
LoadbalanceRSocketClient.create(new RoundRobinLoadbalanceStrategy(), producer);
LoadbalanceRSocketClient.builder().withRoundRobinLoadbalanceStrategy().build(producer);

for (int i = 0; i < 10000; i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.rsocket.client.filter.RSocketSupplier;
import io.rsocket.loadbalance.LoadbalanceRSocketClient;
import io.rsocket.loadbalance.LoadbalanceRSocketSource;
import io.rsocket.loadbalance.WeightedLoadbalanceStrategy;
import java.util.Collection;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -79,7 +78,9 @@ public static LoadBalancedRSocketMono create(
rsl.stream()
.map(rs -> LoadbalanceRSocketSource.from(rs.toString(), rs.get()))
.collect(Collectors.toList()))
.as(f -> LoadbalanceRSocketClient.create(new WeightedLoadbalanceStrategy(), f)));
.as(
f ->
LoadbalanceRSocketClient.builder().withWeightedLoadbalanceStrategy().build(f)));
}

public static LoadBalancedRSocketMono fromClient(LoadbalanceRSocketClient rSocketClient) {
Expand Down