Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/polaris/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ class LocalRegistry : public Plugin {
virtual ReturnCode UpdateSetCircuitBreakerData(
const ServiceKey& service_key, const CircuitBreakUnhealthySetsData& unhealthy_sets) = 0;

virtual ReturnCode GetCircuitBreakerInstances(const ServiceKey& service_key,
ServiceData*& service_data,
std::vector<Instance*>& open_instances) = 0;

/// @brief 更新服务实例状态,properties存放的是状态值,当前支持2个key
///
/// 1. ReadyToServe: 故障熔断标识,true or false
Expand Down
16 changes: 10 additions & 6 deletions polaris/cache/rcu_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class RcuMap {
~RcuMap();

/// @brief 根据Key获取指向Value的指针,key不存在返回NULL
Value* Get(const Key& key);
Value* Get(const Key& key, bool update_access_time = true);

/// @brief 更新Key对应的Value
/// 如果key对应的value已存在,则将旧的value加入待释放列表,内部线程会延迟一定时间释放
Expand Down Expand Up @@ -166,20 +166,24 @@ RcuMap<Key, Value>::~RcuMap() {
}

template <typename Key, typename Value>
Value* RcuMap<Key, Value>::Get(const Key& key) {
Value* RcuMap<Key, Value>::Get(const Key& key, bool update_access_time) {
// 查询read map,获取结果
Value* read_result = NULL;
InnerMap* current_read = read_map_;
typename InnerMap::iterator it = current_read->find(key);
if (it != current_read->end()) { // MapValue包含的value指针在整个过程中是可能改变的
it->second->used_time_ = Time::GetCurrentTimeMs();
read_result = it->second->value_;
if (update_access_time) {
it->second->used_time_ = Time::GetCurrentTimeMs();
}
read_result = it->second->value_;
} else {
// 从read map未读到数据,则加锁进行后续操作
sync::MutexGuard mutex_guard(dirty_lock_);
if ((it = dirty_map_->find(key)) != dirty_map_->end()) {
it->second->used_time_ = Time::GetCurrentTimeMs();
read_result = it->second->value_;
if (update_access_time) {
it->second->used_time_ = Time::GetCurrentTimeMs();
}
read_result = it->second->value_;
if (read_map_ == current_read) {
miss_time_++; // 记录read map读失败,dirty map读成功次数
}
Expand Down
1 change: 1 addition & 0 deletions polaris/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ void ContextImpl::ClearCache() {
}
pthread_rwlock_unlock(&cache_rwlock_);
}
service_context_map_->CheckGc(min_access_time);
}

} // namespace polaris
35 changes: 15 additions & 20 deletions polaris/plugin/health_checker/health_checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,38 +138,32 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b
}

ServiceData* service_data = NULL;
local_registry_->GetServiceDataWithRef(service_key_, kServiceDataInstances, service_data);
if (service_data == NULL) {
std::vector<Instance*> health_check_instances;
if (local_registry_->GetCircuitBreakerInstances(service_key_, service_data,
health_check_instances) != kReturnOk) {
return kReturnOk;
}
Service* service = service_data->GetService();

ServiceInstances service_instances(service_data);
std::map<std::string, Instance*>& instance_map = service_instances.GetInstances();
std::set<std::string> target_health_check_instances;

if (when_ == HealthCheckerConfig::kChainWhenAlways) {
health_check_instances.clear();
// 健康检查设置为always, 则探测所有非隔离实例
for (std::map<std::string, Instance*>::iterator instance_iter = instance_map.begin();
instance_iter != instance_map.end(); ++instance_iter) {
if (!instance_iter->second->isIsolate()) {
target_health_check_instances.insert(instance_iter->first);
health_check_instances.push_back(instance_iter->second);
}
}
} else if (when_ == HealthCheckerConfig::kChainWhenOnRecover) {
// 健康检查设置为on_recover, 则探测半开实例
target_health_check_instances = service->GetCircuitBreakerOpenInstances();
} else if (when_ != HealthCheckerConfig::kChainWhenOnRecover) {
// Health check is neither "always" nor "on_recover": nothing is probed, so drop the candidate instances
health_check_instances.clear();
}
for (std::set<std::string>::iterator it = target_health_check_instances.begin();
it != target_health_check_instances.end(); ++it) {
const std::string& instance_id = *it;
std::map<std::string, Instance*>::iterator iter = instance_map.find(instance_id);
if (iter == instance_map.end()) {
POLARIS_LOG(LOG_INFO, "The health checker of service[%s/%s] getting instance[%s] failed",
service_key_.namespace_.c_str(), service_key_.name_.c_str(), instance_id.c_str());
continue;
}

for (std::size_t i = 0; i < health_check_instances.size(); ++i) {
bool is_detect_success = false;
Instance* instance = iter->second;
Instance* instance = health_check_instances[i];
for (std::size_t i = 0; i < health_checker_list_.size(); ++i) {
HealthChecker*& detector = health_checker_list_[i];
DetectResult detector_result;
Expand All @@ -195,15 +189,16 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b
// 探活插件成功,则将熔断实例置为半开状态,其他实例状态不变
// 探活插件失败,则将健康实例置为熔断状态,其他实例状态不变
if (is_detect_success) {
circuit_breaker_chain.TranslateStatus(instance_id, kCircuitBreakerOpen,
circuit_breaker_chain.TranslateStatus(instance->GetId(), kCircuitBreakerOpen,
kCircuitBreakerHalfOpen);
POLARIS_LOG(LOG_INFO,
"service[%s/%s] getting instance[%s-%s:%d] detectoring success, change to "
"half-open status",
service_key_.namespace_.c_str(), service_key_.name_.c_str(),
instance->GetId().c_str(), instance->GetHost().c_str(), instance->GetPort());
} else {
circuit_breaker_chain.TranslateStatus(instance_id, kCircuitBreakerClose, kCircuitBreakerOpen);
circuit_breaker_chain.TranslateStatus(instance->GetId(), kCircuitBreakerClose,
kCircuitBreakerOpen);
POLARIS_LOG(LOG_INFO,
"service[%s/%s] getting instance[%s-%s:%d] detectoring failed, change to "
"open status",
Expand Down
111 changes: 81 additions & 30 deletions polaris/plugin/local_registry/local_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,38 @@ void InMemoryRegistry::RunGcTask() {
service_circuit_breaker_config_data_.CheckGc(min_gc_time);
}

Service* InMemoryRegistry::GetOrCreateServiceInLock(const ServiceKey& service_key) {
Service* InMemoryRegistry::CreateServiceInLock(const ServiceKey& service_key) {
Service* service = NULL;
pthread_rwlock_wrlock(&rwlock_);
std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
if (service_it == service_cache_.end()) {
service = new Service(service_key, ++next_service_id_);
service_cache_[service_key] = service;
} else {
POLARIS_ASSERT(service_it == service_cache_.end())
service = new Service(service_key, ++next_service_id_);
service_cache_[service_key] = service;
pthread_rwlock_unlock(&rwlock_);
return service;
}

/// @brief Look up the Service object cached for a service key.
///
/// This is a pure lookup: service_cache_ is only read, never modified, so the
/// shared (read) side of rwlock_ is sufficient and lets concurrent lookups
/// proceed without serializing on the exclusive lock.
///
/// @param service_key key to search for in service_cache_
/// @return the cached Service pointer, or NULL when the key has no entry
///
/// NOTE(review): the pointer is handed back after rwlock_ has been released;
/// callers presumably rely on the service not being deleted while they use
/// it -- confirm against DeleteServiceInLock callers.
Service* InMemoryRegistry::GetServiceInLock(const ServiceKey& service_key) {
  Service* service = NULL;
  pthread_rwlock_rdlock(&rwlock_);
  std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
  if (service_it != service_cache_.end()) {
    service = service_it->second;
  }
  pthread_rwlock_unlock(&rwlock_);
  return service;
}

void InMemoryRegistry::DeleteServiceInLock(const ServiceKey& service_key) {
pthread_rwlock_wrlock(&rwlock_);
std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
if (service_it != service_cache_.end()) {
delete service_it->second;
service_cache_.erase(service_it);
}
pthread_rwlock_unlock(&rwlock_);
}

void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time,
RcuMap<ServiceKey, ServiceData>& rcu_cache,
ServiceDataType service_data_type) {
Expand All @@ -166,16 +184,16 @@ void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time,
if (service_data_notify_map_.erase(service_key_with_type) > 0) { // 有通知对象表示注册过handler
context_->GetServerConnector()->DeregisterEventHandler(expired_services[i],
service_data_type);
} else { // 没有通知对象,表示未注册过handler,从磁盘加载后从未访问过的数据,直接删除数据
rcu_cache.Delete(expired_services[i]);
context_impl->GetServiceRecord()->ServiceDataDelete(expired_services[i], service_data_type);
context_impl->GetCacheManager()->GetCachePersist().PersistServiceData(expired_services[i],
service_data_type, "");
}
pthread_rwlock_unlock(&notify_rwlock_);
if (service_data_type == kServiceDataInstances) { // 清除实例数据时对应的服务级别插件也删除
context_impl->DeleteServiceContext(expired_services[i]);
DeleteServiceInLock(expired_services[i]);
}
rcu_cache.Delete(expired_services[i]);
context_impl->GetServiceRecord()->ServiceDataDelete(expired_services[i], service_data_type);
context_impl->GetCacheManager()->GetCachePersist().PersistServiceData(expired_services[i],
service_data_type, "");
pthread_rwlock_unlock(&notify_rwlock_);
}
}

Expand Down Expand Up @@ -248,6 +266,9 @@ ReturnCode InMemoryRegistry::LoadServiceDataWithNotify(const ServiceKey& service
if (interval_it != service_interval_map_.end()) {
refresh_interval = interval_it->second;
}
if (data_type == kServiceDataInstances) {
CreateServiceInLock(service_key);
}
// 先加载磁盘缓存数据
CachePersist& cache_persist = context_->GetContextImpl()->GetCacheManager()->GetCachePersist();
ServiceData* disk_service_data = cache_persist.LoadServiceData(service_key, data_type);
Expand All @@ -268,25 +289,21 @@ ReturnCode InMemoryRegistry::LoadServiceDataWithNotify(const ServiceKey& service
return kReturnOk;
}

void InMemoryRegistry::DeleteServiceInLock(const ServiceKey& service_key) {
pthread_rwlock_wrlock(&rwlock_);
std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
if (service_it != service_cache_.end()) {
delete service_it->second;
service_cache_.erase(service_it);
}
pthread_rwlock_unlock(&rwlock_);
}

ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key,
ServiceDataType data_type,
ServiceData* service_data) {
if (service_data != NULL) { // 更新服务数据指向服务
Service* service = GetOrCreateServiceInLock(service_key);
Service* service = GetServiceInLock(service_key);
if (service != NULL) { // 更新服务数据指向服务
service->UpdateData(service_data);
}
ContextImpl* context_impl = context_->GetContextImpl();
if (data_type == kServiceDataInstances) {
if (service == NULL) { // 服务被反注册了
if (service_data != NULL) {
service_data->DecrementRef();
}
return kReturnOk;
}
ServiceData* old_service_data = service_instances_data_.Get(service_key);
if (old_service_data != NULL) {
PluginManager::Instance().OnPreUpdateServiceData(old_service_data, service_data);
Expand All @@ -307,13 +324,6 @@ ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key,
POLARIS_ASSERT(false);
}
if (service_data == NULL) { // Server Connector反注册Handler触发更新为NULL
if (data_type == kServiceDataInstances) { // 删除服务实例数据时,同时删除服务
DeleteServiceInLock(service_key);
}
context_impl->GetServiceRecord()->ServiceDataDelete(service_key,
data_type); // 同步记录Service数据删除
context_impl->GetCacheManager()->GetCachePersist().PersistServiceData(
service_key, data_type, ""); // 异步删除磁盘服务数据
return kReturnOk;
}
context_impl->GetServiceRecord()->ServiceDataUpdate(service_data); // 同步记录Service版本变化
Expand Down Expand Up @@ -389,4 +399,45 @@ ReturnCode InMemoryRegistry::UpdateSetCircuitBreakerData(
return service->WriteCircuitBreakerUnhealthySets(unhealthy_sets);
}

/// @brief Collect the instances of a service that are currently recorded as
///        circuit-breaker open.
///
/// The instance data is fetched without refreshing its access time, then the
/// open instance ids are copied from the cached Service under the read lock and
/// resolved to Instance pointers through the instance map of the service data.
///
/// @param service_key    service to query
/// @param service_data   [out] receives the instance data found for the service.
///                       Only meaningful when kReturnOk is returned; on any
///                       failure the caller must not use it.
/// @param open_instances [out] resolved open instances are appended here
/// @return kReturnOk when at least one open instance was resolved,
///         kReturnServiceNotFound when the data is missing, its status is below
///         kDataIsSyncing, or the service is no longer in service_cache_,
///         kReturnInstanceNotFound when no open instance could be resolved
///
/// NOTE(review): on kReturnOk an extra reference is taken with IncrementRef()
/// before returning; presumably the local ServiceInstances releases one
/// reference when it goes out of scope and the caller owns the remaining one --
/// confirm against ServiceInstances and the caller.
ReturnCode InMemoryRegistry::GetCircuitBreakerInstances(const ServiceKey& service_key,
                                                        ServiceData*& service_data,
                                                        std::vector<Instance*>& open_instances) {
  // Pass false so that this lookup does not refresh the entry's access time
  service_data = service_instances_data_.Get(service_key, false);
  if (service_data == NULL) {
    return kReturnServiceNotFound;
  }
  if (service_data->GetDataStatus() < kDataIsSyncing) {
    service_data->DecrementRef();
    return kReturnServiceNotFound;
  }
  // The access time was not updated above, so the service may already have been
  // evicted; the Service associated with the data cannot be used directly and is
  // looked up in the cache under the lock instead.
  pthread_rwlock_rdlock(&rwlock_);
  std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
  if (service_it == service_cache_.end()) {
    pthread_rwlock_unlock(&rwlock_);
    // Release the reference obtained from Get(), exactly like the status check
    // above does; returning without it would leak the service data.
    service_data->DecrementRef();
    return kReturnServiceNotFound;
  }
  // Copy the ids while the lock is held so they stay valid after unlocking
  std::set<std::string> open_instance_ids = service_it->second->GetCircuitBreakerOpenInstances();
  pthread_rwlock_unlock(&rwlock_);

  ServiceInstances service_instances(service_data);
  std::map<std::string, Instance*>& instance_map = service_instances.GetInstances();
  for (std::set<std::string>::iterator it = open_instance_ids.begin();
       it != open_instance_ids.end(); ++it) {
    const std::string& instance_id = *it;
    std::map<std::string, Instance*>::iterator iter = instance_map.find(instance_id);
    if (iter == instance_map.end()) {
      // The id is marked open but is absent from the current instance data
      POLARIS_LOG(LOG_INFO, "The outlier detector of service[%s/%s] getting instance[%s] failed",
                  service_key.namespace_.c_str(), service_key.name_.c_str(), instance_id.c_str());
      continue;
    }
    open_instances.push_back(iter->second);
  }
  if (open_instances.empty()) {
    return kReturnInstanceNotFound;
  }
  service_data->IncrementRef();
  return kReturnOk;
}

} // namespace polaris
8 changes: 7 additions & 1 deletion polaris/plugin/local_registry/local_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ class InMemoryRegistry : public LocalRegistry {
virtual ReturnCode UpdateSetCircuitBreakerData(
const ServiceKey& service_key, const CircuitBreakUnhealthySetsData& unhealthy_sets);

virtual ReturnCode GetCircuitBreakerInstances(const ServiceKey& service_key,
ServiceData*& service_data,
std::vector<Instance*>& open_instances);

virtual ReturnCode UpdateDynamicWeight(const ServiceKey& service_key,
const DynamicWeightData& dynamic_weight_data);

Expand All @@ -105,7 +109,9 @@ class InMemoryRegistry : public LocalRegistry {
ServiceDataNotify* GetOrCreateDataNotify(const ServiceKey& service_key, ServiceDataType data_type,
bool& new_create);

Service* GetOrCreateServiceInLock(const ServiceKey& service_key);
Service* CreateServiceInLock(const ServiceKey& service_key);

Service* GetServiceInLock(const ServiceKey& service_key);

void DeleteServiceInLock(const ServiceKey& service_key);

Expand Down
Loading