Skip to content

Commit

Permalink
make metrics_store thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed May 23, 2024
1 parent 1c17f41 commit fee0b5e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 60 deletions.
140 changes: 81 additions & 59 deletions src/tateyama/metrics/resource/metrics_store_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ namespace tateyama::metrics::resource {

metrics_item_slot& metrics_store_impl::register_item(const metrics_metadata& metadata) {
const std::string key{metadata.key()};
const auto register_item_internal = [&metadata, &key](std::map<std::string, std::unique_ptr<second_map_type>>& metrics) {
const auto register_item_internal = [&metadata, &key](std::map<std::string, std::unique_ptr<second_map_type>>& metrics, std::mutex& mtx) {
std::unique_lock<std::mutex> lock(mtx);
if (metrics.find(key) == metrics.end()) {
metrics.emplace(key, std::make_unique<second_map_type>());
}
Expand All @@ -39,15 +40,17 @@ metrics_item_slot& metrics_store_impl::register_item(const metrics_metadata& met
fmap->emplace(metadata, std::make_pair(tateyama::metrics::metrics_item_slot(item), item));
};
if (metadata.is_visible()) {
register_item_internal(metrics_);
register_item_internal(metrics_, mtx_metrics_);
return (metrics_.find(key)->second->find(metadata)->second).first;
}
register_item_internal(invisible_metrics_);
register_item_internal(invisible_metrics_, mtx_invisible_metrics_);
return (invisible_metrics_.find(key)->second->find(metadata)->second).first;
}

void metrics_store_impl::register_aggregation(const metrics_aggregation& aggregation) {
const std::string key{aggregation.group_key()};
std::unique_lock<std::mutex> lock(mtx_aggregations_);

if (aggregations_.find(key) != aggregations_.end()) {
throw std::runtime_error("aggregation is already registered");
}
Expand All @@ -56,58 +59,70 @@ void metrics_store_impl::register_aggregation(const metrics_aggregation& aggrega

bool metrics_store_impl::unregister_element(std::string_view key) {
const std::string key_string{key};
const auto unregister_element_internal = [&key_string](std::map<std::string, std::unique_ptr<second_map_type>>& metrics) {
const auto unregister_element_internal = [&key_string](std::map<std::string, std::unique_ptr<second_map_type>>& metrics, std::mutex& mtx) {
std::unique_lock<std::mutex> lock(mtx);
if (auto const itr = metrics.find(key_string); itr != metrics.end()) {
metrics.erase(itr);
return true;
}
return false;
};
if (unregister_element_internal(metrics_)) {
if (unregister_element_internal(metrics_, mtx_metrics_)) {
return true;
}
if (unregister_element_internal(invisible_metrics_)) {
if (unregister_element_internal(invisible_metrics_, mtx_invisible_metrics_)) {
return true;
}
if (auto const itr = aggregations_.find(key_string); itr != aggregations_.end()) {
aggregations_.erase(itr);
return true;
{
std::unique_lock<std::mutex> lock(mtx_aggregations_);
if (auto const itr = aggregations_.find(key_string); itr != aggregations_.end()) {
aggregations_.erase(itr);
return true;
}
}
return false;
}

void metrics_store_impl::enumerate_items(std::function<void(metrics_metadata const&, double)> const& acceptor) {
const auto enumerate_items_internal = [&acceptor](std::map<std::string, std::unique_ptr<second_map_type>>& metrics) {
const auto enumerate_items_internal = [&acceptor](std::map<std::string, std::unique_ptr<second_map_type>>& metrics, std::mutex& mtx) {
std::unique_lock<std::mutex> lock(mtx);
for (auto&& fmap: metrics) {
for (auto&& e: *fmap.second) {
acceptor(e.first, (e.second).second->value());
}
}
};
enumerate_items_internal(metrics_);
enumerate_items_internal(invisible_metrics_);
enumerate_items_internal(metrics_, mtx_metrics_);
enumerate_items_internal(invisible_metrics_, mtx_invisible_metrics_);
}

void metrics_store_impl::enumerate_aggregations(std::function<void(metrics_aggregation const&)> const& acceptor) const {
std::unique_lock<std::mutex> lock(mtx_aggregations_);
for (auto&& e: aggregations_) {
acceptor(e.second);
}
}

void metrics_store_impl::set_item_description(::tateyama::proto::metrics::response::MetricsInformation& information) {
for (auto&& ef: metrics_) {
auto* item = information.add_items();
item->set_key(ef.first);
auto&& map = ef.second;
for (auto&& es: *map) {
item->set_description(std::string(es.first.description()));
break;
{
std::unique_lock<std::mutex> lock(mtx_metrics_);
for (auto&& ef: metrics_) {
auto* item = information.add_items();
item->set_key(ef.first);
auto&& map = ef.second;
for (auto&& es: *map) {
item->set_description(std::string(es.first.description()));
break;
}
}
}
for (auto&& a: aggregations_) {
auto* item = information.add_items();
item->set_key(a.first);
item->set_description(std::string(a.second.description()));
{
std::unique_lock<std::mutex> lock(mtx_aggregations_);
for (auto&& a: aggregations_) {
auto* item = information.add_items();
item->set_key(a.first);
item->set_description(std::string(a.second.description()));
}
}
}

Expand All @@ -134,51 +149,58 @@ void metrics_store_impl::set_item_value(::tateyama::proto::metrics::response::Me
::tateyama::proto::metrics::response::MetricsValue* value_;
};

for (auto&& ef: metrics_) {
auto* item = information.add_items();
item->set_key(ef.first);
auto&& map = ef.second;
for (auto&& es: *map) {
auto&& md = es.first;
item->set_description(std::string(md.description()));
auto* mv = item->mutable_value();
auto& mattrs = md.attributes();
if (mattrs.empty()) {
mv->set_value((es.second).second->value());
} else {
auto* array = mv->mutable_array();
auto* elements = array->mutable_elements();
auto* element = elements->Add();
element->set_value((es.second).second->value());
auto* attrmap = element->mutable_attributes();
for (auto&& e: mattrs) {
(*attrmap)[std::get<0>(e)] = std::get<1>(e);
{
std::unique_lock<std::mutex> lock(mtx_metrics_);
for (auto&& ef: metrics_) {
auto* item = information.add_items();
item->set_key(ef.first);
auto&& map = ef.second;
for (auto&& es: *map) {
auto&& md = es.first;
item->set_description(std::string(md.description()));
auto* mv = item->mutable_value();
auto& mattrs = md.attributes();
if (mattrs.empty()) {
mv->set_value((es.second).second->value());
} else {
auto* array = mv->mutable_array();
auto* elements = array->mutable_elements();
auto* element = elements->Add();
element->set_value((es.second).second->value());
auto* attrmap = element->mutable_attributes();
for (auto&& e: mattrs) {
(*attrmap)[std::get<0>(e)] = std::get<1>(e);
}
}
}
}
}

for (auto&& a: aggregations_) {
auto* item = information.add_items();
item->set_key(a.first);
item->set_description(std::string(a.second.description()));
{
std::unique_lock<std::mutex> lock(mtx_aggregations_);
for (auto&& a: aggregations_) {
auto* item = information.add_items();
item->set_key(a.first);
item->set_description(std::string(a.second.description()));

auto aggregator = a.second.create_aggregator();
const auto aggregate_internal = [&aggregator, &a, item](std::map<std::string, std::unique_ptr<second_map_type>>& metrics) {
for (auto&& fmap: metrics) {
for (auto&& e: *fmap.second) {
for (auto&& gk: e.first.group_keys()) {
if (a.first == gk) {
aggregator->add(e.first, (e.second).second->value());
break;
auto aggregator = a.second.create_aggregator();
const auto aggregate_internal = [&aggregator, &a, item](std::map<std::string, std::unique_ptr<second_map_type>>& metrics, std::mutex& mtx) {
std::unique_lock<std::mutex> lock(mtx);
for (auto&& fmap: metrics) {
for (auto&& e: *fmap.second) {
for (auto&& gk: e.first.group_keys()) {
if (a.first == gk) {
aggregator->add(e.first, (e.second).second->value());
break;
}
}
}
}
}
std::visit(encoder(item->mutable_value()), aggregator->aggregate());
};
aggregate_internal(metrics_);
aggregate_internal(invisible_metrics_);
std::visit(encoder(item->mutable_value()), aggregator->aggregate());
};
aggregate_internal(metrics_, mtx_metrics_);
aggregate_internal(invisible_metrics_, mtx_invisible_metrics_);
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/tateyama/metrics/resource/metrics_store_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <map>
#include <vector>
#include <memory>
#include <mutex>

#include <tateyama/metrics/metrics_store.h>
#include <tateyama/metrics/resource/metrics_item_slot_impl.h>
Expand Down Expand Up @@ -65,9 +66,11 @@ class metrics_store_impl {
std::map<std::string, std::unique_ptr<second_map_type>> metrics_{};
std::map<std::string, std::unique_ptr<second_map_type>> invisible_metrics_{};
std::map<std::string, metrics_aggregation> aggregations_{};
mutable std::mutex mtx_metrics_{};
mutable std::mutex mtx_invisible_metrics_{};
mutable std::mutex mtx_aggregations_{};

void set_item_description(::tateyama::proto::metrics::response::MetricsInformation& information);

void set_item_value(::tateyama::proto::metrics::response::MetricsInformation& information);

friend class core;
Expand Down

0 comments on commit fee0b5e

Please sign in to comment.