Skip to content

Commit

Permalink
Support for using tracing span fields as metrics labels (#87)
Browse files Browse the repository at this point in the history
* Add editorconfig

* Add initial metrics-tracing-context version

* Add test_no_labels

* Add test_multiple_paths_to_the_same_callsite (fails currently)

* Make integration tests runnable in bulk

* Add reverse mappings to registry

* Handle dynamic labels explicitly in the interface and macros

* Provide a default implementation for dynamic metric ops

* Update metrics-util

* Update metrics-tracing-context

* Apply fmt

* Fix handling of labels during registration

* Add tests for macros

* Move identifier into a separate mod

* Correct the test at metrics-tracing-context/tests/integration.rs

* Switch to using Key instead of Identifier for usual calls

* Update utils to use Key instead of Identifier

* fixup! Switch to using Key instead of Identifier for usual calls

* Update metrics-tracing-context to make the tests pass

* Update the exporter crates to use Key instead of Identifier

* Apply a workaround for unstable ordering of the snapshot output

* Update the bench code with the latest design

* Apply workaround for criterion comparison

* Completely switch to KeyRef at the API and re-add callsite caching

* Removed the option wrapper at Key::labels internally

* Removed the option wrapper at Key::labels in the public API

* More performant can_use_fast_path

* Remove the identifier from metrics and added a simpler one at metrics_util::registry

* Rename Key to KeyData and KeyRef to Key

* Fix doc-comments

* Fix macros benches

* Fix registry benches

* Document the metrics-tracing-contextc crate

* Add test case for various macro forms

* Add nested spans test (currently fails)

* Better notation for owned key at registry benches

* Add criterion workaround at metrics-util/Cargo.toml

* Change the Registry for upsert-like API and use DashMap internally

* Update prometheus to the new Registry API

* Update the metrics-exporter-tcp for new API

It doesn't use Registry at all anymore!

* Update registry benches

* Change the test_nested_spans

The iteration, unfortunately, can only happen in the inside-out order of
span nesting.
We will work around this on another level, and here we just collect all
the labels out there.

* Implement the code to make the test_nested_spans pass

* Add label filtering

* More registry benches

* Update metrics/src/key.rs

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>

* Update metrics/src/key.rs

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>

* Update metrics-tracing-context/src/lib.rs

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>

* Update metrics-tracing-context/src/label_filter.rs

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>

* Update metrics-macros/src/lib.rs

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>

* Update metrics-macros/src/lib.rs

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>

* Add proper trailing comma handling and parsing tests

* Better crate doctest

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>
  • Loading branch information
MOZGIII and tobz committed Sep 16, 2020
1 parent c22fdc4 commit 8bbe1b1
Show file tree
Hide file tree
Showing 28 changed files with 1,500 additions and 649 deletions.
13 changes: 13 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# http://editorconfig.org
root = true

[*]
indent_style = space
indent_size = 2
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

[*.rs]
indent_size = 4
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ members = [
"metrics-benchmark",
"metrics-exporter-tcp",
"metrics-exporter-prometheus",
"metrics-tracing-context",
]
98 changes: 51 additions & 47 deletions metrics-exporter-prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use hyper::{
service::{make_service_fn, service_fn},
{Body, Error as HyperError, Response, Server},
};
use metrics::{Identifier, Key, Label, Recorder, SetRecorderError};
use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{
parse_quantiles, CompositeKey, Handle, Histogram, MetricKind, Quantile, Registry,
};
Expand Down Expand Up @@ -494,73 +494,77 @@ impl PrometheusBuilder {
}

impl Recorder for PrometheusRecorder {
fn register_counter(&self, key: Key, description: Option<&'static str>) -> Identifier {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner
.registry()
.get_or_create_identifier(CompositeKey::new(MetricKind::Counter, key), |_| {
Handle::counter()
})
self.inner.registry().op(
CompositeKey::new(MetricKind::Counter, key),
|_| {},
|| Handle::counter(),
);
}

fn register_gauge(&self, key: Key, description: Option<&'static str>) -> Identifier {
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner
.registry()
.get_or_create_identifier(CompositeKey::new(MetricKind::Gauge, key), |_| {
Handle::gauge()
})
self.inner.registry().op(
CompositeKey::new(MetricKind::Gauge, key),
|_| {},
|| Handle::gauge(),
);
}

fn register_histogram(&self, key: Key, description: Option<&'static str>) -> Identifier {
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner
.registry()
.get_or_create_identifier(CompositeKey::new(MetricKind::Histogram, key), |_| {
Handle::histogram()
})
self.inner.registry().op(
CompositeKey::new(MetricKind::Histogram, key),
|_| {},
|| Handle::histogram(),
);
}

fn increment_counter(&self, id: Identifier, value: u64) {
self.inner
.registry()
.with_handle(id, |h| h.increment_counter(value));
fn increment_counter(&self, key: Key, value: u64) {
self.inner.registry().op(
CompositeKey::new(MetricKind::Counter, key),
|h| h.increment_counter(value),
|| Handle::counter(),
);
}

fn update_gauge(&self, id: Identifier, value: f64) {
self.inner
.registry()
.with_handle(id, |h| h.update_gauge(value));
fn update_gauge(&self, key: Key, value: f64) {
self.inner.registry().op(
CompositeKey::new(MetricKind::Gauge, key),
|h| h.update_gauge(value),
|| Handle::gauge(),
);
}

fn record_histogram(&self, id: Identifier, value: u64) {
self.inner
.registry()
.with_handle(id, |h| h.record_histogram(value));
fn record_histogram(&self, key: Key, value: u64) {
self.inner.registry().op(
CompositeKey::new(MetricKind::Histogram, key),
|h| h.record_histogram(value),
|| Handle::histogram(),
);
}
}

fn key_to_parts(key: Key) -> (String, Vec<String>) {
let (name, labels) = key.into_parts();
let name = key.name();
let labels = key.labels();
let sanitize = |c| c == '.' || c == '=' || c == '{' || c == '}' || c == '+' || c == '-';
let name = name.replace(sanitize, "_");
let labels = labels
.map(|labels| {
labels
.into_iter()
.map(Label::into_parts)
.map(|(k, v)| {
format!(
"{}=\"{}\"",
k,
v.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
)
})
.collect()
.into_iter()
.map(|label| {
let k = label.key();
let v = label.value();
format!(
"{}=\"{}\"",
k,
v.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
)
})
.unwrap_or_else(|| Vec::new());
.collect();

(name, labels)
}
Expand Down
136 changes: 41 additions & 95 deletions metrics-exporter-tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ use std::time::SystemTime;

use bytes::Bytes;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use metrics::{Identifier, Key, Recorder, SetRecorderError};
use metrics_util::Registry;
use metrics::{Key, Recorder, SetRecorderError};
use mio::{
net::{TcpListener, TcpStream},
Events, Interest, Poll, Token, Waker,
Expand All @@ -71,30 +70,12 @@ mod proto {
include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}

type TcpRegistry = Registry<CompositeKey, CompositeKey>;

#[derive(Eq, PartialEq, Hash, Clone)]
enum MetricKind {
Counter,
Gauge,
Histogram,
}

enum MetricValue {
Counter(u64),
Gauge(f64),
Histogram(u64),
}

#[derive(Eq, PartialEq, Hash, Clone)]
struct CompositeKey(MetricKind, Key);

impl CompositeKey {
pub fn key(&self) -> &Key {
&self.1
}
}

/// Errors that could occur while installing a TCP recorder/exporter.
#[derive(Debug)]
pub enum Error {
Expand All @@ -119,8 +100,7 @@ impl From<SetRecorderError> for Error {

/// A TCP recorder.
pub struct TcpRecorder {
registry: Arc<TcpRegistry>,
tx: Sender<(Identifier, MetricValue)>,
tx: Sender<(Key, MetricValue)>,
waker: Arc<Waker>,
}

Expand Down Expand Up @@ -200,67 +180,48 @@ impl TcpBuilder {
poll.registry()
.register(&mut listener, LISTENER, Interest::READABLE)?;

let registry = Arc::new(Registry::new());

let recorder = TcpRecorder {
registry: Arc::clone(&registry),
tx,
waker: Arc::clone(&waker),
};

thread::spawn(move || run_transport(registry, poll, waker, listener, rx, buffer_size));
thread::spawn(move || run_transport(poll, waker, listener, rx, buffer_size));
Ok(recorder)
}
}

impl TcpRecorder {
fn register_metric(&self, kind: MetricKind, key: Key) -> Identifier {
let ckey = CompositeKey(kind, key);
self.registry.get_or_create_identifier(ckey, |k| k.clone())
}

fn push_metric(&self, id: Identifier, value: MetricValue) {
let id = match id {
Identifier::Invalid => return,
Identifier::Valid(_) => id,
};
let _ = self.tx.try_send((id, value));
fn push_metric(&self, key: Key, value: MetricValue) {
let _ = self.tx.try_send((key, value));
let _ = self.waker.wake();
}
}

impl Recorder for TcpRecorder {
fn register_counter(&self, key: Key, _description: Option<&'static str>) -> Identifier {
self.register_metric(MetricKind::Counter, key)
}
fn register_counter(&self, _key: Key, _description: Option<&'static str>) {}

fn register_gauge(&self, key: Key, _description: Option<&'static str>) -> Identifier {
self.register_metric(MetricKind::Gauge, key)
}
fn register_gauge(&self, _key: Key, _description: Option<&'static str>) {}

fn register_histogram(&self, key: Key, _description: Option<&'static str>) -> Identifier {
self.register_metric(MetricKind::Histogram, key)
}
fn register_histogram(&self, _key: Key, _description: Option<&'static str>) {}

fn increment_counter(&self, id: Identifier, value: u64) {
self.push_metric(id, MetricValue::Counter(value));
fn increment_counter(&self, key: Key, value: u64) {
self.push_metric(key, MetricValue::Counter(value));
}

fn update_gauge(&self, id: Identifier, value: f64) {
self.push_metric(id, MetricValue::Gauge(value));
fn update_gauge(&self, key: Key, value: f64) {
self.push_metric(key, MetricValue::Gauge(value));
}

fn record_histogram(&self, id: Identifier, value: u64) {
self.push_metric(id, MetricValue::Histogram(value));
fn record_histogram(&self, key: Key, value: u64) {
self.push_metric(key, MetricValue::Histogram(value));
}
}

fn run_transport(
registry: Arc<TcpRegistry>,
mut poll: Poll,
waker: Arc<Waker>,
listener: TcpListener,
rx: Receiver<(Identifier, MetricValue)>,
rx: Receiver<(Key, MetricValue)>,
buffer_size: Option<usize>,
) {
let buffer_limit = buffer_size.unwrap_or(std::usize::MAX);
Expand Down Expand Up @@ -309,10 +270,10 @@ fn run_transport(
// If our sender is dead, we can't do anything else, so just return.
Err(_) => return,
};
match convert_metric_to_protobuf_encoded(&registry, msg.0, msg.1) {
Some(Ok(pmsg)) => buffered_pmsgs.push_back(pmsg),
Some(Err(e)) => error!(error = ?e, "error encoding metric"),
None => error!(metric_id = ?msg.0, "unknown metric"),
let (key, value) = msg;
match convert_metric_to_protobuf_encoded(key, value) {
Ok(pmsg) => buffered_pmsgs.push_back(pmsg),
Err(e) => error!(error = ?e, "error encoding metric"),
}
}
drop(_mrxspan);
Expand Down Expand Up @@ -460,46 +421,31 @@ fn drive_connection(
}
}

fn convert_metric_to_protobuf_encoded(
registry: &Arc<TcpRegistry>,
id: Identifier,
value: MetricValue,
) -> Option<Result<Bytes, EncodeError>> {
let id = match id {
Identifier::Invalid => return None,
Identifier::Valid(_) => id,
fn convert_metric_to_protobuf_encoded(key: Key, value: MetricValue) -> Result<Bytes, EncodeError> {
let name = key.name().to_string();
let labels = key
.labels()
.map(|label| (label.key().to_owned(), label.value().to_owned()))
.collect::<BTreeMap<_, _>>();
let mvalue = match value {
MetricValue::Counter(cv) => proto::metric::Value::Counter(proto::Counter { value: cv }),
MetricValue::Gauge(gv) => proto::metric::Value::Gauge(proto::Gauge { value: gv }),
MetricValue::Histogram(hv) => {
proto::metric::Value::Histogram(proto::Histogram { value: hv })
}
};
registry.with_handle(id, |ckey| {
let name = ckey.key().name().to_string();
let labels = ckey
.key()
.labels()
.map(|labels| {
labels
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect::<BTreeMap<_, _>>()
})
.unwrap_or_else(|| BTreeMap::new());
let mvalue = match value {
MetricValue::Counter(cv) => proto::metric::Value::Counter(proto::Counter { value: cv }),
MetricValue::Gauge(gv) => proto::metric::Value::Gauge(proto::Gauge { value: gv }),
MetricValue::Histogram(hv) => {
proto::metric::Value::Histogram(proto::Histogram { value: hv })
}
};

let now: prost_types::Timestamp = SystemTime::now().into();
let metric = proto::Metric {
name,
labels,
timestamp: Some(now),
value: Some(mvalue),
};
let now: prost_types::Timestamp = SystemTime::now().into();
let metric = proto::Metric {
name,
labels,
timestamp: Some(now),
value: Some(mvalue),
};

let mut buf = Vec::new();
metric.encode_length_delimited(&mut buf)?;
Ok(Bytes::from(buf))
})
let mut buf = Vec::new();
metric.encode_length_delimited(&mut buf)?;
Ok(Bytes::from(buf))
}

fn next(current: &mut Token) -> Token {
Expand Down
Loading

0 comments on commit 8bbe1b1

Please sign in to comment.