From 90934aa7723f69aa08f9faff00998924003a2c01 Mon Sep 17 00:00:00 2001 From: lambdaliu Date: Mon, 6 Sep 2021 23:04:59 +0800 Subject: [PATCH 1/8] fix discover task state error when switch stream failed --- .../server_connector/server_connector.cpp | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/polaris/plugin/server_connector/server_connector.cpp b/polaris/plugin/server_connector/server_connector.cpp index 75c90a1..e8d8e6a 100644 --- a/polaris/plugin/server_connector/server_connector.cpp +++ b/polaris/plugin/server_connector/server_connector.cpp @@ -366,6 +366,7 @@ bool GrpcServerConnector::SendDiscoverRequest(ServiceListener& service_listener) DataTypeToStr(service_listener.service_.data_type_), service_key.namespace_.c_str(), service_key.name_.c_str()); // 设置超时检查任务 + POLARIS_ASSERT(service_listener.timeout_task_iter_ == reactor_.TimingTaskEnd()); service_listener.timeout_task_iter_ = reactor_.AddTimingTask(new TimingFuncTask( DiscoverTimoutCheck, &service_listener, message_timeout_.GetTimeout())); return true; @@ -519,6 +520,10 @@ ReturnCode GrpcServerConnector::ProcessDiscoverResponse(::v1::DiscoverResponse& // 设置下一次的检查任务 listener.discover_task_iter_ = reactor_.AddTimingTask( new TimingFuncTask(TimingDiscover, &listener, listener.sync_interval_)); + if (pending_for_connected_.count(&listener) > 0) { + // 清理由于切换失败保留在pending列表里的任务需要 + pending_for_connected_.erase(&listener); + } return kReturnOk; } @@ -580,6 +585,16 @@ void GrpcServerConnector::ServerSwitch() { reactor_.CancelTimingTask(server_switch_task_iter_); } + // 有超时检查的服务,说明本次未完成服务发现,加入pending列表,从而可在切换成功后立马发送 + for (std::map::iterator it = listener_map_.begin(); + it != listener_map_.end(); ++it) { + if (it->second.timeout_task_iter_ != reactor_.TimingTaskEnd()) { + reactor_.CancelTimingTask(it->second.timeout_task_iter_); + pending_for_connected_.insert(&it->second); + it->second.timeout_task_iter_ = reactor_.TimingTaskEnd(); + } + } + // 选择一个服务器 std::string host; int 
port = 0; @@ -618,16 +633,6 @@ void GrpcServerConnector::ServerSwitch() { POLARIS_LOG(LOG_INFO, "discover stream switch to inner server[%s:%d]", host.c_str(), port); } - // 有超时检查的服务,说明本次未完成服务发现,加入pending列表,从而可在切换成功后立马发送 - for (std::map::iterator it = listener_map_.begin(); - it != listener_map_.end(); ++it) { - if (it->second.timeout_task_iter_ != reactor_.TimingTaskEnd()) { - reactor_.CancelTimingTask(it->second.timeout_task_iter_); - pending_for_connected_.insert(&it->second); - it->second.timeout_task_iter_ = reactor_.TimingTaskEnd(); - } - } - // 设置定时任务进行超时检查 server_switch_state_ = kServerSwitchBegin; server_switch_task_iter_ = reactor_.AddTimingTask(new TimingFuncTask( From 1ec44341b1a30fd5998e34d88be38fcc65d5cece Mon Sep 17 00:00:00 2001 From: lambdaliu Date: Wed, 8 Sep 2021 22:57:51 +0800 Subject: [PATCH 2/8] switch to seed service when select discover instance failed --- .../server_connector/server_connector.cpp | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/polaris/plugin/server_connector/server_connector.cpp b/polaris/plugin/server_connector/server_connector.cpp index e8d8e6a..5b4dca9 100644 --- a/polaris/plugin/server_connector/server_connector.cpp +++ b/polaris/plugin/server_connector/server_connector.cpp @@ -550,7 +550,7 @@ void GrpcServerConnector::TimingServerSwitch(GrpcServerConnector* server_connect server_connector->UpdateCallResult(kServerCodeReturnOk, 0); } else if (server_connector->server_switch_state_ == kServerSwitchDefault) { server_connector->server_switch_state_ = kServerSwitchPeriodic; - POLARIS_LOG(LOG_INFO, "switch from default server[%s] to seed service", + POLARIS_LOG(LOG_INFO, "switch from seed server[%s] to seed service", server_connector->grpc_client_->CurrentServer().c_str()); server_connector->UpdateCallResult(kServerCodeReturnOk, 0); } else if (server_connector->server_switch_state_ == kServerSwitchBegin) { @@ -614,23 +614,21 @@ void GrpcServerConnector::ServerSwitch() { 
discover_stream_state_ = kDiscoverStreamInit; POLARIS_LOG(LOG_INFO, "discover stream switch to discover server[%s:%d]", host.c_str(), port); } else { - POLARIS_LOG(LOG_ERROR, "discover polaris service[%s/%s] return error[%s]", + POLARIS_LOG(LOG_WARN, "discover polaris service[%s/%s] return [%s], switch to seed server", discover_service.namespace_.c_str(), discover_service.name_.c_str(), ReturnCodeToMsg(ret_code).c_str()); - server_switch_state_ = kServerSwitchNormal; // 设置成正常状态,稍后重试 - server_switch_task_iter_ = reactor_.AddTimingTask(new TimingFuncTask( - GrpcServerConnector::TimingServerSwitch, this, message_timeout_.GetTimeout())); - return; } - } else { - SeedServer& server = server_lists_[rand() % server_lists_.size()]; - host = server.ip_; - port = server.port_; + } + if (host.empty()) { + discover_stream_state_ = kDiscoverStreamNotInit; + SeedServer& server = server_lists_[rand() % server_lists_.size()]; + host = server.ip_; + port = server.port_; // 如果没有配置discover服务名,那么则此时已经初始化完成 if (context_->GetContextImpl()->GetDiscoverService().service_.name_.empty()) { discover_stream_state_ = kDiscoverStreamInit; } - POLARIS_LOG(LOG_INFO, "discover stream switch to inner server[%s:%d]", host.c_str(), port); + POLARIS_LOG(LOG_INFO, "discover stream switch to seed server[%s:%d]", host.c_str(), port); } // 设置定时任务进行超时检查 From a5f096bfb213368c2bc3fb0ec14bff5841e3a30e Mon Sep 17 00:00:00 2001 From: lambdaliu Date: Wed, 8 Sep 2021 13:22:22 +0800 Subject: [PATCH 3/8] fix service instances expired failed --- include/polaris/plugin.h | 4 +++ polaris/cache/rcu_map.h | 16 +++++---- .../plugin/health_checker/health_checker.cpp | 35 ++++++++----------- .../plugin/local_registry/local_registry.cpp | 35 +++++++++++++++++++ .../plugin/local_registry/local_registry.h | 4 +++ test/mock/mock_local_registry.h | 4 +++ 6 files changed, 72 insertions(+), 26 deletions(-) diff --git a/include/polaris/plugin.h b/include/polaris/plugin.h index 78dda6b..2bc0eb9 100644 --- a/include/polaris/plugin.h 
+++ b/include/polaris/plugin.h @@ -285,6 +285,10 @@ class LocalRegistry : public Plugin { virtual ReturnCode UpdateSetCircuitBreakerData( const ServiceKey& service_key, const CircuitBreakUnhealthySetsData& unhealthy_sets) = 0; + virtual ReturnCode GetCircuitBreakerInstances(const ServiceKey& service_key, + ServiceData*& service_data, + std::vector& open_instances) = 0; + /// @brief 更新服务实例状态,properties存放的是状态值,当前支持2个key /// /// 1. ReadyToServe: 故障熔断标识,true or false diff --git a/polaris/cache/rcu_map.h b/polaris/cache/rcu_map.h index e88fe7b..6a073e8 100644 --- a/polaris/cache/rcu_map.h +++ b/polaris/cache/rcu_map.h @@ -78,7 +78,7 @@ class RcuMap { ~RcuMap(); /// @brief 根据Key获取指向Value的指针,key不存在返回NULL - Value* Get(const Key& key); + Value* Get(const Key& key, bool update_access_time = true); /// @brief 更新Key对应的Value /// 如果key对应的value已存在,则将旧的value加入待释放列表,内部线程会延迟一定时间释放 @@ -166,20 +166,24 @@ RcuMap::~RcuMap() { } template -Value* RcuMap::Get(const Key& key) { +Value* RcuMap::Get(const Key& key, bool update_access_time) { // 查询read map,获取结果 Value* read_result = NULL; InnerMap* current_read = read_map_; typename InnerMap::iterator it = current_read->find(key); if (it != current_read->end()) { // MapValue包含的value指针在整个过程中是可能改变的 - it->second->used_time_ = Time::GetCurrentTimeMs(); - read_result = it->second->value_; + if (update_access_time) { + it->second->used_time_ = Time::GetCurrentTimeMs(); + } + read_result = it->second->value_; } else { // 从read map未读到数据,则加锁进行后续操作 sync::MutexGuard mutex_guard(dirty_lock_); if ((it = dirty_map_->find(key)) != dirty_map_->end()) { - it->second->used_time_ = Time::GetCurrentTimeMs(); - read_result = it->second->value_; + if (update_access_time) { + it->second->used_time_ = Time::GetCurrentTimeMs(); + } + read_result = it->second->value_; if (read_map_ == current_read) { miss_time_++; // 记录read map读失败,dirty map读成功次数 } diff --git a/polaris/plugin/health_checker/health_checker.cpp b/polaris/plugin/health_checker/health_checker.cpp index 
a4eedf0..43775f2 100644 --- a/polaris/plugin/health_checker/health_checker.cpp +++ b/polaris/plugin/health_checker/health_checker.cpp @@ -138,38 +138,32 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b } ServiceData* service_data = NULL; - local_registry_->GetServiceDataWithRef(service_key_, kServiceDataInstances, service_data); - if (service_data == NULL) { + std::vector health_check_instances; + if (local_registry_->GetCircuitBreakerInstances(service_key_, service_data, + health_check_instances) != kReturnOk) { return kReturnOk; } - Service* service = service_data->GetService(); + ServiceInstances service_instances(service_data); std::map& instance_map = service_instances.GetInstances(); - std::set target_health_check_instances; if (when_ == HealthCheckerConfig::kChainWhenAlways) { + health_check_instances.clear(); // 健康检查设置为always, 则探测所有非隔离实例 for (std::map::iterator instance_iter = instance_map.begin(); instance_iter != instance_map.end(); ++instance_iter) { if (!instance_iter->second->isIsolate()) { - target_health_check_instances.insert(instance_iter->first); + health_check_instances.push_back(instance_iter->second); } } - } else if (when_ == HealthCheckerConfig::kChainWhenOnRecover) { - // 健康检查设置为on_recover, 则探测半开实例 - target_health_check_instances = service->GetCircuitBreakerOpenInstances(); + } else if (when_ != HealthCheckerConfig::kChainWhenOnRecover) { + // 健康检查设置不为on_recover, 则探测半开实例 + health_check_instances.clear(); } - for (std::set::iterator it = target_health_check_instances.begin(); - it != target_health_check_instances.end(); ++it) { - const std::string& instance_id = *it; - std::map::iterator iter = instance_map.find(instance_id); - if (iter == instance_map.end()) { - POLARIS_LOG(LOG_INFO, "The health checker of service[%s/%s] getting instance[%s] failed", - service_key_.namespace_.c_str(), service_key_.name_.c_str(), instance_id.c_str()); - continue; - } + + for (std::size_t i = 0; i < 
health_check_instances.size(); ++i) { bool is_detect_success = false; - Instance* instance = iter->second; + Instance* instance = health_check_instances[i]; for (std::size_t i = 0; i < health_checker_list_.size(); ++i) { HealthChecker*& detector = health_checker_list_[i]; DetectResult detector_result; @@ -195,7 +189,7 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b // 探活插件成功,则将熔断实例置为半开状态,其他实例状态不变 // 探活插件失败,则将健康实例置为熔断状态,其他实例状态不变 if (is_detect_success) { - circuit_breaker_chain.TranslateStatus(instance_id, kCircuitBreakerOpen, + circuit_breaker_chain.TranslateStatus(instance->GetId(), kCircuitBreakerOpen, kCircuitBreakerHalfOpen); POLARIS_LOG(LOG_INFO, "service[%s/%s] getting instance[%s-%s:%d] detectoring success, change to " @@ -203,7 +197,8 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b service_key_.namespace_.c_str(), service_key_.name_.c_str(), instance->GetId().c_str(), instance->GetHost().c_str(), instance->GetPort()); } else { - circuit_breaker_chain.TranslateStatus(instance_id, kCircuitBreakerClose, kCircuitBreakerOpen); + circuit_breaker_chain.TranslateStatus(instance->GetId(), kCircuitBreakerClose, + kCircuitBreakerOpen); POLARIS_LOG(LOG_INFO, "service[%s/%s] getting instance[%s-%s:%d] detectoring failed, change to " "open status", diff --git a/polaris/plugin/local_registry/local_registry.cpp b/polaris/plugin/local_registry/local_registry.cpp index 6e83c2e..f3de243 100644 --- a/polaris/plugin/local_registry/local_registry.cpp +++ b/polaris/plugin/local_registry/local_registry.cpp @@ -175,6 +175,7 @@ void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time, pthread_rwlock_unlock(¬ify_rwlock_); if (service_data_type == kServiceDataInstances) { // 清除实例数据时对应的服务级别插件也删除 context_impl->DeleteServiceContext(expired_services[i]); + DeleteServiceInLock(expired_services[i]); } } } @@ -308,6 +309,7 @@ ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key, } if 
(service_data == NULL) { // Server Connector反注册Handler触发更新为NULL if (data_type == kServiceDataInstances) { // 删除服务实例数据时,同时删除服务 + context_impl->DeleteServiceContext(service_key); DeleteServiceInLock(service_key); } context_impl->GetServiceRecord()->ServiceDataDelete(service_key, @@ -389,4 +391,37 @@ ReturnCode InMemoryRegistry::UpdateSetCircuitBreakerData( return service->WriteCircuitBreakerUnhealthySets(unhealthy_sets); } +ReturnCode InMemoryRegistry::GetCircuitBreakerInstances(const ServiceKey& service_key, + ServiceData*& service_data, + std::vector& open_instances) { + service_data = service_instances_data_.Get(service_key, false); + if (service_data == NULL) { + return kReturnServiceNotFound; + } + if (service_data->GetDataStatus() < kDataIsSyncing) { + service_data->DecrementRef(); + return kReturnServiceNotFound; + } + Service* service = service_data->GetService(); + ServiceInstances service_instances(service_data); + std::map& instance_map = service_instances.GetInstances(); + std::set circuit_breaker_open_instance = service->GetCircuitBreakerOpenInstances(); + for (std::set::iterator it = circuit_breaker_open_instance.begin(); + it != circuit_breaker_open_instance.end(); ++it) { + const std::string& instance_id = *it; + std::map::iterator iter = instance_map.find(instance_id); + if (iter == instance_map.end()) { + POLARIS_LOG(LOG_INFO, "The outlier detector of service[%s/%s] getting instance[%s] failed", + service_key.namespace_.c_str(), service_key.name_.c_str(), instance_id.c_str()); + continue; + } + open_instances.push_back(iter->second); + } + if (open_instances.empty()) { + return kReturnInstanceNotFound; + } + service_data->IncrementRef(); + return kReturnOk; +} + } // namespace polaris diff --git a/polaris/plugin/local_registry/local_registry.h b/polaris/plugin/local_registry/local_registry.h index c342898..e457bcc 100644 --- a/polaris/plugin/local_registry/local_registry.h +++ b/polaris/plugin/local_registry/local_registry.h @@ -95,6 +95,10 @@ class 
InMemoryRegistry : public LocalRegistry { virtual ReturnCode UpdateSetCircuitBreakerData( const ServiceKey& service_key, const CircuitBreakUnhealthySetsData& unhealthy_sets); + virtual ReturnCode GetCircuitBreakerInstances(const ServiceKey& service_key, + ServiceData*& service_data, + std::vector& open_instances); + virtual ReturnCode UpdateDynamicWeight(const ServiceKey& service_key, const DynamicWeightData& dynamic_weight_data); diff --git a/test/mock/mock_local_registry.h b/test/mock/mock_local_registry.h index 502609f..de371a0 100644 --- a/test/mock/mock_local_registry.h +++ b/test/mock/mock_local_registry.h @@ -59,6 +59,10 @@ class MockLocalRegistry : public LocalRegistry { ReturnCode(const ServiceKey &service_key, const CircuitBreakUnhealthySetsData &cb_unhealthy_set_data)); + MOCK_METHOD3(GetCircuitBreakerInstances, + ReturnCode(const ServiceKey &service_key, ServiceData *&service_data, + std::vector &open_instances)); + MOCK_METHOD2(UpdateDynamicWeight, ReturnCode(const ServiceKey &service_key, const DynamicWeightData &dynamic_weight_data)); From 18e1c9469b1ffd94e9c89f6e457c9596e484407c Mon Sep 17 00:00:00 2001 From: lambdaliu Date: Fri, 10 Sep 2021 14:55:28 +0800 Subject: [PATCH 4/8] disable ticker thread if set custom time function --- polaris/quota/quota_manager.cpp | 2 +- polaris/utils/time_clock.cpp | 4 +--- polaris/utils/time_clock.h | 8 ++------ test/plugin/local_registry/local_registry_test.cpp | 2 ++ test/quota/quota_manager_test.cpp | 7 +------ test/utils/time_clock_test.cpp | 8 ++++++++ 6 files changed, 15 insertions(+), 16 deletions(-) diff --git a/polaris/quota/quota_manager.cpp b/polaris/quota/quota_manager.cpp index bfeb717..159dacb 100644 --- a/polaris/quota/quota_manager.cpp +++ b/polaris/quota/quota_manager.cpp @@ -78,7 +78,7 @@ ReturnCode QuotaManager::Init(Context* context, Config* config) { context_ = context; ContextMode context_mode = context->GetContextMode(); - if (context_mode != kLimitContext && context_mode != kShareContext) { + 
if (context_mode == kPrivateContext) { // provider或consumer私有时不创建quota manger线程 return kReturnOk; } bool is_enable = config->GetBoolOrDefault(kRateLimitEnableKey, kRateLimitEnableDefault); diff --git a/polaris/utils/time_clock.cpp b/polaris/utils/time_clock.cpp index 487884b..e2c2c25 100644 --- a/polaris/utils/time_clock.cpp +++ b/polaris/utils/time_clock.cpp @@ -14,8 +14,6 @@ #include "utils/time_clock.h" #include -#include -#include #include #include #include @@ -112,7 +110,7 @@ void Time::TrySetUpClock() { #if !defined(POLARIS_DISABLE_TIME_TICKER) // 不使用自定义时钟 pthread_mutex_lock(&g_custom_clock_lock); g_custom_clock_ref_count++; - if (g_custom_clock_update_tid == 0) { // 创建更新线程 + if (g_custom_clock_update_tid == 0 && current_time_impl == clock_real_time) { // 创建更新线程 pthread_atfork(ForkPrepare, NULL, ForkChild); // 注册Fork事件回调 // 这里必须先初始化自定义时钟,线程启动后会立即替换 timespec clock_time; diff --git a/polaris/utils/time_clock.h b/polaris/utils/time_clock.h index 3556126..84521d2 100644 --- a/polaris/utils/time_clock.h +++ b/polaris/utils/time_clock.h @@ -17,12 +17,8 @@ #include #include -namespace google { -namespace protobuf { -class Duration; -class Timestamp; -} // namespace protobuf -} // namespace google +#include +#include namespace polaris { diff --git a/test/plugin/local_registry/local_registry_test.cpp b/test/plugin/local_registry/local_registry_test.cpp index 2cab09d..3d21324 100644 --- a/test/plugin/local_registry/local_registry_test.cpp +++ b/test/plugin/local_registry/local_registry_test.cpp @@ -372,6 +372,7 @@ TEST_F(InMemoryLocalRegistryTest, TestServiceExpire) { TestUtils::FakeNowIncrement(1); local_registry_->RemoveExpireServiceData(Time::GetCurrentTimeMs()); ASSERT_TRUE(mock_server_connector_->saved_handler_ == NULL); + TestUtils::TearDownFakeTime(); } TEST_F(InMemoryLocalRegistryTest, TestOldServiceDataGc) { @@ -422,6 +423,7 @@ TEST_F(InMemoryLocalRegistryTest, TestOldServiceDataGc) { // 旧服务虽然被缓存删除并被释放,但更新前在使用,所以还需要再次释放 
ASSERT_EQ(init_service_data->DecrementAndGetRef(), 1); delete mock_server_connector_->saved_handler_; + TestUtils::TearDownFakeTime(); } } // namespace polaris diff --git a/test/quota/quota_manager_test.cpp b/test/quota/quota_manager_test.cpp index bcbe4c7..e55b31c 100644 --- a/test/quota/quota_manager_test.cpp +++ b/test/quota/quota_manager_test.cpp @@ -28,8 +28,7 @@ namespace polaris { class QuotaManagerTest : public ::testing::Test { virtual void SetUp() { - context_ = TestContext::CreateContext(kLimitContext); - TestUtils::SetUpFakeTime(); + context_ = TestContext::CreateContext(); ASSERT_TRUE(context_ != NULL); service_key_.namespace_ = "test_namespace"; service_key_.name_ = "test_name"; @@ -45,7 +44,6 @@ class QuotaManagerTest : public ::testing::Test { delete quota_manager_; quota_manager_ = NULL; } - TestUtils::TearDownFakeTime(); } protected: @@ -143,9 +141,6 @@ TEST_F(QuotaManagerTest, GetQuotaWithRule) { } TEST_F(QuotaManagerTest, PrepareQuotaInfo) { - delete context_; - context_ = TestContext::CreateContext(); - ASSERT_TRUE(context_ != NULL); CreateQuotaManager(true); MockLocalRegistry *mock_local_registry = TestContext::SetupMockLocalRegistry(context_); diff --git a/test/utils/time_clock_test.cpp b/test/utils/time_clock_test.cpp index 04f00b9..7f313e2 100644 --- a/test/utils/time_clock_test.cpp +++ b/test/utils/time_clock_test.cpp @@ -71,4 +71,12 @@ TEST_F(TimeClockTest, MultiThreadTest) { thread_list.clear(); } +TEST_F(TimeClockTest, CustomClockFunc) { + TestUtils::SetUpFakeTime(); // 设置自定义时间函数 + Time::TrySetUpClock(); + ASSERT_EQ(g_custom_clock_update_tid, 0); // 自定义时间函数时不会启动内部时间线程 + Time::TryShutdomClock(); + TestUtils::TearDownFakeTime(); +} + } // namespace polaris From e43da1b444744e4008f7c0c706a32d2f77a1d005 Mon Sep 17 00:00:00 2001 From: lambdaliu Date: Wed, 22 Sep 2021 17:11:35 +0800 Subject: [PATCH 5/8] fix server connector send discover request --- .../server_connector/server_connector.cpp | 21 +++++++++++++------ 1 file changed, 15
insertions(+), 6 deletions(-) diff --git a/polaris/plugin/server_connector/server_connector.cpp b/polaris/plugin/server_connector/server_connector.cpp index 5b4dca9..7e16c2f 100644 --- a/polaris/plugin/server_connector/server_connector.cpp +++ b/polaris/plugin/server_connector/server_connector.cpp @@ -343,6 +343,13 @@ bool GrpcServerConnector::SendDiscoverRequest(ServiceListener& service_listener) return false; } } + // 已经发送过请求正等待超时 + if (service_listener.timeout_task_iter_ != reactor_.TimingTaskEnd()) { + POLARIS_LOG(LOG_WARN, "already discover %s for service[%s/%s]", + DataTypeToStr(service_listener.service_.data_type_), service_key.namespace_.c_str(), + service_key.name_.c_str()); + return true; + } // 组装请求 ::v1::DiscoverRequest request; if (!service_key.namespace_.empty()) { @@ -366,7 +373,6 @@ bool GrpcServerConnector::SendDiscoverRequest(ServiceListener& service_listener) DataTypeToStr(service_listener.service_.data_type_), service_key.namespace_.c_str(), service_key.name_.c_str()); // 设置超时检查任务 - POLARIS_ASSERT(service_listener.timeout_task_iter_ == reactor_.TimingTaskEnd()); service_listener.timeout_task_iter_ = reactor_.AddTimingTask(new TimingFuncTask( DiscoverTimoutCheck, &service_listener, message_timeout_.GetTimeout())); return true; @@ -518,11 +524,14 @@ ReturnCode GrpcServerConnector::ProcessDiscoverResponse(::v1::DiscoverResponse& response.info().value().c_str()); } // 设置下一次的检查任务 - listener.discover_task_iter_ = reactor_.AddTimingTask( - new TimingFuncTask(TimingDiscover, &listener, listener.sync_interval_)); - if (pending_for_connected_.count(&listener) > 0) { - // 清理由于切换失败保留在pending列表里的任务需要 - pending_for_connected_.erase(&listener); + // 这里检查,避免发现任务取消又注册后,之前的同步任务遗留的应答导致重复设置定时任务 + if (listener.discover_task_iter_ == reactor_.TimingTaskEnd()) { + listener.discover_task_iter_ = reactor_.AddTimingTask( + new TimingFuncTask(TimingDiscover, &listener, listener.sync_interval_)); + if (pending_for_connected_.count(&listener) > 0) { + // 
清理由于切换失败保留在pending列表里的任务需要 + pending_for_connected_.erase(&listener); + } } return kReturnOk; } From bbfe9a66756fa165e7072b821377763207bfea19 Mon Sep 17 00:00:00 2001 From: lambdaliu Date: Wed, 22 Sep 2021 20:26:40 +0800 Subject: [PATCH 6/8] access service in lock when outlier detector get open instance --- .../plugin/local_registry/local_registry.cpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/polaris/plugin/local_registry/local_registry.cpp b/polaris/plugin/local_registry/local_registry.cpp index f3de243..9925912 100644 --- a/polaris/plugin/local_registry/local_registry.cpp +++ b/polaris/plugin/local_registry/local_registry.cpp @@ -402,12 +402,20 @@ ReturnCode InMemoryRegistry::GetCircuitBreakerInstances(const ServiceKey& servic service_data->DecrementRef(); return kReturnServiceNotFound; } - Service* service = service_data->GetService(); + // 由于此处获取service data没有更新访问时间,服务可能淘汰,不能直接使用其关联的服务数据 + pthread_rwlock_rdlock(&rwlock_); + std::map::iterator service_it = service_cache_.find(service_key); + if (service_it == service_cache_.end()) { + pthread_rwlock_unlock(&rwlock_); + return kReturnServiceNotFound; + } + std::set open_instance = service_it->second->GetCircuitBreakerOpenInstances(); + pthread_rwlock_unlock(&rwlock_); + ServiceInstances service_instances(service_data); - std::map& instance_map = service_instances.GetInstances(); - std::set circuit_breaker_open_instance = service->GetCircuitBreakerOpenInstances(); - for (std::set::iterator it = circuit_breaker_open_instance.begin(); - it != circuit_breaker_open_instance.end(); ++it) { + std::map& instance_map = service_instances.GetInstances(); + for (std::set::iterator it = open_instance.begin(); it != open_instance.end(); + ++it) { const std::string& instance_id = *it; std::map::iterator iter = instance_map.find(instance_id); if (iter == instance_map.end()) { From bc6607dd41aa984ec6929bf14504cc3a0e74b68c Mon Sep 17 00:00:00 2001 From: lambdaliu Date: Thu, 23 Sep 2021 
19:45:59 +0800 Subject: [PATCH 7/8] free service context in rcu map --- polaris/context.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/polaris/context.cpp b/polaris/context.cpp index 0bad2f3..1635643 100644 --- a/polaris/context.cpp +++ b/polaris/context.cpp @@ -797,6 +797,7 @@ void ContextImpl::ClearCache() { } pthread_rwlock_unlock(&cache_rwlock_); } + service_context_map_->CheckGc(min_access_time); } } // namespace polaris From 546aef710c30423987d8c7016e7a1ea5d27e344a Mon Sep 17 00:00:00 2001 From: lambdaliu Date: Mon, 27 Sep 2021 19:36:02 +0800 Subject: [PATCH 8/8] fix free service after service instance data expired --- .../plugin/local_registry/local_registry.cpp | 70 +++++++++++-------- .../plugin/local_registry/local_registry.h | 4 +- .../set_circuit_breaker_test.cpp | 25 ++++--- test/plugin/load_balancer/ring_hash_test.cpp | 4 ++ .../local_registry/local_registry_test.cpp | 2 +- 5 files changed, 59 insertions(+), 46 deletions(-) diff --git a/polaris/plugin/local_registry/local_registry.cpp b/polaris/plugin/local_registry/local_registry.cpp index 9925912..50658fb 100644 --- a/polaris/plugin/local_registry/local_registry.cpp +++ b/polaris/plugin/local_registry/local_registry.cpp @@ -138,20 +138,38 @@ void InMemoryRegistry::RunGcTask() { service_circuit_breaker_config_data_.CheckGc(min_gc_time); } -Service* InMemoryRegistry::GetOrCreateServiceInLock(const ServiceKey& service_key) { +Service* InMemoryRegistry::CreateServiceInLock(const ServiceKey& service_key) { Service* service = NULL; pthread_rwlock_wrlock(&rwlock_); std::map::iterator service_it = service_cache_.find(service_key); - if (service_it == service_cache_.end()) { - service = new Service(service_key, ++next_service_id_); - service_cache_[service_key] = service; - } else { + POLARIS_ASSERT(service_it == service_cache_.end()) + service = new Service(service_key, ++next_service_id_); + service_cache_[service_key] = service; + pthread_rwlock_unlock(&rwlock_); + return service; +} + +Service* 
InMemoryRegistry::GetServiceInLock(const ServiceKey& service_key) { + Service* service = NULL; + pthread_rwlock_wrlock(&rwlock_); + std::map::iterator service_it = service_cache_.find(service_key); + if (service_it != service_cache_.end()) { service = service_it->second; } pthread_rwlock_unlock(&rwlock_); return service; } +void InMemoryRegistry::DeleteServiceInLock(const ServiceKey& service_key) { + pthread_rwlock_wrlock(&rwlock_); + std::map::iterator service_it = service_cache_.find(service_key); + if (service_it != service_cache_.end()) { + delete service_it->second; + service_cache_.erase(service_it); + } + pthread_rwlock_unlock(&rwlock_); +} + void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time, RcuMap& rcu_cache, ServiceDataType service_data_type) { @@ -166,17 +184,16 @@ void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time, if (service_data_notify_map_.erase(service_key_with_type) > 0) { // 有通知对象表示注册过handler context_->GetServerConnector()->DeregisterEventHandler(expired_services[i], service_data_type); - } else { // 没有通知对象,表示未注册过handler,从磁盘加载后从未访问过的数据,直接删除数据 - rcu_cache.Delete(expired_services[i]); - context_impl->GetServiceRecord()->ServiceDataDelete(expired_services[i], service_data_type); - context_impl->GetCacheManager()->GetCachePersist().PersistServiceData(expired_services[i], - service_data_type, ""); } - pthread_rwlock_unlock(¬ify_rwlock_); if (service_data_type == kServiceDataInstances) { // 清除实例数据时对应的服务级别插件也删除 context_impl->DeleteServiceContext(expired_services[i]); DeleteServiceInLock(expired_services[i]); } + rcu_cache.Delete(expired_services[i]); + context_impl->GetServiceRecord()->ServiceDataDelete(expired_services[i], service_data_type); + context_impl->GetCacheManager()->GetCachePersist().PersistServiceData(expired_services[i], + service_data_type, ""); + pthread_rwlock_unlock(¬ify_rwlock_); } } @@ -249,6 +266,9 @@ ReturnCode InMemoryRegistry::LoadServiceDataWithNotify(const ServiceKey& service if 
(interval_it != service_interval_map_.end()) { refresh_interval = interval_it->second; } + if (data_type == kServiceDataInstances) { + CreateServiceInLock(service_key); + } // 先加载磁盘缓存数据 CachePersist& cache_persist = context_->GetContextImpl()->GetCacheManager()->GetCachePersist(); ServiceData* disk_service_data = cache_persist.LoadServiceData(service_key, data_type); @@ -269,25 +289,21 @@ ReturnCode InMemoryRegistry::LoadServiceDataWithNotify(const ServiceKey& service return kReturnOk; } -void InMemoryRegistry::DeleteServiceInLock(const ServiceKey& service_key) { - pthread_rwlock_wrlock(&rwlock_); - std::map::iterator service_it = service_cache_.find(service_key); - if (service_it != service_cache_.end()) { - delete service_it->second; - service_cache_.erase(service_it); - } - pthread_rwlock_unlock(&rwlock_); -} - ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key, ServiceDataType data_type, ServiceData* service_data) { - if (service_data != NULL) { // 更新服务数据指向服务 - Service* service = GetOrCreateServiceInLock(service_key); + Service* service = GetServiceInLock(service_key); + if (service != NULL) { // 更新服务数据指向服务 service->UpdateData(service_data); } ContextImpl* context_impl = context_->GetContextImpl(); if (data_type == kServiceDataInstances) { + if (service == NULL) { // 服务被反注册了 + if (service_data != NULL) { + service_data->DecrementRef(); + } + return kReturnOk; + } ServiceData* old_service_data = service_instances_data_.Get(service_key); if (old_service_data != NULL) { PluginManager::Instance().OnPreUpdateServiceData(old_service_data, service_data); @@ -308,14 +324,6 @@ ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key, POLARIS_ASSERT(false); } if (service_data == NULL) { // Server Connector反注册Handler触发更新为NULL - if (data_type == kServiceDataInstances) { // 删除服务实例数据时,同时删除服务 - context_impl->DeleteServiceContext(service_key); - DeleteServiceInLock(service_key); - } - 
context_impl->GetServiceRecord()->ServiceDataDelete(service_key, - data_type); // 同步记录Service数据删除 - context_impl->GetCacheManager()->GetCachePersist().PersistServiceData( - service_key, data_type, ""); // 异步删除磁盘服务数据 return kReturnOk; } context_impl->GetServiceRecord()->ServiceDataUpdate(service_data); // 同步记录Service版本变化 diff --git a/polaris/plugin/local_registry/local_registry.h b/polaris/plugin/local_registry/local_registry.h index e457bcc..b6f4837 100644 --- a/polaris/plugin/local_registry/local_registry.h +++ b/polaris/plugin/local_registry/local_registry.h @@ -109,7 +109,9 @@ class InMemoryRegistry : public LocalRegistry { ServiceDataNotify* GetOrCreateDataNotify(const ServiceKey& service_key, ServiceDataType data_type, bool& new_create); - Service* GetOrCreateServiceInLock(const ServiceKey& service_key); + Service* CreateServiceInLock(const ServiceKey& service_key); + + Service* GetServiceInLock(const ServiceKey& service_key); void DeleteServiceInLock(const ServiceKey& service_key); diff --git a/test/integration/set_circuit_breaker/set_circuit_breaker_test.cpp b/test/integration/set_circuit_breaker/set_circuit_breaker_test.cpp index 5305fc1..ed1cb63 100644 --- a/test/integration/set_circuit_breaker/set_circuit_breaker_test.cpp +++ b/test/integration/set_circuit_breaker/set_circuit_breaker_test.cpp @@ -68,6 +68,7 @@ class SetCircuitBreakerTest : public IntegrationBase { ASSERT_TRUE(consumer_ != NULL); SetUpServiceData(); + TryDoRoute(); } virtual void TearDown() { @@ -358,6 +359,17 @@ class SetCircuitBreakerTest : public IntegrationBase { << total << " " << percent << " " << err_rate << " " << count; } + void TryDoRoute() { + polaris::GetOneInstanceRequest req(service_key_); + polaris::ServiceInfo service_info; + service_info.service_key_.name_ = "test2"; + service_info.service_key_.namespace_ = "Test"; + service_info.metadata_["f"] = "fv1"; + req.SetSourceService(service_info); + polaris::Instance instance; + ASSERT_EQ(consumer_->GetOneInstance(req, instance), 
kReturnOk); + } + protected: polaris::ServiceKey service_key_; polaris::ConsumerApi* consumer_; @@ -391,19 +403,6 @@ class SetCircuitBreakerTest : public IntegrationBase { std::string ins3_id_; }; -TEST_F(SetCircuitBreakerTest, TestRoute) { - ReturnCode ret; - polaris::GetOneInstanceRequest req(service_key_); - polaris::ServiceInfo service_info; - service_info.service_key_.name_ = "test2"; - service_info.service_key_.namespace_ = "Test"; - service_info.metadata_["f"] = "fv1"; - req.SetSourceService(service_info); - polaris::Instance instance; - ret = consumer_->GetOneInstance(req, instance); - ASSERT_EQ(ret, kReturnOk); -} - TEST_F(SetCircuitBreakerTest, ErrRateOpen) { polaris::ServiceCallResult err_result; err_result.SetServiceNamespace(service_key_.namespace_); diff --git a/test/plugin/load_balancer/ring_hash_test.cpp b/test/plugin/load_balancer/ring_hash_test.cpp index 239fc6f..2e305f1 100644 --- a/test/plugin/load_balancer/ring_hash_test.cpp +++ b/test/plugin/load_balancer/ring_hash_test.cpp @@ -97,6 +97,10 @@ TEST_F(RingHashCstLbTest, TestSelectWithInstancesUpdate) { LocalRegistry *local_registry = context_->GetLocalRegistry(); local_registry->GetServiceDataWithRef(service_key_, kServiceDataInstances, service_data); ASSERT_TRUE(service_data == NULL); + ServiceDataNotify *notify = NULL; + local_registry->LoadServiceDataWithNotify(service_key_, kServiceDataInstances, service_data, + notify); + ASSERT_TRUE(notify != NULL); v1::DiscoverResponse response; CreateInstancesResponse(response); diff --git a/test/plugin/local_registry/local_registry_test.cpp b/test/plugin/local_registry/local_registry_test.cpp index 3d21324..aa14363 100644 --- a/test/plugin/local_registry/local_registry_test.cpp +++ b/test/plugin/local_registry/local_registry_test.cpp @@ -203,7 +203,7 @@ TEST_F(InMemoryLocalRegistryTest, TestUpdateServiceData) { std::set service_key_set; ret = local_registry_->GetAllServiceKey(service_key_set); ASSERT_EQ(ret, kReturnOk); - 
ASSERT_EQ(service_key_set.size(), 1); + ASSERT_EQ(service_key_set.size(), 0); delete mock_server_connector_->saved_handler_; mock_server_connector_->saved_handler_ = NULL;