diff --git a/include/polaris/plugin.h b/include/polaris/plugin.h index 0ad2e39..78dda6b 100644 --- a/include/polaris/plugin.h +++ b/include/polaris/plugin.h @@ -43,7 +43,10 @@ enum PluginType { kPluginCircuitBreaker, ///< 节点熔断扩展点 kPluginWeightAdjuster, ///< 动态权重调整扩展点 kPluginStatReporter, ///< 统计上报扩展点 - kPluginAlertReporter ///< 告警扩展点 + kPluginAlertReporter, ///< 告警扩展点 + kPluginServerMetric, ///< SDK与Server请求结果统计 + + kPluginTypeMaxCount // 插件类型数量 }; /// @brief 路由插件事件回调 @@ -501,6 +504,25 @@ class AlertReporter : public Plugin { virtual ReturnCode ReportAlert(AlertLevel alert_level, std::string msg) = 0; }; +///@brief 扩展点接口:收集北极星SDK调用服务器结果 +class ServerMetric : public Plugin { +public: + virtual ~ServerMetric() {} + + /// @brief 通过配置进行初始化 + virtual ReturnCode Init(Config* config, Context* context) = 0; + + /// @brief 内部服务调用结果上报 + /// + /// @param service_key 服务 + /// @param instance 实例 + /// @param ret_code 返回码 + /// @param ret_status 是否成功 + /// @param daley 延迟 + virtual void MetricReport(const ServiceKey& service_key, const Instance& instance, + ReturnCode ret_code, CallRetStatus ret_status, uint64_t daley) = 0; +}; + } // namespace polaris #endif // POLARIS_CPP_INCLUDE_POLARIS_PLUGIN_H_ diff --git a/polaris/api/consumer_api.cpp b/polaris/api/consumer_api.cpp index b0ab052..a2caeb0 100644 --- a/polaris/api/consumer_api.cpp +++ b/polaris/api/consumer_api.cpp @@ -901,4 +901,25 @@ ReturnCode ConsumerApiImpl::GetSystemServer(Context* context, const ServiceKey& return ret; } +void ConsumerApiImpl::UpdateServerResult(Context* context, const ServiceKey& service_key, + const Instance& instance, PolarisServerCode code, + CallRetStatus status, uint64_t delay) { + InstanceGauge instance_gauge; + instance_gauge.service_namespace = service_key.namespace_; + instance_gauge.service_name = service_key.name_; + instance_gauge.instance_id = instance.GetId(); + instance_gauge.call_daley = delay; + instance_gauge.call_ret_code = code; + instance_gauge.call_ret_status = status; + ReturnCode ret_code = kReturnOk; + if (kServerCodeConnectError <= code && code <= kServerCodeInvalidResponse) { + ret_code = (code == kServerCodeRpcTimeout) ? kReturnTimeout : kReturnServerError; + } + ConsumerApiImpl::UpdateServiceCallResult(context, instance_gauge); + ServerMetric* metric = context->GetContextImpl()->GetServerMetric(); + if (metric != NULL) { + metric->MetricReport(service_key, instance, ret_code, status, delay); + } +} + } // namespace polaris diff --git a/polaris/api/consumer_api.h b/polaris/api/consumer_api.h index f83912b..13760cd 100644 --- a/polaris/api/consumer_api.h +++ b/polaris/api/consumer_api.h @@ -17,6 +17,7 @@ #include #include +#include "model/return_code.h" #include "polaris/defs.h" #include "polaris/model.h" #include "polaris/plugin.h" @@ -94,6 +95,10 @@ class ConsumerApiImpl { const Criteria& criteria, Instance*& instance, uint64_t timeout, const std::string& protocol = "grpc"); + static void UpdateServerResult(Context* context, const ServiceKey& service_key, + const Instance& instance, PolarisServerCode code, + CallRetStatus status, uint64_t delay); + private: static void GetBackupInstances(ServiceInstances* service_instances, LoadBalancer* load_balancer, GetOneInstanceRequestAccessor& request, diff --git a/polaris/context.cpp b/polaris/context.cpp index cc7592d..58c9754 100644 --- a/polaris/context.cpp +++ b/polaris/context.cpp @@ -572,8 +572,8 @@ ReturnCode ContextImpl::InitApiConfig(Config* api_config) { ReturnCode ContextImpl::InitGlobalConfig(Config* config, Context* context) { // Init server connector plugin - Config* plugin_config = config->GetSubConfig("serverConnector"); - Plugin* plugin = NULL; + ScopedPtr plugin_config(config->GetSubConfig("serverConnector")); + Plugin* plugin = NULL; std::string protocol = plugin_config->GetStringOrDefault("protocol", kPluginDefaultServerConnector); PluginManager::Instance().GetPlugin(protocol, kPluginServerConnector, plugin); @@ -582,11 +582,9 @@ ReturnCode ContextImpl::InitGlobalConfig(Config* config, Context* context) { POLARIS_LOG(LOG_ERROR, "Plugin factory register with name[%s] and type[%s] return error instance", protocol.c_str(), PluginTypeToString(kPluginServerConnector)); - delete plugin_config; return kReturnPluginError; } - ReturnCode ret = server_connector_->Init(plugin_config, context); - delete plugin_config; + ReturnCode ret = server_connector_->Init(plugin_config.Get(), context); if (ret != kReturnOk) { return ret; } @@ -607,7 +605,7 @@ ReturnCode ContextImpl::InitGlobalConfig(Config* config, Context* context) { } // Init stat reporter - plugin_config = config->GetSubConfig("statReporter"); + plugin_config.Reset(config->GetSubConfig("statReporter")); plugin = NULL; std::string plugin_name = plugin_config->GetStringOrDefault("name", kPluginDefaultStatReporter); PluginManager::Instance().GetPlugin(plugin_name, kPluginStatReporter, plugin); @@ -616,30 +614,45 @@ ReturnCode ContextImpl::InitGlobalConfig(Config* config, Context* context) { POLARIS_LOG(LOG_ERROR, "Plugin factory register with name[%s] and type[%s] return error instance", plugin_name.c_str(), PluginTypeToString(kPluginStatReporter)); - delete plugin_config; return kReturnPluginError; } - ret = stat_reporter_->Init(plugin_config, context); - delete plugin_config; + ret = stat_reporter_->Init(plugin_config.Get(), context); if (ret != kReturnOk) { return ret; } // Init alert reporter - plugin_config = config->GetSubConfig("alertReporter"); - plugin = NULL; - plugin_name = plugin_config->GetStringOrDefault("name", kPluginDefaultAlertReporter); + plugin_config.Reset(config->GetSubConfig("alertReporter")); + plugin = NULL; + plugin_name = plugin_config->GetStringOrDefault("name", kPluginDefaultAlertReporter); PluginManager::Instance().GetPlugin(plugin_name, kPluginAlertReporter, plugin); alert_reporter_ = dynamic_cast(plugin); if (alert_reporter_ == NULL) { POLARIS_LOG(LOG_ERROR, "Plugin factory register with name[%s] and type[%s] return error instance", plugin_name.c_str(), PluginTypeToString(kPluginAlertReporter)); - delete plugin_config; return kReturnPluginError; } - ret = alert_reporter_->Init(plugin_config, context); - delete plugin_config; + ret = alert_reporter_->Init(plugin_config.Get(), context); + if (ret != kReturnOk) { + return ret; + } + + // Init server metric + plugin_config.Reset(config->GetSubConfig("serverMetric")); + plugin = NULL; + plugin_name = plugin_config->GetStringOrDefault("name", ""); + if (!plugin_name.empty()) { + PluginManager::Instance().GetPlugin(plugin_name, kPluginServerMetric, plugin); + server_metric_.Set(dynamic_cast(plugin)); + if (server_metric_.IsNull()) { + POLARIS_LOG(LOG_ERROR, + "Plugin factory register with name[%s] and type[%s] return error instance", + plugin_name.c_str(), PluginTypeToString(kPluginServerMetric)); + return kReturnPluginError; + } + ret = server_metric_->Init(plugin_config.Get(), context); + } return ret; } diff --git a/polaris/context_internal.h b/polaris/context_internal.h index 3a63cbd..80e83af 100644 --- a/polaris/context_internal.h +++ b/polaris/context_internal.h @@ -110,6 +110,8 @@ class ContextImpl { AlertReporter* GetAlertReporter() { return alert_reporter_; } + ServerMetric* GetServerMetric() { return server_metric_.Get(); } + ServiceContext* GetOrCreateServiceContext(const ServiceKey& service_key); void DeleteServiceContext(const ServiceKey& service_key); @@ -219,6 +221,7 @@ class ContextImpl { LocalRegistry* local_registry_; StatReporter* stat_reporter_; AlertReporter* alert_reporter_; + ScopedPtr server_metric_; QuotaManager* quota_manager_; // 配额管理器 // Service config and Service level context diff --git a/polaris/metric/metric_connector.cpp b/polaris/metric/metric_connector.cpp index d996eb8..65b6249 100644 --- a/polaris/metric/metric_connector.cpp +++ b/polaris/metric/metric_connector.cpp @@ -474,19 +474,12 @@ ReturnCode MetricConnector::SelectConnection(const v1::MetricKey& metric_key, void MetricConnector::UpdateCallResult(Instance* instance, PolarisServerCode server_code) { POLARIS_ASSERT(instance != NULL); - const PolarisCluster& cluster = context_->GetContextImpl()->GetMetricService(); - InstanceGauge instance_gauge; - instance_gauge.service_namespace = cluster.service_.namespace_; - instance_gauge.service_name = cluster.service_.name_; - instance_gauge.instance_id = instance->GetId(); - instance_gauge.call_daley = 100; - instance_gauge.call_ret_code = server_code; + const ServiceKey& service = context_->GetContextImpl()->GetMetricService().service_; + CallRetStatus status = kCallRetOk; if (kServerCodeConnectError <= server_code && server_code <= kServerCodeInvalidResponse) { - instance_gauge.call_ret_status = kCallRetError; - } else { - instance_gauge.call_ret_status = kCallRetOk; + status = kCallRetError; } - ConsumerApiImpl::UpdateServiceCallResult(context_, instance_gauge); + ConsumerApiImpl::UpdateServerResult(context_, service, *instance, server_code, status, 100); } } // namespace polaris diff --git a/polaris/monitor/api_stat_registry.cpp b/polaris/monitor/api_stat_registry.cpp index 1b91f9b..7462d0c 100644 --- a/polaris/monitor/api_stat_registry.cpp +++ b/polaris/monitor/api_stat_registry.cpp @@ -50,7 +50,8 @@ static const char* g_ApiStatKeyMap[] = {"Consumer::InitService", "Provider::AsyncHeartbeat"}; // 静态断言两处stat key的长度相等 -STATIC_ASSERT(sizeof(g_ApiStatKeyMap) / sizeof(const char*) == kApiStatKeyCount, ""); +STATIC_ASSERT(sizeof(g_ApiStatKeyMap) / sizeof(const char*) == kApiStatKeyCount, + "api stat key define error"); static const char* g_DelayRangeStr[] = { "[0ms,2ms)", "[2ms, 10ms)", "[10ms,50ms)", "[50ms,100ms)", diff --git a/polaris/monitor/monitor_reporter.cpp b/polaris/monitor/monitor_reporter.cpp index 285e393..923b1a2 100644 --- a/polaris/monitor/monitor_reporter.cpp +++ b/polaris/monitor/monitor_reporter.cpp @@ -869,23 +869,17 @@ bool MonitorReporter::GetInstance(ReportBase* report_data) { void MonitorReporter::UpdateCallResult(ReportBase* report_data) { POLARIS_ASSERT(report_data->instance_ != NULL); - InstanceGauge instance_gauge; - const PolarisCluster& monitor_cluster = context_->GetContextImpl()->GetMonitorService(); - instance_gauge.service_namespace = monitor_cluster.service_.namespace_; - instance_gauge.service_name = monitor_cluster.service_.name_; - instance_gauge.instance_id = report_data->instance_->GetId(); - delete report_data->instance_; - report_data->instance_ = NULL; - instance_gauge.call_daley = Time::GetCurrentTimeMs() - report_data->call_begin_; - instance_gauge.call_ret_code = report_data->server_code_; + const ServiceKey& service = context_->GetContextImpl()->GetMonitorService().service_; + CallRetStatus status = kCallRetOk; if (kServerCodeConnectError <= report_data->server_code_ && report_data->server_code_ <= kServerCodeInvalidResponse) { - instance_gauge.call_ret_status = kCallRetError; - } else { - instance_gauge.call_ret_status = kCallRetOk; + status = kCallRetError; } - ConsumerApiImpl::UpdateServiceCallResult(context_, instance_gauge); - + uint64_t delay = Time::GetCurrentTimeMs() - report_data->call_begin_; + ConsumerApiImpl::UpdateServerResult(context_, service, *report_data->instance_, + report_data->server_code_, status, delay); + delete report_data->instance_; + report_data->instance_ = NULL; POLARIS_ASSERT(report_data->grpc_client_ != NULL); // 由于本方法在grpc stream的callback中调用,为了防止stream释放自身,需要异步释放grpc client report_data->grpc_client_->CloseStream(); diff --git a/polaris/plugin/plugin_manager.cpp b/polaris/plugin/plugin_manager.cpp index f06f162..c9c3714 100644 --- a/polaris/plugin/plugin_manager.cpp +++ b/polaris/plugin/plugin_manager.cpp @@ -45,36 +45,28 @@ #include "polaris/model.h" #include "polaris/plugin.h" #include "utils/indestructible.h" +#include "utils/static_assert.h" namespace polaris { +#define TO_STR(value) #value + +static const char* g_PluginTypeString[] = { + TO_STR(kPluginServerConnector), TO_STR(kPluginLocalRegistry), TO_STR(kPluginServiceRouter), + TO_STR(kPluginLoadBalancer), TO_STR(kPluginHealthChecker), TO_STR(kPluginCircuitBreaker), + TO_STR(kPluginWeightAdjuster), TO_STR(kPluginStatReporter), TO_STR(kPluginAlertReporter), + TO_STR(kPluginServerMetric)}; + +STATIC_ASSERT(sizeof(g_PluginTypeString) / sizeof(const char*) == kPluginTypeMaxCount, + "plugin type define error"); + ReturnCode RegisterPlugin(std::string name, PluginType plugin_type, PluginFactory plugin_factory) { return PluginManager::Instance().RegisterPlugin(name, plugin_type, plugin_factory); } const char* PluginTypeToString(PluginType plugin_type) { - switch (plugin_type) { - case kPluginServerConnector: - return "ServerConnector"; - case kPluginLocalRegistry: - return "LocalRegistry"; - case kPluginServiceRouter: - return "ServiceRouter"; - case kPluginLoadBalancer: - return "LoadBalancer"; - case kPluginHealthChecker: - return "HealthChecker"; - case kPluginCircuitBreaker: - return "CircuitBreaker"; - case kPluginWeightAdjuster: - return "WeightAdjuster"; - case kPluginStatReporter: - return "StatReporter"; - case kPluginAlertReporter: - return "AlertReporter"; - default: - return "UnknowPluginType"; - } + POLARIS_ASSERT(plugin_type < kPluginTypeMaxCount); + return g_PluginTypeString[plugin_type]; } PluginManager& PluginManager::Instance() { diff --git a/polaris/plugin/server_connector/server_connector.cpp b/polaris/plugin/server_connector/server_connector.cpp index a83b280..75c90a1 100644 --- a/polaris/plugin/server_connector/server_connector.cpp +++ b/polaris/plugin/server_connector/server_connector.cpp @@ -115,9 +115,9 @@ void DiscoverEventTask::Run() { /////////////////////////////////////////////////////////////////////////////// GrpcServerConnector::GrpcServerConnector() : discover_stream_state_(kDiscoverStreamNotInit), context_(NULL), task_thread_id_(0), - grpc_client_(NULL), discover_stream_(NULL), stream_response_time_(0), - server_switch_interval_(0), server_switch_state_(kServerSwitchInit), message_used_time_(0), - request_queue_size_(0), last_cache_version_(0) {} + discover_instance_(NULL), grpc_client_(NULL), discover_stream_(NULL), + stream_response_time_(0), server_switch_interval_(0), server_switch_state_(kServerSwitchInit), + message_used_time_(0), request_queue_size_(0), last_cache_version_(0) {} GrpcServerConnector::~GrpcServerConnector() { // 关闭线程 @@ -134,6 +134,10 @@ GrpcServerConnector::~GrpcServerConnector() { discover_stream_ = NULL; delete grpc_client_; } + if (discover_instance_ != NULL) { + delete discover_instance_; + discover_instance_ = NULL; + } for (std::map::iterator it = async_request_map_.begin(); it != async_request_map_.end(); ++it) { delete it->second; @@ -580,16 +584,18 @@ void GrpcServerConnector::ServerSwitch() { std::string host; int port = 0; if (discover_stream_state_ >= kDiscoverStreamGetInstance) { // 说明内部服务已经返回 - Instance* instance = NULL; + if (discover_instance_ != NULL) { + delete discover_instance_; + discover_instance_ = NULL; + } const ServiceKey& discover_service = context_->GetContextImpl()->GetDiscoverService().service_; bool ignore_half_open = server_switch_state_ != kServerSwitchPeriodic; // 周期切换才选半开节点 - ReturnCode ret_code = SelectInstance(discover_service, 0, &instance, ignore_half_open); + ReturnCode ret_code = + SelectInstance(discover_service, 0, &discover_instance_, ignore_half_open); if (ret_code == kReturnOk) { - POLARIS_ASSERT(instance != NULL); - host = instance->GetHost(); - port = instance->GetPort(); - discover_instance_ = instance->GetId(); - delete instance; + POLARIS_ASSERT(discover_instance_ != NULL); + host = discover_instance_->GetHost(); + port = discover_instance_->GetPort(); discover_stream_state_ = kDiscoverStreamInit; POLARIS_LOG(LOG_INFO, "discover stream switch to discover server[%s:%d]", host.c_str(), port); } else { @@ -636,27 +642,21 @@ void GrpcServerConnector::ServerSwitch() { } void GrpcServerConnector::UpdateCallResult(PolarisServerCode server_code, uint64_t delay) { - if (discover_instance_.empty()) { + if (discover_instance_ == NULL) { return; // 内部埋点实例 不要上报 } - InstanceGauge instance_gauge; - const ServiceKey& service = context_->GetContextImpl()->GetDiscoverService().service_; - instance_gauge.service_namespace = service.namespace_; - instance_gauge.service_name = service.name_; - instance_gauge.instance_id = discover_instance_; - instance_gauge.call_daley = delay; - instance_gauge.call_ret_code = server_code; + const ServiceKey& service = context_->GetContextImpl()->GetDiscoverService().service_; + CallRetStatus status = kCallRetOk; if (kServerCodeConnectError <= server_code && server_code <= kServerCodeInvalidResponse) { if (server_code == kServerCodeRpcTimeout && stream_response_time_ + delay > Time::GetCurrentTimeMs()) { - instance_gauge.call_ret_status = kCallRetOk; + status = kCallRetOk; } else { // 消息超时且stream上超时时间内没有数据返回则上报失败 - instance_gauge.call_ret_status = kCallRetError; + status = kCallRetError; } - } else { - instance_gauge.call_ret_status = kCallRetOk; } - ConsumerApiImpl::UpdateServiceCallResult(context_, instance_gauge); + ConsumerApiImpl::UpdateServerResult(context_, service, *discover_instance_, server_code, status, + delay); } BlockRequest* GrpcServerConnector::CreateBlockRequest(BlockRequestType request_type, @@ -839,27 +839,19 @@ bool GrpcServerConnector::GetInstance(BlockRequest* block_request) { void GrpcServerConnector::UpdateCallResult(BlockRequest* block_request) { POLARIS_ASSERT(block_request->instance_ != NULL); - InstanceGauge instance_gauge; - const PolarisCluster& system_cluster = block_request->request_type_ == kBlockHeartbeat - ? context_->GetContextImpl()->GetHeartbeatService() - : context_->GetContextImpl()->GetDiscoverService(); - instance_gauge.service_namespace = system_cluster.service_.namespace_; - instance_gauge.service_name = system_cluster.service_.name_; - instance_gauge.instance_id = block_request->instance_->GetId(); + const ServiceKey& service = block_request->request_type_ == kBlockHeartbeat + ? context_->GetContextImpl()->GetHeartbeatService().service_ + : context_->GetContextImpl()->GetDiscoverService().service_; + CallRetStatus status = kCallRetOk; + if (kServerCodeConnectError <= block_request->server_code_ && + block_request->server_code_ <= kServerCodeInvalidResponse) { + status = kCallRetError; + } + uint64_t delay = Time::GetCurrentTimeMs() - block_request->call_begin_; + ConsumerApiImpl::UpdateServerResult(context_, service, *block_request->instance_, + block_request->server_code_, status, delay); delete block_request->instance_; block_request->instance_ = NULL; - if (instance_gauge.instance_id.empty()) { - return; // 内部埋点实例 不要上报 - } - instance_gauge.call_daley = Time::GetCurrentTimeMs() - block_request->call_begin_; - instance_gauge.call_ret_code = block_request->server_code_; - PolarisServerCode server_code = block_request->server_code_; - if (kServerCodeConnectError <= server_code && server_code <= kServerCodeInvalidResponse) { - instance_gauge.call_ret_status = kCallRetError; - } else { - instance_gauge.call_ret_status = kCallRetOk; - } - ConsumerApiImpl::UpdateServiceCallResult(context_, instance_gauge); } /////////////////////////////////////////////////////////////////////////////// @@ -1240,20 +1232,15 @@ void AsyncRequest::OnFailure(grpc::GrpcStatusCode status, const std::string& mes void AsyncRequest::Complete(PolarisServerCode server_code) { if (server_ != NULL) { // 上报调用结果 - InstanceGauge instance_gauge; const ServiceKey& service = connector_->context_->GetContextImpl()->GetHeartbeatService().service_; - instance_gauge.service_namespace = service.namespace_; - instance_gauge.service_name = service.name_; - instance_gauge.instance_id = server_->GetId(); - instance_gauge.call_daley = Time::GetCurrentTimeMs() - begin_time_; - instance_gauge.call_ret_code = server_code; + uint64_t delay = Time::GetCurrentTimeMs() - begin_time_; + CallRetStatus status = kCallRetOk; if (kServerCodeConnectError <= server_code && server_code <= kServerCodeInvalidResponse) { - instance_gauge.call_ret_status = kCallRetError; - } else { - instance_gauge.call_ret_status = kCallRetOk; + status = kCallRetError; } - ConsumerApiImpl::UpdateServiceCallResult(connector_->context_, instance_gauge); + ConsumerApiImpl::UpdateServerResult(connector_->context_, service, *server_, server_code, + status, delay); } // 释放自身 connector_->async_request_map_.erase(request_id_); diff --git a/polaris/plugin/server_connector/server_connector.h b/polaris/plugin/server_connector/server_connector.h index 275bef0..038bacc 100644 --- a/polaris/plugin/server_connector/server_connector.h +++ b/polaris/plugin/server_connector/server_connector.h @@ -181,7 +181,7 @@ class GrpcServerConnector : public ServerConnector, std::vector server_lists_; pthread_t task_thread_id_; Reactor reactor_; - std::string discover_instance_; + Instance* discover_instance_; grpc::GrpcClient* grpc_client_; grpc::GrpcStream* discover_stream_; uint64_t stream_response_time_; diff --git a/polaris/quota/rate_limit_connector.cpp b/polaris/quota/rate_limit_connector.cpp index 673fe13..8c20daa 100644 --- a/polaris/quota/rate_limit_connector.cpp +++ b/polaris/quota/rate_limit_connector.cpp @@ -59,15 +59,15 @@ bool operator<(const LimitTargetKey& lhs, const LimitTargetKey& rhs) { } RateLimitConnection::RateLimitConnection(RateLimitConnector& connector, - const uint64_t& request_timeout, Instance& instance, + const uint64_t& request_timeout, Instance* instance, const ServiceKey& cluster, const std::string& id) : connector_(connector), reactor_(connector.GetReactor()), request_timeout_(request_timeout) { - instance_id_ = instance.GetId(); + instance_ = instance; cluster_ = cluster; connection_id_ = id; client_ = new grpc::GrpcClient(reactor_); // 发起异步请求 - client_->ConnectTo(instance.GetHost(), instance.GetPort(), 1000, + client_->ConnectTo(instance_->GetHost(), instance_->GetPort(), 1000, new grpc::ConnectCallbackRef(*this)); stream_ = NULL; last_used_time_ = Time::GetCurrentTimeMs(); @@ -81,6 +81,10 @@ RateLimitConnection::RateLimitConnection(RateLimitConnector& connector, RateLimitConnection::~RateLimitConnection() { stream_ = NULL; + if (instance_ != NULL) { + delete instance_; + instance_ = NULL; + } if (client_ != NULL) { delete client_; client_ = NULL; @@ -184,7 +188,7 @@ void RateLimitConnection::OnSuccess(metric::v2::TimeAdjustResponse* response) { } POLARIS_LOG(LOG_TRACE, "sync time diff:%" PRId64 "", time_diff_); delete response; - connector_.UpdateCallResult(cluster_, instance_id_, delay, kServerCodeReturnOk); + connector_.UpdateCallResult(cluster_, instance_, delay, kServerCodeReturnOk); // 设置下一次同步任务 sync_time_task_ = reactor_.AddTimingTask(new TimeSyncTask(this, kTimeSyncTaskTiming, 60 * 1000)); } @@ -339,7 +343,7 @@ void RateLimitConnection::OnInitResponse(const metric::v2::RateLimitInitResponse limit_target_map_.erase(target_key); window->OnInitResponse(response, time_diff_); - connector_.UpdateCallResult(cluster_, instance_id_, delay, kServerCodeReturnOk); + connector_.UpdateCallResult(cluster_, instance_, delay, kServerCodeReturnOk); reactor_.AddTimingTask(new WindowSyncTask( window, &connector_, window->GetRateLimitRule()->GetRateLimitReport().IntervalWithJitter())); @@ -377,7 +381,7 @@ void RateLimitConnection::OnReportResponse(const metric::v2::RateLimitReportResp } reactor_.CancelTimingTask(task_it->second.task_iter_); // 取消超时检查 task_it->second.task_iter_ = reactor_.TimingTaskEnd(); - connector_.UpdateCallResult(cluster_, instance_id_, delay, kServerCodeReturnOk); + connector_.UpdateCallResult(cluster_, instance_, delay, kServerCodeReturnOk); uint64_t report_time = window->OnReportResponse(response, time_diff_); report_time = report_time > delay ? report_time - delay : 0; reactor_.AddTimingTask(new WindowSyncTask(window, &connector_, report_time)); @@ -405,7 +409,7 @@ void RateLimitConnection::OnResponseTimeout(RateLimitWindow* window, WindowSyncT init_task_map_[window] = reactor_.TimingTaskEnd(); } reactor_.SubmitTask(new WindowSyncTask(window, &connector_)); - connector_.UpdateCallResult(cluster_, instance_id_, request_timeout_, kServerCodeRpcTimeout); + connector_.UpdateCallResult(cluster_, instance_, request_timeout_, kServerCodeRpcTimeout); // 连接上的请求全部超时,切换连接 if (last_response_time_ + request_timeout_ < Time::GetCurrentTimeMs()) { this->CloseForError(); @@ -418,7 +422,7 @@ void RateLimitConnection::CloseForError() { } is_closing_ = true; uint64_t delay = Time::GetCurrentTimeMs() - last_response_time_; - connector_.UpdateCallResult(cluster_, instance_id_, delay, kServerCodeServerError); + connector_.UpdateCallResult(cluster_, instance_, delay, kServerCodeServerError); TimingTaskIter timeout_end = reactor_.TimingTaskEnd(); std::map::iterator task_it; if (stream_ == NULL) { // 连接失败的情况 @@ -561,25 +565,18 @@ ReturnCode RateLimitConnector::SelectConnection(const ServiceKey& metric_cluster std::map::iterator it = connection_mgr_.find(id); if (it != connection_mgr_.end()) { // 连接已存在,直接返回 connection = it->second; + delete instance; } else { - connection = new RateLimitConnection(*this, message_timeout_, *instance, cluster, id); + connection = new RateLimitConnection(*this, message_timeout_, instance, cluster, id); connection_mgr_[connection->GetId()] = connection; } - delete instance; return kReturnOk; } -void RateLimitConnector::UpdateCallResult(const ServiceKey& service_key, - const std::string& instance_id, uint64_t delay, - PolarisServerCode server_code) { - InstanceGauge instance_gauge; - instance_gauge.service_namespace = service_key.namespace_; - instance_gauge.service_name = service_key.name_; - instance_gauge.instance_id = instance_id; - instance_gauge.call_daley = delay; - instance_gauge.call_ret_code = server_code; - instance_gauge.call_ret_status = kCallRetOk; // 该服务不需要熔断 - ConsumerApiImpl::UpdateServiceCallResult(context_, instance_gauge); +void RateLimitConnector::UpdateCallResult(const ServiceKey& cluster, Instance* instance, + uint64_t delay, PolarisServerCode server_code) { + CallRetStatus status = kCallRetOk; // 该服务不需要熔断 + ConsumerApiImpl::UpdateServerResult(context_, cluster, *instance, server_code, status, delay); } } // namespace polaris diff --git a/polaris/quota/rate_limit_connector.h b/polaris/quota/rate_limit_connector.h index 65dfc77..48ad670 100644 --- a/polaris/quota/rate_limit_connector.h +++ b/polaris/quota/rate_limit_connector.h @@ -88,7 +88,7 @@ class RateLimitConnection : public grpc::RequestCallback { public: RateLimitConnection(RateLimitConnector& connector, const uint64_t& request_timeout, - Instance& instance, const ServiceKey& cluster, const std::string& id); + Instance* instance, const ServiceKey& cluster, const std::string& id); virtual ~RateLimitConnection(); // 提供给连接回调对象使用 @@ -135,7 +135,7 @@ class RateLimitConnection : public grpc::RequestCallback