Skip to content

Commit

Permalink
metrics: Allow aggregating counter
Browse files Browse the repository at this point in the history
This is super-hacky at the moment.

Signed-off-by: BenPope <benpope81@gmail.com>
  • Loading branch information
BenPope authored and Vlad Lazar committed Jun 20, 2022
1 parent d96b73f commit 57f55f8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 40 deletions.
16 changes: 8 additions & 8 deletions include/seastar/core/metrics.hh
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ extern label shard_label;
*/
template<typename T>
impl::metric_definition_impl make_gauge(metric_name_type name,
T&& val, description d=description(), std::vector<label_instance> labels = {}) {
return {name, {impl::data_type::GAUGE, "gauge"}, make_function(std::forward<T>(val), impl::data_type::GAUGE), d, labels};
T&& val, description d=description(), std::vector<label_instance> labels = {}, std::vector<std::string> aggregate_labels = {}) {
return {name, {impl::data_type::GAUGE, "gauge"}, make_function(std::forward<T>(val), impl::data_type::GAUGE), d, std::move(labels), std::move(aggregate_labels)};
}

/*!
Expand Down Expand Up @@ -521,9 +521,9 @@ impl::metric_definition_impl make_derive(metric_name_type name, description d, s
*/
template<typename T>
impl::metric_definition_impl make_counter(metric_name_type name,
T&& val, description d=description(), std::vector<label_instance> labels = {}) {
T&& val, description d=description(), std::vector<label_instance> labels = {}, std::vector<std::string> aggregate_labels = {}) {
auto type = impl::counter_type_traits<std::remove_reference_t<T>>::type;
return {name, {type, "counter"}, make_function(std::forward<T>(val), type), d, labels};
return {name, {type, "counter"}, make_function(std::forward<T>(val), type), d, std::move(labels), std::move(aggregate_labels)};
}

/*!
Expand Down Expand Up @@ -627,8 +627,8 @@ impl::metric_definition_impl make_summary(metric_name_type name,
template<typename T>
impl::metric_definition_impl make_total_bytes(metric_name_type name,
T&& val, description d=description(), std::vector<label_instance> labels = {},
instance_id_type instance = impl::shard()) {
return make_counter(name, std::forward<T>(val), d, labels).set_type("total_bytes");
instance_id_type instance = impl::shard(), std::vector<std::string> aggregate_labels = {}) {
return make_counter(name, std::forward<T>(val), d, std::move(labels), std::move(aggregate_labels)).set_type("total_bytes");
}

/*!
Expand All @@ -641,8 +641,8 @@ impl::metric_definition_impl make_total_bytes(metric_name_type name,
template<typename T>
impl::metric_definition_impl make_current_bytes(metric_name_type name,
T&& val, description d=description(), std::vector<label_instance> labels = {},
instance_id_type instance = impl::shard()) {
return make_gauge(name, std::forward<T>(val), d, labels).set_type("bytes");
instance_id_type instance = impl::shard(), std::vector<std::string> aggregate_labels = {}) {
return make_gauge(name, std::forward<T>(val), d, std::move(labels), std::move(aggregate_labels)).set_type("bytes");
}


Expand Down
92 changes: 60 additions & 32 deletions src/core/prometheus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Copyright (C) 2016 ScyllaDB
*/

#include <seastar/core/metrics.hh>
#include <seastar/core/prometheus.hh>
#include <sstream>

Expand Down Expand Up @@ -481,6 +482,24 @@ metric_family_range get_range(const metrics_families_per_shard& mf, const sstrin

}

void write_counter(std::stringstream& s, const config& ctx, const sstring& name, const mi::metric_value& value, std::map<sstring, sstring> labels) {
add_name(s, name, labels, ctx);
std::string value_str;
try {
value_str = to_str(value);
} catch (const std::range_error& e) {
seastar_logger.debug("prometheus: write_text_representation: {}: {}", s.str(), e.what());
value_str = "NaN";
} catch (...) {
auto ex = std::current_exception();
// print this error as it's ignored later on by `connection::start_response`
seastar_logger.error("prometheus: write_text_representation: {}: {}", s.str(), ex);
std::rethrow_exception(std::move(ex));
}
s << value_str;
s << "\n";
}

void write_histogram(std::stringstream& s, const config& ctx, const sstring& name, const seastar::metrics::histogram& h, std::map<sstring, sstring> labels) {
add_name(s, name + "_sum", labels, ctx);
s << h.sample_sum;
Expand Down Expand Up @@ -545,19 +564,39 @@ void histogram_aggregator::add_histogram(const seastar::metrics::histogram& h, s
_histograms[labels] += h;
}

class counter_aggregator {
std::vector<std::string> _remove_labels;
std::unordered_map<std::map<sstring, sstring>, mi::metric_value> _counters;
public:
explicit counter_aggregator(std::vector<std::string> labels) : _remove_labels(std::move(labels)) {
}
void add_counter(const mi::metric_value& value, std::map<sstring, sstring> labels);
const std::unordered_map<std::map<sstring, sstring>, mi::metric_value>& get_counters() const {
return _counters;
}
bool empty() const {
return _counters.empty();
}
};

void counter_aggregator::add_counter(const mi::metric_value& value, std::map<sstring, sstring> labels) {
for (auto&& l : _remove_labels) {
labels.erase(l);
}
_counters[labels] += value;
}

future<> write_text_representation(output_stream<char>& out, const config& ctx, const metric_family_range& m) {
return seastar::async([&ctx, &out, &m] () mutable {
bool found = false;
for (metric_family& metric_family : m) {
auto name = ctx.prefix + "_" + metric_family.name();
found = false;
histogram_aggregator histograms(metric_family.metadata().aggregate_labels);
counter_aggregator counters(metric_family.metadata().aggregate_labels);
bool should_aggregate = !metric_family.metadata().aggregate_labels.empty();
metric_family.foreach_metric([&out, &ctx, &found, &name, &metric_family, &histograms, should_aggregate](auto value, auto value_info) mutable {
metric_family.foreach_metric([&out, &ctx, &found, &name, &metric_family, &histograms, &counters, should_aggregate](auto value, auto value_info) mutable {
std::stringstream s;
if (value.is_empty()) {
return;
}
if (!found) {
if (metric_family.metadata().d.str() != "") {
s << "# HELP " << name << " " << metric_family.metadata().d.str() << "\n";
Expand All @@ -574,39 +613,28 @@ future<> write_text_representation(output_stream<char>& out, const config& ctx,
write_histogram(s, ctx, name, value.get_histogram(), value_info.id.labels());
}
} else {
add_name(s, name, value_info.id.labels(), ctx);
std::string value_str;
try {
value_str = to_str(value);
} catch (const std::range_error& e) {
seastar_logger.debug("prometheus: write_text_representation: {}: {}", s.str(), e.what());
value_str = "NaN";
} catch (...) {
auto ex = std::current_exception();
// print this error as it's ignored later on by `connection::start_response`
seastar_logger.error("prometheus: write_text_representation: {}: {}", s.str(), ex);
std::rethrow_exception(std::move(ex));
if (should_aggregate) {
counters.add_counter(value, value_info.id.labels());
} else {
write_counter(s, ctx, name, value, value_info.id.labels());
}
s << value_str;
s << "\n";
}
out.write(s.str()).get();
thread::maybe_yield();
});
if (!histograms.empty()) {
auto name = ctx.prefix + "_" + metric_family.name();
std::stringstream name_help;
if (metric_family.metadata().d.str() != "") {
name_help << "# HELP " << name << " " << metric_family.metadata().d.str() << "\n";
}
name_help << "# TYPE " << name << " histogram" << "\n";
out.write(name_help.str()).get();
for (auto&& h : histograms.get_histograms()) {
std::stringstream s;
write_histogram(s, ctx, name, h.second, h.first);
out.write(s.str()).get();
thread::maybe_yield();
}

for (auto&& h : histograms.get_histograms()) {
std::stringstream s;
write_histogram(s, ctx, name, h.second, h.first);
out.write(s.str()).get();
thread::maybe_yield();
}

for (auto&& h : counters.get_counters()) {
std::stringstream s;
write_counter(s, ctx, name, h.second, h.first);
out.write(s.str()).get();
thread::maybe_yield();
}
}
});
Expand Down

0 comments on commit 57f55f8

Please sign in to comment.