Skip to content

Commit

Permalink
[ISSUE apache#604]Improve the rebalance algorithm (apache#605)
Browse files Browse the repository at this point in the history
* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

* modify:remove unneccessary interface in eventmesh-connector-api

* modify:fix conflict

* modify:add license in EventMeshAction

* modify:fix ack problem

* modify:fix exception handle when exception occured in EventMeshTcpMessageDispatcher

* modify:fix log print

* modify: fix issue#496,ClassCastException

* modify: improve rebalance algorithm

close apache#604
  • Loading branch information
lrhkobe authored and xwm1992 committed Dec 27, 2021
1 parent ffdb133 commit 452921e
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 52 deletions.
Expand Up @@ -97,59 +97,108 @@ private Map<String, String> queryLocalEventMeshMap(String cluster){
}

private void doRebalanceByGroup(String cluster, String group, String purpose, Map<String, String> eventMeshMap) throws Exception{
logger.info("doRebalanceByGroup start, cluster:{}, group:{}, purpose:{}", cluster, group, purpose);

//query distribute data of loacl idc
Map<String, Integer> clientDistributionMap = queryLocalEventMeshDistributeData(cluster, group, purpose, eventMeshMap);
if(clientDistributionMap == null || clientDistributionMap.size() == 0){
return;
}

doRebalanceRedirect(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName, group, purpose, eventMeshMap, clientDistributionMap);
logger.info("doRebalanceByGroup end, cluster:{}, group:{}, purpose:{}", cluster, group, purpose);

}

private void doRebalanceRedirect(String currEventMeshName, String group, String purpose, Map<String, String> eventMeshMap, Map<String, Integer> clientDistributionMap)throws Exception{
if(clientDistributionMap == null || clientDistributionMap.size() == 0){
return;
}

//caculate client num need to redirect in currEventMesh
int judge = caculateRedirectNum(currEventMeshName, group, purpose, clientDistributionMap);

if(judge > 0) {

//select redirect target eventmesh lisg
List<String> eventMeshRecommendResult = selectRedirectEventMesh(group, eventMeshMap, clientDistributionMap, judge, currEventMeshName);
if(eventMeshRecommendResult == null || eventMeshRecommendResult.size() != judge){
logger.warn("doRebalance failed,recommendEventMeshNum is not consistent,recommendResult:{},judge:{}", eventMeshRecommendResult, judge);
return;
}

//do redirect
doRedirect(group, purpose, judge, eventMeshRecommendResult);
}else{
logger.info("rebalance condition not satisfy,group:{}, purpose:{},judge:{}", group, purpose, judge);
}
}

private void doRedirect(String group, String purpose, int judge, List<String> eventMeshRecommendResult) throws Exception{
logger.info("doRebalance redirect start---------------------group:{},judge:{}", group, judge);
Set<Session> sessionSet = null;
if(EventMeshConstants.PURPOSE_SUB.equals(purpose)) {
sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupConsumerSessions();
}else if(EventMeshConstants.PURPOSE_PUB.equals(purpose)){
sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupProducerSessions();
}else{
logger.warn("doRebalance failed,param is illegal, group:{}, purpose:{}",group, purpose);
return;
}
List<Session> sessionList = new ArrayList<>(sessionSet);
Collections.shuffle(new ArrayList<>(sessionList));

for(int i= 0; i<judge; i++){
//String redirectSessionAddr = ProxyTcp2Client.redirectClientForRebalance(sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0];
String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1];
String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,newProxyIp,Integer.valueOf(newProxyPort),sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
logger.info("doRebalance,redirect sessionAddr:{}", redirectSessionAddr);
try {
Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().sleepIntervalInRebalanceRedirectMills);
} catch (InterruptedException e) {
logger.warn("Thread.sleep occur InterruptedException", e);
}
}
logger.info("doRebalance redirect end---------------------group:{}", group);
}

private List<String> selectRedirectEventMesh(String group, Map<String, String> eventMeshMap, Map<String, Integer> clientDistributionMap, int judge, String evenMeshName)throws Exception{
EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer);
return eventMeshRecommendStrategy.calculateRedirectRecommendEventMesh(eventMeshMap, clientDistributionMap, group, judge, evenMeshName);
}

public int caculateRedirectNum(String eventMeshName, String group, String purpose, Map<String, Integer> clientDistributionMap) throws Exception{
int sum = 0;
for(Integer item : clientDistributionMap.values()){
sum += item.intValue();
}
int currentNum = 0;
if(clientDistributionMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName) != null){
currentNum = clientDistributionMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName);
if(clientDistributionMap.get(eventMeshName) != null){
currentNum = clientDistributionMap.get(eventMeshName);
}
int avgNum = sum / clientDistributionMap.size();
int judge = avgNum >= 2 ? avgNum/2 : 1;

if(currentNum - avgNum > judge) {
Set<Session> sessionSet = null;
if(EventMeshConstants.PURPOSE_PUB.equals(purpose)){
sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupProducerSessions();
}else if(EventMeshConstants.PURPOSE_SUB.equals(purpose)){
sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupConsumerSessions();
}else{
logger.warn("doRebalance failed,purpose is not support,purpose:{}", purpose);
return;
int modNum = sum % clientDistributionMap.size();

List<String> eventMeshList = new ArrayList<>(clientDistributionMap.keySet());
Collections.sort(eventMeshList);
int index = -1;
for(int i=0; i < Math.min(modNum, eventMeshList.size()); i++){
if(StringUtils.equals(eventMeshName, eventMeshList.get(i))){
index = i;
break;
}

List<Session> sessionList = new ArrayList<>(sessionSet);
Collections.shuffle(new ArrayList<>(sessionList));
EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer);
List<String> eventMeshRecommendResult = eventMeshRecommendStrategy.calculateRedirectRecommendEventMesh(eventMeshMap, clientDistributionMap, group, judge);
if(eventMeshRecommendResult == null || eventMeshRecommendResult.size() != judge){
logger.warn("doRebalance failed,recommendProxyNum is not consistent,recommendResult:{},judge:{}", eventMeshRecommendResult, judge);
return;
}
logger.info("doRebalance redirect start---------------------group:{},purpose:{},judge:{}", group, purpose, judge);
for(int i= 0; i<judge; i++){
//String redirectSessionAddr = ProxyTcp2Client.redirectClientForRebalance(sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0];
String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1];
String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,newProxyIp,Integer.valueOf(newProxyPort),sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
logger.info("doRebalance,redirect sessionAddr:{}", redirectSessionAddr);
try {
Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().sleepIntervalInRebalanceRedirectMills);
} catch (InterruptedException e) {
logger.warn("Thread.sleep occur InterruptedException", e);
}
}
logger.info("doRebalance redirect end---------------------group:{}, purpose:{}", group, purpose);
}else{
logger.info("rebalance condition not satisfy,group:{},sum:{},currentNum:{},avgNum:{},judge:{}", group, sum, currentNum, avgNum, judge);
}
int rebalanceResult = 0;
if(avgNum == 0){
rebalanceResult = 1;
}else {
rebalanceResult = (modNum != 0 && index < modNum && index >= 0) ? avgNum + 1 : avgNum;
}
logger.info("rebalance caculateRedirectNum,group:{}, purpose:{},sum:{},avgNum:{}," +
"modNum:{}, index:{}, currentNum:{}, rebalanceResult:{}", group, purpose, sum,
avgNum, modNum, index, currentNum, rebalanceResult);
return currentNum - rebalanceResult;
}

private Map<String, Integer> queryLocalEventMeshDistributeData(String cluster, String group, String purpose, Map<String, String> eventMeshMap){
Expand Down Expand Up @@ -197,12 +246,4 @@ private Map<String, Integer> queryLocalEventMeshDistributeData(String cluster, S

return localEventMeshDistributeData;
}


private class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
@Override
public int compare(Map.Entry<String, Integer> x, Map.Entry<String, Integer> y) {
return x.getValue().intValue() - y.getValue().intValue();
}
}
}
Expand Up @@ -90,8 +90,11 @@ public String calculateRecommendEventMesh(String group, String purpose) throws E
}

@Override
public List<String> calculateRedirectRecommendEventMesh(Map<String, String> eventMeshMap, Map<String, Integer> clientDistributeMap, String group, int recommendProxyNum) throws Exception {
logger.info("eventMeshMap:{},clientDistributionMap:{},group:{},recommendNum:{}", eventMeshMap,clientDistributeMap,group,recommendProxyNum);
public List<String> calculateRedirectRecommendEventMesh(Map<String, String> eventMeshMap, Map<String, Integer> clientDistributeMap, String group, int recommendProxyNum, String eventMeshName) throws Exception {
if(recommendProxyNum < 1){
return null;
}
logger.info("eventMeshMap:{},clientDistributionMap:{},group:{},recommendNum:{},currEventMeshName:{}", eventMeshMap,clientDistributeMap,group,recommendProxyNum, eventMeshName);
List<String> recommendProxyList = null;

//find eventmesh with least client
Expand All @@ -106,10 +109,10 @@ public List<String> calculateRedirectRecommendEventMesh(Map<String, String> even
recommendProxyList = new ArrayList<>(recommendProxyNum);
while(recommendProxyList.size() < recommendProxyNum){
Map.Entry<String, Integer> minProxyItem = list.get(0);
int currProxyNum = clientDistributeMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName);
int currProxyNum = clientDistributeMap.get(eventMeshName);
recommendProxyList.add(eventMeshMap.get(minProxyItem.getKey()));
clientDistributeMap.put(minProxyItem.getKey(),minProxyItem.getValue() + 1);
clientDistributeMap.put(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName,currProxyNum - 1);
clientDistributeMap.put(eventMeshName,currProxyNum - 1);
Collections.sort(list, vc);
logger.info("clientDistributionMap after sort:{}", list);
}
Expand Down
Expand Up @@ -22,5 +22,5 @@
public interface EventMeshRecommendStrategy {
String calculateRecommendEventMesh(String group, String purpose) throws Exception;

List<String> calculateRedirectRecommendEventMesh(Map<String, String> eventMeshMap, Map<String, Integer> clientDistributeMap, String group, int recommendNum) throws Exception;
List<String> calculateRedirectRecommendEventMesh(Map<String, String> eventMeshMap, Map<String, Integer> clientDistributeMap, String group, int recommendNum, String eventMeshName) throws Exception;
}
Expand Up @@ -23,6 +23,10 @@
public class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
@Override
public int compare(Map.Entry<String, Integer> x, Map.Entry<String, Integer> y) {
return x.getValue().intValue() - y.getValue().intValue();
if(x.getValue().intValue() != y.getValue().intValue()){
return x.getValue().intValue() - y.getValue().intValue();
}else {
return x.getKey().compareTo(y.getKey());
}
}
}

0 comments on commit 452921e

Please sign in to comment.