Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion include/polaris/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ enum PluginType {
kPluginCircuitBreaker, ///< 节点熔断扩展点
kPluginWeightAdjuster, ///< 动态权重调整扩展点
kPluginStatReporter, ///< 统计上报扩展点
kPluginAlertReporter ///< 告警扩展点
kPluginAlertReporter, ///< 告警扩展点
kPluginServerMetric, ///< SDK与Server请求结果统计

kPluginTypeMaxCount // 插件类型数量
};

/// @brief 路由插件事件回调
Expand Down Expand Up @@ -501,6 +504,25 @@ class AlertReporter : public Plugin {
virtual ReturnCode ReportAlert(AlertLevel alert_level, std::string msg) = 0;
};

///@brief 扩展点接口:收集北极星SDK调用服务器结果
class ServerMetric : public Plugin {
public:
virtual ~ServerMetric() {}

/// @brief 通过配置进行初始化
virtual ReturnCode Init(Config* config, Context* context) = 0;

/// @brief 内部服务调用结果上报
///
/// @param service_key 服务
/// @param instance 实例
/// @param ret_code 返回码
/// @param ret_status 是否成功
/// @param daley 延迟
virtual void MetricReport(const ServiceKey& service_key, const Instance& instance,
ReturnCode ret_code, CallRetStatus ret_status, uint64_t daley) = 0;
};

} // namespace polaris

#endif // POLARIS_CPP_INCLUDE_POLARIS_PLUGIN_H_
21 changes: 21 additions & 0 deletions polaris/api/consumer_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,4 +901,25 @@ ReturnCode ConsumerApiImpl::GetSystemServer(Context* context, const ServiceKey&
return ret;
}

void ConsumerApiImpl::UpdateServerResult(Context* context, const ServiceKey& service_key,
const Instance& instance, PolarisServerCode code,
CallRetStatus status, uint64_t delay) {
InstanceGauge instance_gauge;
instance_gauge.service_namespace = service_key.namespace_;
instance_gauge.service_name = service_key.name_;
instance_gauge.instance_id = instance.GetId();
instance_gauge.call_daley = delay;
instance_gauge.call_ret_code = code;
instance_gauge.call_ret_status = status;
ReturnCode ret_code = kReturnOk;
if (kServerCodeConnectError <= code && code <= kServerCodeInvalidResponse) {
ret_code = (code == kServerCodeRpcTimeout) ? kReturnTimeout : kReturnServerError;
}
ConsumerApiImpl::UpdateServiceCallResult(context, instance_gauge);
ServerMetric* metric = context->GetContextImpl()->GetServerMetric();
if (metric != NULL) {
metric->MetricReport(service_key, instance, ret_code, status, delay);
}
}

} // namespace polaris
5 changes: 5 additions & 0 deletions polaris/api/consumer_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <stdint.h>
#include <string>

#include "model/return_code.h"
#include "polaris/defs.h"
#include "polaris/model.h"
#include "polaris/plugin.h"
Expand Down Expand Up @@ -94,6 +95,10 @@ class ConsumerApiImpl {
const Criteria& criteria, Instance*& instance, uint64_t timeout,
const std::string& protocol = "grpc");

static void UpdateServerResult(Context* context, const ServiceKey& service_key,
const Instance& instance, PolarisServerCode code,
CallRetStatus status, uint64_t delay);

private:
static void GetBackupInstances(ServiceInstances* service_instances, LoadBalancer* load_balancer,
GetOneInstanceRequestAccessor& request,
Expand Down
43 changes: 28 additions & 15 deletions polaris/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,8 @@ ReturnCode ContextImpl::InitApiConfig(Config* api_config) {

ReturnCode ContextImpl::InitGlobalConfig(Config* config, Context* context) {
// Init server connector plugin
Config* plugin_config = config->GetSubConfig("serverConnector");
Plugin* plugin = NULL;
ScopedPtr<Config> plugin_config(config->GetSubConfig("serverConnector"));
Plugin* plugin = NULL;
std::string protocol =
plugin_config->GetStringOrDefault("protocol", kPluginDefaultServerConnector);
PluginManager::Instance().GetPlugin(protocol, kPluginServerConnector, plugin);
Expand All @@ -582,11 +582,9 @@ ReturnCode ContextImpl::InitGlobalConfig(Config* config, Context* context) {
POLARIS_LOG(LOG_ERROR,
"Plugin factory register with name[%s] and type[%s] return error instance",
protocol.c_str(), PluginTypeToString(kPluginServerConnector));
delete plugin_config;
return kReturnPluginError;
}
ReturnCode ret = server_connector_->Init(plugin_config, context);
delete plugin_config;
ReturnCode ret = server_connector_->Init(plugin_config.Get(), context);
if (ret != kReturnOk) {
return ret;
}
Expand All @@ -607,7 +605,7 @@ ReturnCode ContextImpl::InitGlobalConfig(Config* config, Context* context) {
}

// Init stat reporter
plugin_config = config->GetSubConfig("statReporter");
plugin_config.Reset(config->GetSubConfig("statReporter"));
plugin = NULL;
std::string plugin_name = plugin_config->GetStringOrDefault("name", kPluginDefaultStatReporter);
PluginManager::Instance().GetPlugin(plugin_name, kPluginStatReporter, plugin);
Expand All @@ -616,30 +614,45 @@ ReturnCode ContextImpl::InitGlobalConfig(Config* config, Context* context) {
POLARIS_LOG(LOG_ERROR,
"Plugin factory register with name[%s] and type[%s] return error instance",
plugin_name.c_str(), PluginTypeToString(kPluginStatReporter));
delete plugin_config;
return kReturnPluginError;
}
ret = stat_reporter_->Init(plugin_config, context);
delete plugin_config;
ret = stat_reporter_->Init(plugin_config.Get(), context);
if (ret != kReturnOk) {
return ret;
}

// Init alert reporter
plugin_config = config->GetSubConfig("alertReporter");
plugin = NULL;
plugin_name = plugin_config->GetStringOrDefault("name", kPluginDefaultAlertReporter);
plugin_config.Reset(config->GetSubConfig("alertReporter"));
plugin = NULL;
plugin_name = plugin_config->GetStringOrDefault("name", kPluginDefaultAlertReporter);
PluginManager::Instance().GetPlugin(plugin_name, kPluginAlertReporter, plugin);
alert_reporter_ = dynamic_cast<AlertReporter*>(plugin);
if (alert_reporter_ == NULL) {
POLARIS_LOG(LOG_ERROR,
"Plugin factory register with name[%s] and type[%s] return error instance",
plugin_name.c_str(), PluginTypeToString(kPluginAlertReporter));
delete plugin_config;
return kReturnPluginError;
}
ret = alert_reporter_->Init(plugin_config, context);
delete plugin_config;
ret = alert_reporter_->Init(plugin_config.Get(), context);
if (ret != kReturnOk) {
return ret;
}

// Init server metric
plugin_config.Reset(config->GetSubConfig("serverMetric"));
plugin = NULL;
plugin_name = plugin_config->GetStringOrDefault("name", "");
if (!plugin_name.empty()) {
PluginManager::Instance().GetPlugin(plugin_name, kPluginServerMetric, plugin);
server_metric_.Set(dynamic_cast<ServerMetric*>(plugin));
if (server_metric_.IsNull()) {
POLARIS_LOG(LOG_ERROR,
"Plugin factory register with name[%s] and type[%s] return error instance",
plugin_name.c_str(), PluginTypeToString(kPluginServerMetric));
return kReturnPluginError;
}
ret = server_metric_->Init(plugin_config.Get(), context);
}
return ret;
}

Expand Down
3 changes: 3 additions & 0 deletions polaris/context_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class ContextImpl {

AlertReporter* GetAlertReporter() { return alert_reporter_; }

ServerMetric* GetServerMetric() { return server_metric_.Get(); }

ServiceContext* GetOrCreateServiceContext(const ServiceKey& service_key);

void DeleteServiceContext(const ServiceKey& service_key);
Expand Down Expand Up @@ -219,6 +221,7 @@ class ContextImpl {
LocalRegistry* local_registry_;
StatReporter* stat_reporter_;
AlertReporter* alert_reporter_;
ScopedPtr<ServerMetric> server_metric_;
QuotaManager* quota_manager_; // 配额管理器

// Service config and Service level context
Expand Down
15 changes: 4 additions & 11 deletions polaris/metric/metric_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,19 +474,12 @@ ReturnCode MetricConnector::SelectConnection(const v1::MetricKey& metric_key,

void MetricConnector::UpdateCallResult(Instance* instance, PolarisServerCode server_code) {
POLARIS_ASSERT(instance != NULL);
const PolarisCluster& cluster = context_->GetContextImpl()->GetMetricService();
InstanceGauge instance_gauge;
instance_gauge.service_namespace = cluster.service_.namespace_;
instance_gauge.service_name = cluster.service_.name_;
instance_gauge.instance_id = instance->GetId();
instance_gauge.call_daley = 100;
instance_gauge.call_ret_code = server_code;
const ServiceKey& service = context_->GetContextImpl()->GetMetricService().service_;
CallRetStatus status = kCallRetOk;
if (kServerCodeConnectError <= server_code && server_code <= kServerCodeInvalidResponse) {
instance_gauge.call_ret_status = kCallRetError;
} else {
instance_gauge.call_ret_status = kCallRetOk;
status = kCallRetError;
}
ConsumerApiImpl::UpdateServiceCallResult(context_, instance_gauge);
ConsumerApiImpl::UpdateServerResult(context_, service, *instance, server_code, status, 100);
}

} // namespace polaris
3 changes: 2 additions & 1 deletion polaris/monitor/api_stat_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ static const char* g_ApiStatKeyMap[] = {"Consumer::InitService",
"Provider::AsyncHeartbeat"};

// 静态断言两处stat key的长度相等
STATIC_ASSERT(sizeof(g_ApiStatKeyMap) / sizeof(const char*) == kApiStatKeyCount, "");
STATIC_ASSERT(sizeof(g_ApiStatKeyMap) / sizeof(const char*) == kApiStatKeyCount,
"api stat key define error");

static const char* g_DelayRangeStr[] = {
"[0ms,2ms)", "[2ms, 10ms)", "[10ms,50ms)", "[50ms,100ms)",
Expand Down
22 changes: 8 additions & 14 deletions polaris/monitor/monitor_reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,23 +869,17 @@ bool MonitorReporter::GetInstance(ReportBase* report_data) {

void MonitorReporter::UpdateCallResult(ReportBase* report_data) {
POLARIS_ASSERT(report_data->instance_ != NULL);
InstanceGauge instance_gauge;
const PolarisCluster& monitor_cluster = context_->GetContextImpl()->GetMonitorService();
instance_gauge.service_namespace = monitor_cluster.service_.namespace_;
instance_gauge.service_name = monitor_cluster.service_.name_;
instance_gauge.instance_id = report_data->instance_->GetId();
delete report_data->instance_;
report_data->instance_ = NULL;
instance_gauge.call_daley = Time::GetCurrentTimeMs() - report_data->call_begin_;
instance_gauge.call_ret_code = report_data->server_code_;
const ServiceKey& service = context_->GetContextImpl()->GetMonitorService().service_;
CallRetStatus status = kCallRetOk;
if (kServerCodeConnectError <= report_data->server_code_ &&
report_data->server_code_ <= kServerCodeInvalidResponse) {
instance_gauge.call_ret_status = kCallRetError;
} else {
instance_gauge.call_ret_status = kCallRetOk;
status = kCallRetError;
}
ConsumerApiImpl::UpdateServiceCallResult(context_, instance_gauge);

uint64_t delay = Time::GetCurrentTimeMs() - report_data->call_begin_;
ConsumerApiImpl::UpdateServerResult(context_, service, *report_data->instance_,
report_data->server_code_, status, delay);
delete report_data->instance_;
report_data->instance_ = NULL;
POLARIS_ASSERT(report_data->grpc_client_ != NULL);
// 由于本方法在grpc stream的callback中调用,为了防止stream释放自身,需要异步释放grpc client
report_data->grpc_client_->CloseStream();
Expand Down
36 changes: 14 additions & 22 deletions polaris/plugin/plugin_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,28 @@
#include "polaris/model.h"
#include "polaris/plugin.h"
#include "utils/indestructible.h"
#include "utils/static_assert.h"

namespace polaris {

#define TO_STR(value) #value

static const char* g_PluginTypeString[] = {
TO_STR(kPluginServerConnector), TO_STR(kPluginLocalRegistry), TO_STR(kPluginServiceRouter),
TO_STR(kPluginLoadBalancer), TO_STR(kPluginHealthChecker), TO_STR(kPluginCircuitBreaker),
TO_STR(kPluginWeightAdjuster), TO_STR(kPluginStatReporter), TO_STR(kPluginAlertReporter),
TO_STR(kPluginServerMetric)};

STATIC_ASSERT(sizeof(g_PluginTypeString) / sizeof(const char*) == kPluginTypeMaxCount,
"plugin type define error");

ReturnCode RegisterPlugin(std::string name, PluginType plugin_type, PluginFactory plugin_factory) {
return PluginManager::Instance().RegisterPlugin(name, plugin_type, plugin_factory);
}

const char* PluginTypeToString(PluginType plugin_type) {
switch (plugin_type) {
case kPluginServerConnector:
return "ServerConnector";
case kPluginLocalRegistry:
return "LocalRegistry";
case kPluginServiceRouter:
return "ServiceRouter";
case kPluginLoadBalancer:
return "LoadBalancer";
case kPluginHealthChecker:
return "HealthChecker";
case kPluginCircuitBreaker:
return "CircuitBreaker";
case kPluginWeightAdjuster:
return "WeightAdjuster";
case kPluginStatReporter:
return "StatReporter";
case kPluginAlertReporter:
return "AlertReporter";
default:
return "UnknowPluginType";
}
POLARIS_ASSERT(plugin_type < kPluginTypeMaxCount);
return g_PluginTypeString[plugin_type];
}

PluginManager& PluginManager::Instance() {
Expand Down
Loading