Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions polaris/plugin/service_router/rule_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ bool RuleRouterCluster::CalculateRouteResult(std::vector<RuleRouterSet*>& result
if (!result.empty()) { // 找不到了不用降级
break;
}
//已经降级寻找了一次,不需要再找了
// 已经降级寻找了一次,不需要再找了
if (downgrade) {
break;
}
Expand All @@ -85,21 +85,21 @@ bool RuleRouterCluster::CalculateRouteResult(std::vector<RuleRouterSet*>& result
}

void RuleRouterCluster::CalculateSubset(ServiceInstances* service_instances, Labels& labels) {
//检查最高优先级的subset,状态健康则返回其中实例,如果同优先级的有多个subset,则根据权重计算返回实例
//如果最高优先级subset不健康,那么按照优先级遍历检查其他subset,如果有健康的,返回其中实例,如有多个同优先级的subset则按权重计算
//若无健康的低优先级subset,则无论最高优先级subset健康与否,都返回其中实例
//如果是熔断半开状态,要支持按比例放量
// PRESERVED:熔断器维持,节点处于保守状态,此时只能正常接收自身流量,不可接受别的集群降级流量

//为空,直接返回
//剔除不健康的subset
// 检查最高优先级的subset,状态健康则返回其中实例,如果同优先级的有多个subset,则根据权重计算返回实例
// 如果最高优先级subset不健康,那么按照优先级遍历检查其他subset,如果有健康的,返回其中实例,如有多个同优先级的subset则按权重计算
// 若无健康的低优先级subset,则无论最高优先级subset健康与否,都返回其中实例
// 如果是熔断半开状态,要支持按比例放量
// PRESERVED:熔断器维持,节点处于保守状态,此时只能正常接收自身流量,不可接受别的集群降级流量

// 为空,直接返回
// 剔除不健康的subset
if (data_.empty()) {
return;
}
std::map<uint32_t, std::vector<RuleRouterSet*> >::iterator data_it = data_.begin();
std::vector<RuleRouterSet*>& cluster_highest_priority = data_it->second;

//通过service_instances直接获取熔断set信息
// 通过service_instances直接获取熔断set信息
std::map<std::string, SetCircuitBreakerUnhealthyInfo> circuit_breaker_sets;
if (service_instances->GetService() == nullptr) {
POLARIS_LOG(LOG_ERROR, "Service member is null");
Expand All @@ -109,20 +109,20 @@ void RuleRouterCluster::CalculateSubset(ServiceInstances* service_instances, Lab

std::vector<RuleRouterSet*> cluster_halfopen;
if (GetHealthyAndHalfOpenSubSet(cluster_highest_priority, circuit_breaker_sets, cluster_halfopen, labels)) {
//如果有半开的,考虑降级
// 如果有半开的,考虑降级
for (std::vector<RuleRouterSet*>::iterator set_it = cluster_halfopen.begin(); set_it != cluster_halfopen.end();
++set_it) {
RuleRouterSet* downgrade = GetDownGradeSubset(circuit_breaker_sets, labels);
if (downgrade) {
//根据半开放量率计算比例
// 根据半开放量率计算比例
float pass_rate = 1.0;
SetCircuitBreakerUnhealthyInfo* breaker_info = nullptr;
GetSetBreakerInfo(circuit_breaker_sets, (*set_it)->subset, labels, &breaker_info);
if (breaker_info != nullptr) {
pass_rate = breaker_info->half_open_release_percent;
}

//修改权重
// 修改权重
(*set_it)->weight_ *= pass_rate;
downgrade->weight_ *= (1 - pass_rate);
cluster_highest_priority.push_back(*set_it);
Expand All @@ -133,8 +133,8 @@ void RuleRouterCluster::CalculateSubset(ServiceInstances* service_instances, Lab
}
return;
}
//没有
//降级寻找,只找健康的set,否则返回最高优先级set
// 没有
// 降级寻找,只找健康的set,否则返回最高优先级set
++data_it;
for (; data_it != data_.end(); ++data_it) {
if (GetHealthySubSet(data_it->second, circuit_breaker_sets, labels)) {
Expand All @@ -143,14 +143,14 @@ void RuleRouterCluster::CalculateSubset(ServiceInstances* service_instances, Lab
}

if (data_it == data_.end()) {
//依旧取最高优先级的实例,此处什么也不用做
// 依旧取最高优先级的实例,此处什么也不用做
return;
} else {
//找到一个可替换的优先级subset组
// 找到一个可替换的优先级subset组
std::map<uint32_t, std::vector<RuleRouterSet*> > data_tmp;
data_tmp[data_it->first].swap(data_it->second);
data_.swap(data_tmp);
//释放Set指针的内存
// 释放Set指针的内存
for (std::map<uint32_t, std::vector<RuleRouterSet*> >::iterator tmp_it = data_tmp.begin(); tmp_it != data_tmp.end();
++tmp_it) {
const std::vector<RuleRouterSet*>& set_data = tmp_it->second;
Expand All @@ -174,15 +174,15 @@ void RuleRouterCluster::GetSetBreakerInfo(std::map<std::string, SetCircuitBreake
std::string subset_key = subset.GetSubInfoStrId() + "#" + labels.GetLabelStr();
std::map<std::string, SetCircuitBreakerUnhealthyInfo>::iterator subset_it = circuit_breaker_sets.find(subset_key);
if (subset_it != circuit_breaker_sets.end()) {
//接口级熔断
// 接口级熔断
*breaker_info = &subset_it->second;
}
}

RuleRouterSet* RuleRouterCluster::GetDownGradeSubset(
std::map<std::string, SetCircuitBreakerUnhealthyInfo>& circuit_breaker_sets, Labels& labels) {
//为空或者只有一个优先级组,都直接返回
//寻找一个健康的降级subset
// 为空或者只有一个优先级组,都直接返回
// 寻找一个健康的降级subset
if (data_.size() <= 1) {
return nullptr;
}
Expand All @@ -197,7 +197,7 @@ RuleRouterSet* RuleRouterCluster::GetDownGradeSubset(
cbs = breaker_info->status;
}
if (!(*it)->isolated_ && cbs == kCircuitBreakerClose) {
//要从vector中移除,免得重复释放,用sharedptr比较好
// 要从vector中移除,免得重复释放,用sharedptr比较好
RuleRouterSet* result = *it;
data_it->second.erase(it);
return result;
Expand All @@ -213,7 +213,7 @@ bool RuleRouterCluster::GetHealthyAndHalfOpenSubSet(
if (cluster.empty()) {
return false;
}
//判断subset状态是否健康,剔除不健康的set
// 判断subset状态是否健康,剔除不健康的set
std::vector<RuleRouterSet*> cluster_healthy;
std::vector<RuleRouterSet*> cluster_unhealthy;
std::vector<RuleRouterSet*> cluster_isolated;
Expand All @@ -236,22 +236,22 @@ bool RuleRouterCluster::GetHealthyAndHalfOpenSubSet(
cluster_unhealthy.push_back(*it);
}
}
//移除隔离的set
// 移除隔离的set
for (std::vector<RuleRouterSet*>::iterator it = cluster_isolated.begin(); it != cluster_isolated.end(); ++it) {
delete *it;
}

if (!cluster_healthy.empty() || !cluster_halfopen.empty()) {
//只保留健康和半开的set
//注意内存释放!
// 只保留健康和半开的set
// 注意内存释放!
cluster.swap(cluster_healthy);
for (std::vector<RuleRouterSet*>::iterator it = cluster_unhealthy.begin(); it != cluster_unhealthy.end(); ++it) {
delete *it;
}
return true;
}
//没有健康的set
//隔离的set已经被释放
// 没有健康的set
// 隔离的set已经被释放
cluster.swap(cluster_unhealthy);
return false;
}
Expand Down Expand Up @@ -317,6 +317,8 @@ ReturnCode RuleServiceRouter::Init(Config* config, Context* context) {
return kReturnOk;
}

std::string RuleServiceRouter::Name() { return "RuleServiceRouter"; }

ReturnCode RuleServiceRouter::DoRoute(RouteInfo& route_info, RouteResult* route_result) {
ServiceRouteRule* route_rule = route_info.GetServiceRouteRule();
POLARIS_CHECK_ARGUMENT(route_rule != nullptr);
Expand Down
2 changes: 2 additions & 0 deletions polaris/plugin/service_router/rule_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class RuleServiceRouter : public ServiceRouter {

virtual RouterStatData* CollectStat();

virtual std::string Name();

private:
bool enable_recover_all_;
float percent_of_min_instances_;
Expand Down
2 changes: 2 additions & 0 deletions polaris/plugin/service_router/set_division_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ ReturnCode SetDivisionServiceRouter::Init(Config* /*config*/, Context* context)
return kReturnOk;
}

std::string SetDivisionServiceRouter::Name() { return "SetDivisionServiceRouter"; }

// 根据主调和被调的metadata判断是否启用set分组
bool SetDivisionServiceRouter::IsSetDivisionRouterEnable(const std::string& caller_set_name,
const std::string& callee_set_name,
Expand Down
2 changes: 2 additions & 0 deletions polaris/plugin/service_router/set_division_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class SetDivisionServiceRouter : public ServiceRouter {

virtual RouterStatData* CollectStat();

virtual std::string Name();

// 根据主调set名和被调set名和metadata判断是否启用set分组
bool IsSetDivisionRouterEnable(const std::string& caller_set_name, const std::string& callee_set_name,
const std::map<std::string, std::string>& callee_metadata);
Expand Down
2 changes: 2 additions & 0 deletions test/mock/mock_service_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class MockServiceRouter : public ServiceRouter {

MOCK_METHOD0(CollectStat, RouterStatData *());

MOCK_METHOD0(Name, std::string());

static Plugin *MockServiceRouterFactory() { return mock_service_router_list_[mock_service_router_index_++]; }

static void RegisterMockPlugin() { RegisterPlugin("mockRouter", kPluginServiceRouter, MockServiceRouterFactory); }
Expand Down