Skip to content

Commit

Permalink
Optimize discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
owent committed Sep 8, 2023
1 parent 1078f62 commit bffcecc
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 24 deletions.
4 changes: 2 additions & 2 deletions include/atframe/etcdcli/etcd_discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ class etcd_discovery_set {
std::vector<etcd_discovery_node::ptr_t> round_robin_cache;
size_t round_robin_index;

std::unordered_set<const etcd_discovery_node*> reference_cache;
std::unordered_set<const etcd_discovery_node *> reference_cache;
} index_cache_type;

void rebuild_cache(index_cache_type &cache_set, const metadata_type *rule) const;
void clear_cache(const metadata_type *metadata, const etcd_discovery_node* node_ptr) const;
void clear_cache(const metadata_type *metadata, gsl::span<const etcd_discovery_node *> node_ptrs) const;
static void clear_cache(index_cache_type &cache_set);
index_cache_type *get_index_cache(const metadata_type *metadata) const;
index_cache_type *mutable_index_cache(const metadata_type *metadata) const;
Expand Down
73 changes: 57 additions & 16 deletions src/atframe/etcdcli/etcd_discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,9 @@ LIBATAPP_MACRO_API void etcd_discovery_set::add_node(const etcd_discovery_node::
std::string old_name;
uint64_t old_id = 0;

const etcd_discovery_node *clear_cache_node_ptrs[3] = {node.get(), nullptr, nullptr};
size_t clear_cache_node_ptrs_size = 1;

// Insert into id index if id != 0
if (0 != node->get_discovery_info().id()) {
node_by_id_type::iterator iter_id = node_by_id_.find(node->get_discovery_info().id());
Expand All @@ -660,6 +663,11 @@ LIBATAPP_MACRO_API void etcd_discovery_set::add_node(const etcd_discovery_node::
}

// Remove old first, because directly change value of shared_ptr is not thread-safe
if (clear_cache_node_ptrs[clear_cache_node_ptrs_size - 1] != iter_id->second.get()) {
clear_cache_node_ptrs[clear_cache_node_ptrs_size - 1] = iter_id->second.get();
++clear_cache_node_ptrs_size;
}

iter_id->second.reset();
iter_id->second = node;
has_insert = true;
Expand All @@ -679,6 +687,11 @@ LIBATAPP_MACRO_API void etcd_discovery_set::add_node(const etcd_discovery_node::
}

// Remove old first, because directly change value of shared_ptr is not thread-safe
if (clear_cache_node_ptrs[clear_cache_node_ptrs_size - 1] != iter_name->second.get()) {
clear_cache_node_ptrs[clear_cache_node_ptrs_size - 1] = iter_name->second.get();
++clear_cache_node_ptrs_size;
}

iter_name->second.reset();
iter_name->second = node;
has_insert = true;
Expand All @@ -699,12 +712,12 @@ LIBATAPP_MACRO_API void etcd_discovery_set::add_node(const etcd_discovery_node::
node_by_name_.erase(iter_name);
}
}
}

if (node->get_discovery_info().has_metadata()) {
clear_cache(&node->get_discovery_info().metadata(), node.get());
} else {
clear_cache(nullptr, node.get());
}
if (node->get_discovery_info().has_metadata()) {
clear_cache(&node->get_discovery_info().metadata(), clear_cache_node_ptrs);
} else {
clear_cache(nullptr, clear_cache_node_ptrs);
}
}

Expand All @@ -714,6 +727,8 @@ LIBATAPP_MACRO_API void etcd_discovery_set::remove_node(const etcd_discovery_nod
}

bool has_cleanup = false;
const etcd_discovery_node *clear_cache_node_ptrs[] = {node.get()};

if (!node->get_discovery_info().name().empty()) {
node_by_name_type::iterator iter_name = node_by_name_.find(node->get_discovery_info().name());
if (iter_name != node_by_name_.end() && iter_name->second == node) {
Expand All @@ -732,9 +747,9 @@ LIBATAPP_MACRO_API void etcd_discovery_set::remove_node(const etcd_discovery_nod

if (has_cleanup) {
if (node->get_discovery_info().has_metadata()) {
clear_cache(&node->get_discovery_info().metadata(), node.get());
clear_cache(&node->get_discovery_info().metadata(), clear_cache_node_ptrs);
} else {
clear_cache(nullptr, node.get());
clear_cache(nullptr, clear_cache_node_ptrs);
}
}
}
Expand All @@ -745,6 +760,8 @@ LIBATAPP_MACRO_API void etcd_discovery_set::remove_node(uint64_t id) {
return;
}

const etcd_discovery_node *clear_cache_node_ptrs[] = {iter_id->second.get()};

if (iter_id->second && !iter_id->second->get_discovery_info().name().empty()) {
node_by_name_type::iterator iter_name = node_by_name_.find(iter_id->second->get_discovery_info().name());
if (iter_name != node_by_name_.end() && iter_name->second == iter_id->second) {
Expand All @@ -753,9 +770,9 @@ LIBATAPP_MACRO_API void etcd_discovery_set::remove_node(uint64_t id) {
}

if (iter_id->second->get_discovery_info().has_metadata()) {
clear_cache(&iter_id->second->get_discovery_info().metadata(), iter_id->second.get());
clear_cache(&iter_id->second->get_discovery_info().metadata(), clear_cache_node_ptrs);
} else {
clear_cache(nullptr, iter_id->second.get());
clear_cache(nullptr, clear_cache_node_ptrs);
}

node_by_id_.erase(iter_id);
Expand All @@ -767,6 +784,8 @@ LIBATAPP_MACRO_API void etcd_discovery_set::remove_node(const std::string &name)
return;
}

const etcd_discovery_node *clear_cache_node_ptrs[] = {iter_name->second.get()};

if (iter_name->second && 0 != iter_name->second->get_discovery_info().id()) {
node_by_id_type::iterator iter_id = node_by_id_.find(iter_name->second->get_discovery_info().id());
if (iter_id != node_by_id_.end() && iter_name->second == iter_id->second) {
Expand All @@ -775,9 +794,9 @@ LIBATAPP_MACRO_API void etcd_discovery_set::remove_node(const std::string &name)
}

if (iter_name->second->get_discovery_info().has_metadata()) {
clear_cache(&iter_name->second->get_discovery_info().metadata(), iter_name->second.get());
clear_cache(&iter_name->second->get_discovery_info().metadata(), clear_cache_node_ptrs);
} else {
clear_cache(nullptr, iter_name->second.get());
clear_cache(nullptr, clear_cache_node_ptrs);
}

node_by_name_.erase(iter_name);
Expand Down Expand Up @@ -846,17 +865,39 @@ void etcd_discovery_set::rebuild_cache(index_cache_type &cache_set, const metada
std::sort(cache_set.hashing_cache.begin(), cache_set.hashing_cache.end(), consistent_hash_compare_index);
}

void etcd_discovery_set::clear_cache(const metadata_type *metadata, const etcd_discovery_node *node_ptr) const {
if (nullptr == metadata && nullptr == node_ptr) {
return;
void etcd_discovery_set::clear_cache(const metadata_type *metadata,
gsl::span<const etcd_discovery_node *> node_ptrs) const {
{
bool has_node_ptr = false;
for (auto &node_ptr : node_ptrs) {
if (nullptr != node_ptr) {
has_node_ptr = true;
break;
}
}
if (nullptr == metadata && !has_node_ptr) {
return;
}
}

clear_cache(default_index_);

std::vector<const metadata_type *> pending_to_delete;
for (auto &metadata_index : metadata_index_) {
if (metadata_index.second.reference_cache.end() != metadata_index.second.reference_cache.find(node_ptr) ||
(nullptr != metadata && metadata_equal_type::filter(metadata_index.first, *metadata))) {
bool matched = false;
for (auto &node_ptr : node_ptrs) {
if (nullptr == node_ptr) {
continue;
}

if (metadata_index.second.reference_cache.end() != metadata_index.second.reference_cache.find(node_ptr)) {
matched = true;
pending_to_delete.push_back(&metadata_index.first);
break;
}
}

if (!matched && (nullptr != metadata && metadata_equal_type::filter(metadata_index.first, *metadata))) {
pending_to_delete.push_back(&metadata_index.first);
}
}
Expand Down
18 changes: 12 additions & 6 deletions src/atframe/modules/etcd_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1601,12 +1601,12 @@ bool etcd_module::update_inner_watcher_event(node_info_t &node, const etcd_disco
}

if (local_cache_by_id) {
global_discovery_.remove_node(local_cache_by_id);
new_inst = local_cache_by_id;
} else {
new_inst = std::make_shared<etcd_discovery_node>();
}

new_inst = std::make_shared<etcd_discovery_node>();
new_inst->copy_from(node.node_discovery, version);

global_discovery_.add_node(new_inst);

has_event = true;
Expand Down Expand Up @@ -1635,13 +1635,19 @@ bool etcd_module::update_inner_watcher_event(node_info_t &node, const etcd_disco
}

if (has_event) {
new_inst = std::make_shared<etcd_discovery_node>();
if (local_cache_by_id) {
new_inst = local_cache_by_id;
} else if (local_cache_by_name) {
new_inst = local_cache_by_name;
} else {
new_inst = std::make_shared<etcd_discovery_node>();
}
new_inst->copy_from(node.node_discovery, version);

if (local_cache_by_id) {
if (local_cache_by_id && local_cache_by_id != new_inst) {
global_discovery_.remove_node(local_cache_by_id);
}
if (local_cache_by_name) {
if (local_cache_by_name && local_cache_by_name != new_inst) {
global_discovery_.remove_node(local_cache_by_name);
}

Expand Down

0 comments on commit bffcecc

Please sign in to comment.