Skip to content

Commit 3effe10

Browse files
authored
Add Affinity Router (#14787)
* add affinity router * fix complie error * remove comments * format code
1 parent a0b6e47 commit 3effe10

File tree

11 files changed

+947
-0
lines changed

11 files changed

+947
-0
lines changed

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public interface Constants {
4646

4747
String CONDITIONS_KEY = "conditions";
4848

49+
String AFFINITY_KEY = "affinityAware";
50+
4951
String TAGS_KEY = "tags";
5052

5153
/**
@@ -141,5 +143,10 @@ public interface Constants {
141143

142144
String RULE_VERSION_V31 = "v3.1";
143145

146+
public static final String TRAFFIC_DISABLE_KEY = "trafficDisable";
147+
public static final String RATIO_KEY = "ratio";
148+
public static final int DefaultRouteRatio = 0;
144149
public static final int DefaultRouteConditionSubSetWeight = 100;
150+
public static final int DefaultRoutePriority = 0;
151+
public static final double DefaultAffinityRatio = 0;
145152
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.cluster.router.affinity;
18+
19+
import org.apache.dubbo.common.URL;
20+
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
21+
import org.apache.dubbo.common.logger.LoggerFactory;
22+
import org.apache.dubbo.common.utils.CollectionUtils;
23+
import org.apache.dubbo.common.utils.Holder;
24+
import org.apache.dubbo.common.utils.NetUtils;
25+
import org.apache.dubbo.rpc.Invocation;
26+
import org.apache.dubbo.rpc.Invoker;
27+
import org.apache.dubbo.rpc.RpcException;
28+
import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
29+
import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcher;
30+
import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcherFactory;
31+
import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
32+
import org.apache.dubbo.rpc.cluster.router.state.BitList;
33+
34+
import java.text.ParseException;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Set;
38+
39+
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
40+
import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY;
41+
import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_EXEC_CONDITION_ROUTER;
42+
import static org.apache.dubbo.rpc.cluster.Constants.AFFINITY_KEY;
43+
import static org.apache.dubbo.rpc.cluster.Constants.DefaultAffinityRatio;
44+
import static org.apache.dubbo.rpc.cluster.Constants.RATIO_KEY;
45+
import static org.apache.dubbo.rpc.cluster.Constants.RULE_KEY;
46+
import static org.apache.dubbo.rpc.cluster.Constants.RUNTIME_KEY;
47+
48+
/**
49+
* # dubbo/config/group/{$name}.affinity-router
50+
* configVersion: v3.1
51+
* scope: service # Or application
52+
* key: service.apache.com
53+
* enabled: true
54+
* runtime: true
55+
* affinityAware:
56+
* key: region
57+
* ratio: 20
58+
*/
59+
public class AffinityStateRouter<T> extends AbstractStateRouter<T> {
60+
public static final String NAME = "affinity";
61+
62+
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractStateRouter.class);
63+
64+
protected String affinityKey;
65+
protected Double ratio;
66+
protected ConditionMatcher matchMatcher;
67+
protected List<ConditionMatcherFactory> matcherFactories;
68+
69+
private final boolean enabled;
70+
71+
public AffinityStateRouter(URL url) {
72+
super(url);
73+
this.enabled = url.getParameter(ENABLED_KEY, true);
74+
this.affinityKey = url.getParameter(AFFINITY_KEY, "");
75+
this.ratio = url.getParameter(RATIO_KEY, DefaultAffinityRatio);
76+
this.matcherFactories =
77+
moduleModel.getExtensionLoader(ConditionMatcherFactory.class).getActivateExtensions();
78+
if (this.enabled) {
79+
this.init(affinityKey);
80+
}
81+
}
82+
83+
public AffinityStateRouter(URL url, String affinityKey, Double ratio, boolean enabled) {
84+
super(url);
85+
this.enabled = enabled;
86+
this.affinityKey = affinityKey;
87+
this.ratio = ratio;
88+
matcherFactories =
89+
moduleModel.getExtensionLoader(ConditionMatcherFactory.class).getActivateExtensions();
90+
if (this.enabled) {
91+
this.init(affinityKey);
92+
}
93+
}
94+
95+
public void init(String rule) {
96+
try {
97+
if (rule == null || rule.trim().isEmpty()) {
98+
throw new IllegalArgumentException("Illegal affinity rule!");
99+
}
100+
this.matchMatcher = parseRule(affinityKey);
101+
} catch (ParseException e) {
102+
throw new IllegalStateException(e.getMessage(), e);
103+
}
104+
}
105+
106+
private ConditionMatcher parseRule(String rule) throws ParseException {
107+
ConditionMatcher matcher = getMatcher(rule);
108+
// Multiple values
109+
Set<String> values = matcher.getMatches();
110+
values.add(getUrl().getParameter(rule));
111+
return matcher;
112+
}
113+
114+
@Override
115+
protected BitList<Invoker<T>> doRoute(
116+
BitList<Invoker<T>> invokers,
117+
URL url,
118+
Invocation invocation,
119+
boolean needToPrintMessage,
120+
Holder<RouterSnapshotNode<T>> nodeHolder,
121+
Holder<String> messageHolder)
122+
throws RpcException {
123+
if (!enabled) {
124+
if (needToPrintMessage) {
125+
messageHolder.set("Directly return. Reason: AffinityRouter disabled.");
126+
}
127+
return invokers;
128+
}
129+
130+
if (CollectionUtils.isEmpty(invokers)) {
131+
if (needToPrintMessage) {
132+
messageHolder.set("Directly return. Reason: Invokers from previous router is empty.");
133+
}
134+
return invokers;
135+
}
136+
try {
137+
BitList<Invoker<T>> result = invokers.clone();
138+
result.removeIf(invoker -> !matchInvoker(invoker.getUrl(), url));
139+
140+
if (result.size() / (double) invokers.size() >= ratio / (double) 100) {
141+
if (needToPrintMessage) {
142+
messageHolder.set("Match return.");
143+
}
144+
return result;
145+
} else {
146+
logger.warn(
147+
CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY,
148+
"execute affinity state router result is less than defined" + this.ratio,
149+
"",
150+
"The affinity result is ignored. consumer: " + NetUtils.getLocalHost()
151+
+ ", service: " + url.getServiceKey() + ", router: "
152+
+ url.getParameterAndDecoded(RULE_KEY));
153+
if (needToPrintMessage) {
154+
messageHolder.set("Directly return. Reason: Affinity state router result is less than defined.");
155+
}
156+
return invokers;
157+
}
158+
} catch (Throwable t) {
159+
logger.error(
160+
CLUSTER_FAILED_EXEC_CONDITION_ROUTER,
161+
"execute affinity state router exception",
162+
"",
163+
"Failed to execute affinity router rule: " + getUrl() + ", invokers: " + invokers + ", cause: "
164+
+ t.getMessage(),
165+
t);
166+
}
167+
if (needToPrintMessage) {
168+
messageHolder.set("Directly return. Reason: Error occurred ( or result is empty ).");
169+
}
170+
return invokers;
171+
}
172+
173+
@Override
174+
public boolean isRuntime() {
175+
// We always return true for previously defined Router, that is, old Router doesn't support cache anymore.
176+
// return true;
177+
return this.getUrl().getParameter(RUNTIME_KEY, false);
178+
}
179+
180+
private ConditionMatcher getMatcher(String key) {
181+
return moduleModel
182+
.getExtensionLoader(ConditionMatcherFactory.class)
183+
.getExtension("param")
184+
.createMatcher(key, moduleModel);
185+
}
186+
187+
private boolean matchInvoker(URL url, URL param) {
188+
return doMatch(url, param, null, matchMatcher);
189+
}
190+
191+
private boolean doMatch(URL url, URL param, Invocation invocation, ConditionMatcher matcher) {
192+
Map<String, String> sample = url.toOriginalMap();
193+
if (!matcher.isMatch(sample, param, invocation, false)) {
194+
return false;
195+
}
196+
return true;
197+
}
198+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.cluster.router.affinity;
18+
19+
import org.apache.dubbo.common.URL;
20+
import org.apache.dubbo.rpc.cluster.router.state.CacheableStateRouterFactory;
21+
import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
22+
23+
/**
24+
* affinity router factory
25+
*/
26+
public class AffinityStateRouterFactory extends CacheableStateRouterFactory {
27+
28+
public static final String NAME = "affinity";
29+
30+
@Override
31+
protected <T> StateRouter<T> createRouter(Class<T> interfaceClass, URL url) {
32+
return new AffinityStateRouter<T>(url);
33+
}
34+
}

0 commit comments

Comments
 (0)