Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

自适应负载均衡优化 #309

Merged
merged 2 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ protected Nodes getTopicNodes(TopicMetadata topicMetadata, List<BrokerNode> brok
if (node == null) {
node = new Node();
node.setUrl(String.valueOf(brokerNode.getId()));
node.setNearby(brokerNode.isNearby());

if (topicMetadata.getProducerPolicy() != null && topicMetadata.getProducerPolicy().getNearby() != null) {
node.setNearby(topicMetadata.getProducerPolicy().getNearby() && brokerNode.isNearby());
} else {
node.setNearby(false);
}

Node oldNode = brokerNode.putIfAbsentAttachment(NODE_CACHE_KEY, node);
if (oldNode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
package org.joyqueue.client.internal.producer.support;

import com.jd.laf.extension.Extension;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.RandomUtils;
import org.joyqueue.client.internal.metadata.domain.PartitionGroupMetadata;
import org.joyqueue.client.internal.metadata.domain.PartitionNode;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.producer.domain.ProduceMessage;
Expand All @@ -30,7 +31,6 @@
* author: gaohaoxiang
* date: 2018/12/27
*/
@Extension(singleton = false)
public class WeightedPartitionSelector extends AbstractPartitionSelector {

public static final String NAME = "weighted";
Expand All @@ -41,13 +41,25 @@ protected PartitionNode nextPartition(ProduceMessage message, TopicMetadata topi
double weight = 0;
int index = 0;

for (BrokerNode brokerNode : brokerNodes) {
weights[index] = brokerNode.getWeight();
if (weights[index] < 0) {
weights[index] = 0;
if (topicMetadata.getProducerPolicy() != null && MapUtils.isNotEmpty(topicMetadata.getProducerPolicy().getWeight())) {
for (BrokerNode brokerNode : brokerNodes) {
weights[index] = brokerNode.getWeight();
if (weights[index] < 0) {
weights[index] = 0;
}
weight += weights[index];
index++;
}
} else {
for (BrokerNode brokerNode : brokerNodes) {
List<PartitionGroupMetadata> brokerPartitionGroups = topicMetadata.getBrokerPartitionGroups(brokerNode.getId());
weights[index] = (brokerPartitionGroups != null ? brokerPartitionGroups.size() * 10 : brokerNode.getWeight());
if (weights[index] < 0) {
weights[index] = 0;
}
weight += weights[index];
index++;
}
weight += weights[index];
index++;
}

if (weight > 0) {
Expand Down
4 changes: 0 additions & 4 deletions joyqueue-client/joyqueue-client-loadbalance-adaptive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package org.joyqueue.client.loadbalance.adaptive;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.joyqueue.client.loadbalance.adaptive.config.AdaptiveLoadBalanceConfig;
import org.joyqueue.client.loadbalance.adaptive.node.Metrics;
import org.joyqueue.client.loadbalance.adaptive.node.Node;
import org.joyqueue.client.loadbalance.adaptive.node.Nodes;
import org.joyqueue.client.loadbalance.adaptive.node.WeightNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* AdaptiveLoadBalance
Expand All @@ -19,20 +18,19 @@
*/
public class AdaptiveLoadBalance {

protected static final Logger logger = LoggerFactory.getLogger(AdaptiveLoadBalance.class);

private AdaptiveLoadBalanceConfig config;
private List<ScoreJudge> scoreJudges;

private WeightLoadBalance weightLoadBalance = new WeightLoadBalance();
private RandomLoadBalance randomLoadBalance = new RandomLoadBalance();

private Cache<String, Node> selectCache;

public AdaptiveLoadBalance(AdaptiveLoadBalanceConfig config) {
this.config = config;
this.scoreJudges = getScoreJudges(config);
this.selectCache = CacheBuilder.newBuilder()
.expireAfterWrite(config.getComputeInterval(), TimeUnit.MILLISECONDS)
.build();
Metrics.cacheInterval = config.getCacheInterval();
Metrics.sliceInterval = config.getSliceInterval();
}

protected List<ScoreJudge> getScoreJudges(AdaptiveLoadBalanceConfig config) {
Expand All @@ -53,18 +51,11 @@ public Node select(Nodes nodes) {
}

protected boolean isStartup(Nodes nodes) {
return nodes.getMetric().getTps() > config.getSsthreshhold();
return nodes.getMetric().getCount() > config.getSsthreshhold();
}

protected Node adaptiveSelect(Nodes nodes) {
String cacheKey = nodes.toString();
try {
return selectCache.get(cacheKey, () -> {
return doAdaptiveSelect(nodes);
});
} catch (ExecutionException e) {
return doAdaptiveSelect(nodes);
}
return doAdaptiveSelect(nodes);
}

public Node doAdaptiveSelect(Nodes nodes) {
Expand All @@ -75,9 +66,6 @@ public Node doAdaptiveSelect(Nodes nodes) {
double compute = scoreJudge.compute(nodes, node);
score += (compute / 100 * scoreJudge.getRatio());
}
if (score == 0) {
score = 1;
}
weightNodes.add(new WeightNode(node, score));
}
return weightSelect(weightNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
*/
public class AdaptiveLoadBalanceConfig {

private int ssthreshhold = 10;
private int ssthreshhold = 1000;
private String[] judges;
private int computeInterval = 1000 * 1;
private int cacheInterval = 1000 * 1;
private int sliceInterval = 1000 * 60;

public int getSsthreshhold() {
return ssthreshhold;
Expand All @@ -27,11 +28,19 @@ public void setJudges(String[] judges) {
this.judges = judges;
}

public int getComputeInterval() {
return computeInterval;
public int getCacheInterval() {
return cacheInterval;
}

public void setComputeInterval(int computeInterval) {
this.computeInterval = computeInterval;
public void setCacheInterval(int cacheInterval) {
this.cacheInterval = cacheInterval;
}

public int getSliceInterval() {
return sliceInterval;
}

public void setSliceInterval(int sliceInterval) {
this.sliceInterval = sliceInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,10 @@ public class AvailableScoreJudge implements ScoreJudge {

@Override
public double compute(Nodes nodes, Node node) {
if (node.getMetric().getErrorTps() == 0) {
return 0;
} else if (nodes.getMetric().getErrorTps() == 0) {
return -100;
} else if (node.getMetric().getErrorTps() >= nodes.getMetric().getErrorTps()) {
return -100;
} else {
return -Math.min(100 - ((double) (node.getMetric().getErrorTps() / nodes.getMetric().getErrorTps()) * 100), 100);
if (node.getMetric().getErrorCount() != 0) {
return -Integer.MAX_VALUE;
}
return 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@
import org.joyqueue.client.loadbalance.adaptive.node.Nodes;

/**
* TP99ScoreJudge
* AvgScoreJudge
* author: gaohaoxiang
* date: 2020/8/10
*/
public class TP99ScoreJudge implements ScoreJudge {
public class AvgScoreJudge implements ScoreJudge {

private static final double BASE_SCORE = 50;
public static int exceptionThreshhold = 500;

@Override
public double compute(Nodes nodes, Node node) {
double score = node.getMetric().getTp99() - nodes.getMetric().getTp99();
if (score > 0) {
return 0;
} else if (score == 0) {
return BASE_SCORE;
double maxAvg = 0;
for (Node otherNode : nodes.getNodes()) {
if (!otherNode.getUrl().equals(node.getUrl())) {
maxAvg = Math.max(maxAvg, otherNode.getMetric().getAvg());
}
}

double score = node.getMetric().getAvg() / maxAvg * 100;

if (score > exceptionThreshhold) {
return -Integer.MAX_VALUE;
} else {
return BASE_SCORE + Math.min(format(-score), BASE_SCORE);
return 100;
}
}

Expand All @@ -42,6 +48,6 @@ public double getRatio() {

@Override
public String type() {
return "tp99";
return "avg";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public double compute(Nodes nodes, Node node) {
if (node.isNearby()) {
return 100;
}
return 0;
return 1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ public class Metric {
private Metrics all = new Metrics();
private Metrics error = new Metrics();

public void refresh() {
all.refresh();
error.reinit();
}

public Tracer begin() {
return new Tracer(this);
}
Expand Down Expand Up @@ -53,10 +58,6 @@ public double getTp75() {
return all.getTp75();
}

public double getTp90() {
return all.getTp90();
}

public double getMax() {
return all.getMax();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,45 @@
*/
public class Metrics {

private Meter meter;
private Reservoir reservoir;
private Histogram histogram;
public static int cacheInterval = 1000 * 1;
public static int sliceInterval = 1000 * 60;

private volatile Meter meter;
private volatile Reservoir reservoir;
private volatile Histogram histogram;

private volatile long lastSlice;

private volatile double lastAvg;
private volatile long lastAvgTime;

public Metrics() {
init();
}

public void slice() {
init();
public void init() {
this.meter = new Meter();
this.reservoir = new ExponentiallyDecayingReservoir();
this.histogram = new Histogram(reservoir);
}

protected void init() {
this.meter = new Meter();
public void slice() {
this.reservoir = new ExponentiallyDecayingReservoir();
this.histogram = new Histogram(reservoir);
}

public void mark() {
this.mark(1L);
public void refresh() {
if (System.currentTimeMillis() - lastSlice > sliceInterval) {
lastSlice = System.currentTimeMillis();
slice();
}
}

public void mark(long count) {
this.meter.mark(count);
public void reinit() {
if (System.currentTimeMillis() - lastSlice > sliceInterval) {
lastSlice = System.currentTimeMillis();
init();
}
}

public void mark(long count, double time) {
Expand Down Expand Up @@ -72,10 +87,6 @@ public double getTp75() {
return this.getSnapshot().get75thPercentile();
}

public double getTp90() {
return this.getSnapshot().getMean();
}

public double getMax() {
return this.getSnapshot().getMax();
}
Expand All @@ -85,7 +96,11 @@ public double getMin() {
}

public double getAvg() {
return this.getSnapshot().getMean();
if (System.currentTimeMillis() - lastAvgTime > cacheInterval) {
lastAvg = this.getSnapshot().getMean();
lastAvgTime = System.currentTimeMillis();
}
return lastAvg;
}

protected Snapshot getSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void setUrl(String url) {
}

public Metric getMetric() {
metric.refresh();
return metric;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public void setMetric(Metric metric) {
}

public Metric getMetric() {
metric.refresh();
return metric;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
org.joyqueue.client.loadbalance.adaptive.judge.RegionScoreJudge
org.joyqueue.client.loadbalance.adaptive.judge.TP99ScoreJudge
#org.joyqueue.client.loadbalance.adaptive.judge.RegionScoreJudge
org.joyqueue.client.loadbalance.adaptive.judge.AvgScoreJudge
org.joyqueue.client.loadbalance.adaptive.judge.AvailableScoreJudge
Loading