Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions metrics-exporter-prometheus/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,42 @@ pub enum BuildError {
ZeroBucketDuration,
}

/// Represents a set of labels as structured key-value pairs
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LabelSet {
pub labels: Vec<(String, String)>,
}

impl LabelSet {
pub fn from_key_and_global(
key: &metrics::Key,
global_labels: &IndexMap<String, String>,
) -> Self {
let mut labels = global_labels.clone();
key.labels().for_each(|label| {
labels.insert(label.key().to_string(), label.value().to_string());
});
Self { labels: labels.into_iter().collect() }
}

pub fn is_empty(&self) -> bool {
self.labels.is_empty()
}

pub fn to_strings(&self) -> impl Iterator<Item = String> + '_ {
self.labels.iter().map(|(k, v)| {
format!(
"{}=\"{}\"",
crate::formatting::sanitize_label_key(k),
crate::formatting::sanitize_label_value(v)
)
})
}
}

#[derive(Debug)]
pub struct Snapshot {
pub counters: HashMap<String, HashMap<Vec<String>, u64>>,
pub gauges: HashMap<String, HashMap<Vec<String>, f64>>,
pub distributions: HashMap<String, IndexMap<Vec<String>, Distribution>>,
pub counters: HashMap<String, HashMap<LabelSet, u64>>,
pub gauges: HashMap<String, HashMap<LabelSet, f64>>,
pub distributions: HashMap<String, IndexMap<LabelSet, Distribution>>,
}
32 changes: 5 additions & 27 deletions metrics-exporter-prometheus/src/formatting.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,8 @@
//! Helpers for rendering metrics in the Prometheus exposition format.

use indexmap::IndexMap;
use metrics::{Key, Unit};
use metrics::Unit;

/// Breaks a key into the name and label components, with optional default labels.
///
/// If any of the default labels are not already present, they will be added to the overall list of labels.
///
/// Both the metric name, and labels, are sanitized. See [`sanitize_metric_name`], [`sanitize_label_key`],
/// and [`sanitize_label_value`] for more information.
pub fn key_to_parts(
key: &Key,
default_labels: Option<&IndexMap<String, String>>,
) -> (String, Vec<String>) {
let name = sanitize_metric_name(key.name());
let mut values = default_labels.cloned().unwrap_or_default();
key.labels().for_each(|label| {
values.insert(label.key().to_string(), label.value().to_string());
});
let labels = values
.iter()
.map(|(k, v)| format!("{}=\"{}\"", sanitize_label_key(k), sanitize_label_value(v)))
.collect();

(name, labels)
}
use crate::common::LabelSet;

/// Writes a help (description) line in the Prometheus [exposition format].
///
Expand Down Expand Up @@ -73,7 +51,7 @@ pub fn write_metric_line<T, T2>(
buffer: &mut String,
name: &str,
suffix: Option<&'static str>,
labels: &[String],
labels: &LabelSet,
additional_label: Option<(&'static str, T)>,
value: T2,
unit: Option<Unit>,
Expand All @@ -87,13 +65,13 @@ pub fn write_metric_line<T, T2>(
buffer.push('{');

let mut first = true;
for label in labels {
for label in labels.to_strings() {
if first {
first = false;
} else {
buffer.push(',');
}
buffer.push_str(label);
buffer.push_str(&label);
}

if let Some((name, value)) = additional_label {
Expand Down
83 changes: 35 additions & 48 deletions metrics-exporter-prometheus/src/protobuf.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
//! Protobuf serialization support for Prometheus metrics.

use indexmap::IndexMap;
use metrics::Unit;
use prost::Message;
use std::collections::HashMap;

use crate::common::Snapshot;
use crate::common::{LabelSet, Snapshot};
use crate::distribution::Distribution;
use crate::formatting::sanitize_metric_name;

Expand All @@ -26,28 +25,27 @@ pub(crate) const PROTOBUF_CONTENT_TYPE: &str =
/// length header.
#[allow(clippy::too_many_lines)]
pub(crate) fn render_protobuf(
snapshot: &Snapshot,
snapshot: Snapshot,
descriptions: &HashMap<String, (metrics::SharedString, Option<Unit>)>,
global_labels: &IndexMap<String, String>,
counter_suffix: Option<&'static str>,
) -> Vec<u8> {
let mut output = Vec::new();

// Process counters
for (name, by_labels) in &snapshot.counters {
let sanitized_name = sanitize_metric_name(name);
for (name, by_labels) in snapshot.counters {
let sanitized_name = sanitize_metric_name(&name);
let help =
descriptions.get(name.as_str()).map(|(desc, _)| desc.to_string()).unwrap_or_default();

let mut metrics = Vec::new();
for (labels, value) in by_labels {
let label_pairs = parse_labels(labels, global_labels);
let label_pairs = label_set_to_protobuf(labels);

metrics.push(pb::Metric {
label: label_pairs,
counter: Some(pb::Counter {
#[allow(clippy::cast_precision_loss)]
value: Some(*value as f64),
value: Some(value as f64),

..Default::default()
}),
Expand All @@ -68,18 +66,18 @@ pub(crate) fn render_protobuf(
}

// Process gauges
for (name, by_labels) in &snapshot.gauges {
let sanitized_name = sanitize_metric_name(name);
for (name, by_labels) in snapshot.gauges {
let sanitized_name = sanitize_metric_name(&name);
let help =
descriptions.get(name.as_str()).map(|(desc, _)| desc.to_string()).unwrap_or_default();

let mut metrics = Vec::new();
for (labels, value) in by_labels {
let label_pairs = parse_labels(labels, global_labels);
let label_pairs = label_set_to_protobuf(labels);

metrics.push(pb::Metric {
label: label_pairs,
gauge: Some(pb::Gauge { value: Some(*value) }),
gauge: Some(pb::Gauge { value: Some(value) }),

..Default::default()
});
Expand All @@ -97,18 +95,20 @@ pub(crate) fn render_protobuf(
}

// Process distributions (histograms and summaries)
for (name, by_labels) in &snapshot.distributions {
let sanitized_name = sanitize_metric_name(name);
for (name, by_labels) in snapshot.distributions {
let sanitized_name = sanitize_metric_name(&name);
let help =
descriptions.get(name.as_str()).map(|(desc, _)| desc.to_string()).unwrap_or_default();

let mut metrics = Vec::new();
let mut metric_type = None;
for (labels, distribution) in by_labels {
let label_pairs = parse_labels(labels, global_labels);
let label_pairs = label_set_to_protobuf(labels);

let metric = match distribution {
Distribution::Summary(summary, quantiles, sum) => {
use quanta::Instant;
metric_type = Some(pb::MetricType::Summary);
let snapshot = summary.snapshot(Instant::now());
let quantile_values: Vec<pb::Quantile> = quantiles
.iter()
Expand All @@ -122,7 +122,7 @@ pub(crate) fn render_protobuf(
label: label_pairs,
summary: Some(pb::Summary {
sample_count: Some(summary.count() as u64),
sample_sum: Some(*sum),
sample_sum: Some(sum),
quantile: quantile_values,

created_timestamp: None,
Expand All @@ -132,6 +132,7 @@ pub(crate) fn render_protobuf(
}
}
Distribution::Histogram(histogram) => {
metric_type = Some(pb::MetricType::Histogram);
let mut buckets = Vec::new();
for (le, count) in histogram.buckets() {
buckets.push(pb::Bucket {
Expand Down Expand Up @@ -167,10 +168,9 @@ pub(crate) fn render_protobuf(
metrics.push(metric);
}

let metric_type = match by_labels.values().next() {
Some(Distribution::Summary(_, _, _)) => pb::MetricType::Summary,
Some(Distribution::Histogram(_)) => pb::MetricType::Histogram,
None => continue, // Skip empty metric families
let Some(metric_type) = metric_type else {
// Skip empty metric families
continue;
};

let metric_family = pb::MetricFamily {
Expand All @@ -187,29 +187,11 @@ pub(crate) fn render_protobuf(
output
}

fn parse_labels(labels: &[String], global_labels: &IndexMap<String, String>) -> Vec<pb::LabelPair> {
fn label_set_to_protobuf(labels: LabelSet) -> Vec<pb::LabelPair> {
let mut label_pairs = Vec::new();

// Add global labels first
for (key, value) in global_labels {
label_pairs.push(pb::LabelPair { name: Some(key.clone()), value: Some(value.clone()) });
}

// Add metric-specific labels
for label_str in labels {
if let Some(eq_pos) = label_str.find('=') {
let key = &label_str[..eq_pos];
let value = &label_str[eq_pos + 1..];
let value = value.trim_matches('"');

// Skip if this label key already exists from global labels
if !global_labels.contains_key(key) {
label_pairs.push(pb::LabelPair {
name: Some(key.to_string()),
value: Some(value.to_string()),
});
}
}
for (key, value) in labels.labels {
label_pairs.push(pb::LabelPair { name: Some(key), value: Some(value) });
}

label_pairs
Expand All @@ -235,16 +217,18 @@ mod tests {
fn test_render_protobuf_counters() {
let mut counters = HashMap::new();
let mut counter_labels = HashMap::new();
counter_labels.insert(vec!["method=\"GET\"".to_string()], 42u64);
let labels = LabelSet::from_key_and_global(
&metrics::Key::from_parts("", vec![metrics::Label::new("method", "GET")]),
&IndexMap::new(),
);
counter_labels.insert(labels, 42u64);
counters.insert("http_requests".to_string(), counter_labels);

let snapshot = Snapshot { counters, gauges: HashMap::new(), distributions: HashMap::new() };

let descriptions = HashMap::new();
let global_labels = IndexMap::new();

let protobuf_data =
render_protobuf(&snapshot, &descriptions, &global_labels, Some("total"));
let protobuf_data = render_protobuf(snapshot, &descriptions, Some("total"));

assert!(!protobuf_data.is_empty(), "Protobuf data should not be empty");

Expand All @@ -264,7 +248,11 @@ mod tests {
fn test_render_protobuf_gauges() {
let mut gauges = HashMap::new();
let mut gauge_labels = HashMap::new();
gauge_labels.insert(vec!["instance=\"localhost\"".to_string()], 0.75f64);
let labels = LabelSet::from_key_and_global(
&metrics::Key::from_parts("", vec![metrics::Label::new("instance", "localhost")]),
&IndexMap::new(),
);
gauge_labels.insert(labels, 0.75f64);
gauges.insert("cpu_usage".to_string(), gauge_labels);

let snapshot = Snapshot { counters: HashMap::new(), gauges, distributions: HashMap::new() };
Expand All @@ -274,9 +262,8 @@ mod tests {
"cpu_usage".to_string(),
(SharedString::const_str("CPU usage percentage"), None),
);
let global_labels = IndexMap::new();

let protobuf_data = render_protobuf(&snapshot, &descriptions, &global_labels, None);
let protobuf_data = render_protobuf(snapshot, &descriptions, None);

assert!(!protobuf_data.is_empty(), "Protobuf data should not be empty");

Expand Down
25 changes: 12 additions & 13 deletions metrics-exporter-prometheus/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, Share
use metrics_util::registry::{Recency, Registry};
use quanta::Instant;

use crate::common::Snapshot;
use crate::common::{LabelSet, Snapshot};
use crate::distribution::{Distribution, DistributionBuilder};
use crate::formatting::{
key_to_parts, sanitize_metric_name, write_help_line, write_metric_line, write_type_line,
sanitize_metric_name, write_help_line, write_metric_line, write_type_line,
};
use crate::registry::GenerationalAtomicStorage;

#[derive(Debug)]
pub(crate) struct Inner {
pub registry: Registry<Key, GenerationalAtomicStorage>,
pub recency: Recency<Key>,
pub distributions: RwLock<HashMap<String, IndexMap<Vec<String>, Distribution>>>,
pub distributions: RwLock<HashMap<String, IndexMap<LabelSet, Distribution>>>,
pub distribution_builder: DistributionBuilder,
pub descriptions: RwLock<HashMap<String, (SharedString, Option<Unit>)>>,
pub global_labels: IndexMap<String, String>,
Expand All @@ -37,7 +37,8 @@ impl Inner {
continue;
}

let (name, labels) = key_to_parts(&key, Some(&self.global_labels));
let name = sanitize_metric_name(key.name());
let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
let value = counter.get_inner().load(Ordering::Acquire);
let entry =
counters.entry(name).or_insert_with(HashMap::new).entry(labels).or_insert(0);
Expand All @@ -52,7 +53,8 @@ impl Inner {
continue;
}

let (name, labels) = key_to_parts(&key, Some(&self.global_labels));
let name = sanitize_metric_name(key.name());
let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
let entry =
gauges.entry(name).or_insert_with(HashMap::new).entry(labels).or_insert(0.0);
Expand All @@ -69,7 +71,8 @@ impl Inner {
// Since we store aggregated distributions directly, when we're told that a metric
// is not recent enough and should be/was deleted from the registry, we also need to
// delete it on our side as well.
let (name, labels) = key_to_parts(&key, Some(&self.global_labels));
let name = sanitize_metric_name(key.name());
let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
let delete_by_name = if let Some(by_name) = wg.get_mut(&name) {
by_name.swap_remove(&labels);
Expand Down Expand Up @@ -98,7 +101,8 @@ impl Inner {
fn drain_histograms_to_distributions(&self) {
let histogram_handles = self.registry.get_histogram_handles();
for (key, histogram) in histogram_handles {
let (name, labels) = key_to_parts(&key, Some(&self.global_labels));
let name = sanitize_metric_name(key.name());
let labels = LabelSet::from_key_and_global(&key, &self.global_labels);

let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
let entry = wg
Expand Down Expand Up @@ -332,12 +336,7 @@ impl PrometheusHandle {
let snapshot = self.inner.get_recent_metrics();
let descriptions = self.inner.descriptions.read().unwrap_or_else(PoisonError::into_inner);

crate::protobuf::render_protobuf(
&snapshot,
&descriptions,
&self.inner.global_labels,
self.inner.counter_suffix,
)
crate::protobuf::render_protobuf(snapshot, &descriptions, self.inner.counter_suffix)
}

/// Performs upkeeping operations to ensure metrics held by recorder are up-to-date and do not
Expand Down