diff --git a/Cargo.lock b/Cargo.lock index e9979501b8a19..ece69849dea06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,6 +34,15 @@ dependencies = [ "const-random", ] +[[package]] +name = "ahash" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" +dependencies = [ + "const-random", +] + [[package]] name = "aho-corasick" version = "0.7.13" @@ -250,15 +259,6 @@ dependencies = [ "syn 1.0.39", ] -[[package]] -name = "atomic-shim" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d20fdac7156779a1a30d970e838195558b4810dd06aa69e7c7461bdc518edf9b" -dependencies = [ - "crossbeam", -] - [[package]] name = "atty" version = "0.2.14" @@ -370,15 +370,6 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" -[[package]] -name = "bitmaps" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "031043d04099746d8db04daf1fa424b2bc8bd69d92b25962dcde24da39ab64a2" -dependencies = [ - "typenum", -] - [[package]] name = "blake2b_simd" version = "0.5.10" @@ -946,20 +937,6 @@ dependencies = [ "itertools 0.9.0", ] -[[package]] -name = "crossbeam" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.4.3" @@ -996,17 +973,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "crossbeam-queue" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "maybe-uninit", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -1088,6 +1054,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "dashmap" +version = "3.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5" +dependencies = [ + "ahash 0.3.8", + "cfg-if", + "num_cpus", +] + [[package]] name = "db-key" version = "0.0.5" @@ -1804,7 +1781,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead" dependencies = [ - "ahash", + "ahash 0.2.18", "autocfg 0.1.7", ] @@ -1817,16 +1794,6 @@ dependencies = [ "autocfg 1.0.1", ] -[[package]] -name = "hdrhistogram" -version = "6.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d331ebcdbca4acbefe5da8c3299b2e246f198a8294cc5163354e743398b89d" -dependencies = [ - "byteorder", - "num-traits", -] - [[package]] name = "headers" version = "0.3.2" @@ -2058,20 +2025,6 @@ dependencies = [ "unicode-normalization", ] -[[package]] -name = "im" -version = "15.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "111c1983f3c5bb72732df25cddacee9b546d08325fb584b5ebd38148be7b0246" -dependencies = [ - "bitmaps", - "rand_core 0.5.1", - "rand_xoshiro", - "sized-chunks", - "typenum", - "version_check 0.9.2", -] - [[package]] name = "indexmap" version = "1.5.1" @@ -2675,105 +2628,54 @@ dependencies = [ [[package]] name = "metrics" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b70227ece8711a1aa2f99655efd795d0cff297a5b9fe39645a93aacf6ad39d" -dependencies = [ - "metrics-core", -] - -[[package]] -name = "metrics-core" -version = "0.5.2" +version = "0.13.0-alpha.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c064b3a1ff41f4bf6c91185c8a0caeccf8a8a27e9d0f92cc54cf3dbec812f48" - -[[package]] -name = "metrics-exporter-http" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14017d204ae062dc5c68a321e3dbdcd9b30181305cb6b067932f7f03f754e27" +checksum = "de7e68e5386ca1883535512584aa1f41bd8129fc38f6ce6b252eb4325f0ed5f6" dependencies = [ - "hyper", - "log", - "metrics-core", -] - -[[package]] -name = "metrics-exporter-log" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3fc63816bd5f8bde5eb31ce471f9633adc69ba1c55b44191b4d5fc7e263e8ab" -dependencies = [ - "log", - "metrics-core", - "tokio", -] - -[[package]] -name = "metrics-observer-json" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe930460a6c336b8f873dcfb28da3f805fd0dbadbea7beaf3042c7fb1d9fcd3" -dependencies = [ - "hdrhistogram", - "metrics-core", - "metrics-util", - "serde_json", + "metrics-macros", + "once_cell", + "proc-macro-hack", ] [[package]] -name = "metrics-observer-prometheus" -version = "0.1.4" +name = "metrics-macros" +version = "0.1.0-alpha.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bfe24ad8285ef8b239232135a65f89cc5fa4690bbfaf8907f4bef38f8b08eba" +checksum = "0837df974417cbf0c476c54883e5f42fdaa5fa04cabe6dd6f30f58103c4b9bfc" dependencies = [ - "hdrhistogram", - "metrics-core", - "metrics-util", + "proc-macro-hack", + "proc-macro2 1.0.19", + "quote 1.0.7", + "rustversion", + "syn 1.0.39", ] [[package]] -name = "metrics-observer-yaml" -version = "0.1.1" +name = "metrics-tracing-context" +version = "0.1.0-alpha.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83f66811013592560efc75d75a92d6e2f415a11b52f085e51d9fb4d1edec6335" +checksum = "c67257e8274a6bbd93597f5b8e3bd11d4c1e46fe516f459c0974246845fadb15" dependencies = [ - "hdrhistogram", - "metrics-core", + "metrics", "metrics-util", - "serde_yaml", + "tracing 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing-core 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing-subscriber", ] [[package]] -name = "metrics-runtime" -version = "0.13.1" +name = "metrics-util" +version = "0.4.0-alpha.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce0e4f69639ccc0c6b2f0612164f9817349eb25545ed1ffb5ef3e1e1c1d220b4" +checksum = "b0640e284e8696d7880ccc330c463bbe5af72be8c50e137998297b7989022484" dependencies = [ + "aho-corasick", "arc-swap", - "atomic-shim", + "crossbeam-epoch", "crossbeam-utils", - "im", + "dashmap", "metrics", - "metrics-core", - "metrics-exporter-http", - "metrics-exporter-log", - "metrics-observer-json", - "metrics-observer-prometheus", - "metrics-observer-yaml", - "metrics-util", "parking_lot", - "quanta", -] - -[[package]] -name = "metrics-util" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d11f8090a8886339f9468a04eeea0711e4cf27538b134014664308041307a1c5" -dependencies = [ - "crossbeam-epoch", "serde", ] @@ -3630,18 +3532,6 @@ dependencies = [ "url", ] -[[package]] -name = "quanta" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21484fda3d8ad7affee37755c77a5d0da527543f0af0c7f731c14e2215645d39" -dependencies = [ - "atomic-shim", - "ctor", - "libc", - "winapi 0.3.9", -] - [[package]] name = "quick-error" version = "1.2.3" @@ -3856,15 +3746,6 @@ dependencies = [ "rand_core 0.3.1", ] -[[package]] -name = "rand_xoshiro" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9fcdd2e881d02f1d9390ae47ad8e5696a9e4be7b547a1da2afbc61973217004" -dependencies = [ - "rand_core 0.5.1", -] - [[package]] name = "raw-cpuid" version = "6.1.0" @@ -4308,6 +4189,17 @@ dependencies = [ "security-framework 1.0.0", ] +[[package]] +name = "rustversion" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9bdc5e856e51e685846fb6c13a1f5e5432946c2c90501bdc76a1319f19e29da" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.7", + "syn 1.0.39", +] + [[package]] name = "ryu" version = "1.0.5" @@ -4660,16 +4552,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac" -[[package]] -name = "sized-chunks" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec31ceca5644fa6d444cc77548b88b67f46db6f7c71683b0f9336e671830d2f" -dependencies = [ - "bitmaps", - "typenum", -] - [[package]] name = "slab" version = "0.4.2" @@ -5821,8 +5703,8 @@ dependencies = [ "maxminddb", "md-5", "metrics", - "metrics-core", - "metrics-runtime", + "metrics-tracing-context", + "metrics-util", "nix 0.16.1", "nom 5.1.2", "notify", diff --git a/Cargo.toml b/Cargo.toml index e12769b231a1e..988b4b211fc21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,9 +71,9 @@ tracing-log = "0.1.0" tracing-tower = { git = "https://github.com/tokio-rs/tracing", rev = "f470db1b0354b368f62f9ee4d763595d16373231" } # Metrics -metrics = "0.12.1" -metrics-core = "0.5.2" -metrics-runtime = "0.13.0" +metrics = { version = "0.13.0-alpha" } +metrics-util = { version = "0.4.0-alpha" } +metrics-tracing-context = { version = "0.1.0-alpha" } # Aws rusoto_core = { version = "0.45.0", features = ["encoding"], optional = true } diff --git a/src/event/metric.rs b/src/event/metric.rs index 81fc1c96c23c3..3d2448061b4ac 100644 --- a/src/event/metric.rs +++ b/src/event/metric.rs @@ -1,7 +1,5 @@ use chrono::{DateTime, Utc}; use derive_is_enum_variant::is_enum_variant; -use metrics_core::Key; -use metrics_runtime::Measurement; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{self, Display, Formatter}; @@ -200,16 +198,17 @@ impl Metric { /// Convert the metrics_runtime::Measurement value plus the name and /// labels from a Key into our internal Metric format. - pub fn from_measurement(key: Key, measurement: Measurement) -> Self { - let value = match measurement { - Measurement::Counter(v) => MetricValue::Counter { value: v as f64 }, - Measurement::Gauge(v) => MetricValue::Gauge { value: v as f64 }, - Measurement::Histogram(packed) => { - let values = packed - .decompress() - .into_iter() - .map(|i| i as f64) - .collect::>(); + pub fn from_metric_kv(key: metrics::Key, handle: metrics_util::Handle) -> Self { + let value = match handle { + metrics_util::Handle::Counter(_) => MetricValue::Counter { + value: handle.read_counter() as f64, + }, + metrics_util::Handle::Gauge(_) => MetricValue::Gauge { + value: handle.read_gauge() as f64, + }, + metrics_util::Handle::Histogram(_) => { + let values = handle.read_histogram(); + let values = values.into_iter().map(|i| i as f64).collect::>(); // Each sample in the source measurement has an // effective sample rate of 1, so create an array of // such of the same length as the values. diff --git a/src/internal_events/apache_metrics.rs b/src/internal_events/apache_metrics.rs index 7b5c9a8c726eb..b033fe2b8660d 100644 --- a/src/internal_events/apache_metrics.rs +++ b/src/internal_events/apache_metrics.rs @@ -1,7 +1,7 @@ use super::InternalEvent; use crate::sources::apache_metrics; use http::Uri; -use metrics::{counter, timing}; +use metrics::{counter, histogram}; use std::time::Instant; #[derive(Debug)] @@ -45,7 +45,7 @@ impl InternalEvent for ApacheMetricsRequestCompleted { "component_kind" => "source", "component_type" => "apache_metrics", ); - timing!("request_duration_nanoseconds", self.start, self.end, + histogram!("request_duration_nanoseconds", self.end - self.start, "component_kind" => "source", "component_type" => "apache_metrics", ); diff --git a/src/internal_events/auto_concurrency.rs b/src/internal_events/auto_concurrency.rs index 03410696b3518..411c9ee4b7939 100644 --- a/src/internal_events/auto_concurrency.rs +++ b/src/internal_events/auto_concurrency.rs @@ -1,5 +1,5 @@ use super::InternalEvent; -use metrics::value; +use metrics::histogram; use std::time::Duration; #[derive(Debug)] @@ -24,7 +24,7 @@ impl InternalEvent for AutoConcurrencyLimit { } fn emit_metrics(&self) { - value!("auto_concurrency_limit", self.concurrency); + histogram!("auto_concurrency_limit", self.concurrency); } } @@ -35,7 +35,7 @@ pub struct AutoConcurrencyInFlight { impl InternalEvent for AutoConcurrencyInFlight { fn emit_metrics(&self) { - value!("auto_concurrency_in_flight", self.in_flight); + histogram!("auto_concurrency_in_flight", self.in_flight); } } @@ -46,7 +46,7 @@ pub struct AutoConcurrencyObservedRtt { impl InternalEvent for AutoConcurrencyObservedRtt { fn emit_metrics(&self) { - value!("auto_concurrency_observed_rtt", self.rtt); + histogram!("auto_concurrency_observed_rtt", self.rtt); } } @@ -57,6 +57,6 @@ pub struct AutoConcurrencyAveragedRtt { impl InternalEvent for AutoConcurrencyAveragedRtt { fn emit_metrics(&self) { - value!("auto_concurrency_averaged_rtt", self.rtt); + histogram!("auto_concurrency_averaged_rtt", self.rtt); } } diff --git a/src/internal_events/heartbeat.rs b/src/internal_events/heartbeat.rs index c5c37c5709483..6947d93d94281 100644 --- a/src/internal_events/heartbeat.rs +++ b/src/internal_events/heartbeat.rs @@ -13,6 +13,6 @@ impl InternalEvent for Heartbeat { } fn emit_metrics(&self) { - gauge!("uptime_seconds", self.since.elapsed().as_secs() as i64); + gauge!("uptime_seconds", self.since.elapsed().as_secs() as f64); } } diff --git a/src/internal_events/lua.rs b/src/internal_events/lua.rs index 5997b5b2d5d1e..7488386b9ce7a 100644 --- a/src/internal_events/lua.rs +++ b/src/internal_events/lua.rs @@ -20,7 +20,7 @@ pub struct LuaGcTriggered { impl InternalEvent for LuaGcTriggered { fn emit_metrics(&self) { - gauge!("memory_used", self.used_memory as i64, + gauge!("memory_used", self.used_memory as f64, "component_kind" => "transform", "component_type" => "lua", ); diff --git a/src/internal_events/prometheus.rs b/src/internal_events/prometheus.rs index 5d66d1379e6a2..5b55ae5610ed2 100644 --- a/src/internal_events/prometheus.rs +++ b/src/internal_events/prometheus.rs @@ -1,6 +1,6 @@ use super::InternalEvent; use crate::sources::prometheus::parser::ParserError; -use metrics::{counter, timing}; +use metrics::{counter, histogram}; use std::borrow::Cow; use std::time::Instant; @@ -45,7 +45,7 @@ impl InternalEvent for PrometheusRequestCompleted { "component_kind" => "source", "component_type" => "prometheus", ); - timing!("request_duration_nanoseconds", self.start, self.end, + histogram!("request_duration_nanoseconds", self.end - self.start, "component_kind" => "source", "component_type" => "prometheus", ); diff --git a/src/kubernetes/state/instrumenting.rs b/src/kubernetes/state/instrumenting.rs index bc2a795cf30c8..e5af68e47e8d4 100644 --- a/src/kubernetes/state/instrumenting.rs +++ b/src/kubernetes/state/instrumenting.rs @@ -67,7 +67,7 @@ where mod tests { use super::super::{mock, MaintainedWrite, Write}; use super::*; - use crate::test_util::trace_init; + use crate::{event::metric::MetricValue, test_util::trace_init}; use futures::{channel::mpsc, SinkExt, StreamExt}; use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta}; use once_cell::sync::OnceCell; @@ -96,50 +96,41 @@ mod tests { } } - fn get_metric_value(op_kind: &'static str) -> Option { - let controller = crate::metrics::CONTROLLER.get().unwrap_or_else(|| { - crate::metrics::init().unwrap(); - crate::metrics::CONTROLLER - .get() - .expect("failed to init metric container") - }); - - let key = metrics_core::Key::from_name_and_labels( - "k8s_state_ops", - vec![metrics_core::Label::new("op_kind", op_kind)], + fn get_metric_value(op_kind: &'static str) -> Option { + let controller = crate::metrics::get_controller().expect("failed to init metric container"); + + let tags_to_lookup = Some( + vec![("op_kind".to_owned(), op_kind.to_owned())] + .into_iter() + .collect(), ); - controller - .snapshot() - .into_measurements() - .into_iter() - .find_map(|(candidate_key, measurement)| { - if candidate_key == key { - Some(measurement) - } else { - None - } + + crate::metrics::capture_metrics(controller) + .find(|event| { + let metric = event.as_metric(); + metric.name == "k8s_state_ops" && metric.tags == tags_to_lookup }) + .map(|event| event.into_metric().value) } fn assert_counter_changed( - before: Option, - after: Option, + before: Option, + after: Option, expected_difference: u64, ) { - let before = before.unwrap_or_else(|| metrics_runtime::Measurement::Counter(0)); - let after = after.unwrap_or_else(|| metrics_runtime::Measurement::Counter(0)); + let before = before.unwrap_or_else(|| MetricValue::Counter { value: 0.0 }); + let after = after.unwrap_or_else(|| MetricValue::Counter { value: 0.0 }); let (before, after) = match (before, after) { - ( - metrics_runtime::Measurement::Counter(before), - metrics_runtime::Measurement::Counter(after), - ) => (before, after), + (MetricValue::Counter { value: before }, MetricValue::Counter { value: after }) => { + (before, after) + } _ => panic!("Metrics kind mismatch"), }; let difference = after - before; - assert_eq!(difference, expected_difference); + assert_eq!(difference, expected_difference as f64); } /// Guarantees only one test will run at a time. @@ -147,26 +138,16 @@ mod tests { /// want interference. fn tests_lock() -> MutexGuard<'static, ()> { static INSTANCE: OnceCell> = OnceCell::new(); - INSTANCE.get_or_init(|| Mutex::new(())).lock().unwrap() + INSTANCE + .get_or_init(|| Mutex::new(())) + .lock() + .unwrap_or_else(|err| err.into_inner()) } - // TODO: tests here are ignored because they cause interference with - // the metrics tests. - // There is no way to assert individual emits, and asserting metrics - // directly causes issues: - // - these tests break the internal tests at the metrics implementation - // itself, since we end up initializing the metrics controller twice; - // - testing metrics introduces unintended coupling between subsystems, - // ideally we only need to assert that we emit, but avoid assumptions on - // what the results of that emit are. - // Un-ignore them and/or properly reimplement once the issues above are - // resolved. - - #[ignore] #[tokio::test] async fn add() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); @@ -194,11 +175,10 @@ mod tests { join.await.unwrap(); } - #[ignore] #[tokio::test] async fn update() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); @@ -226,11 +206,10 @@ mod tests { join.await.unwrap(); } - #[ignore] #[tokio::test] async fn delete() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); @@ -258,11 +237,10 @@ mod tests { join.await.unwrap(); } - #[ignore] #[tokio::test] async fn resync() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); @@ -286,11 +264,10 @@ mod tests { join.await.unwrap(); } - #[ignore] #[tokio::test] async fn request_maintenance_without_maintenance() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, _events_rx, _actions_tx) = prepare_test(); @@ -300,11 +277,10 @@ mod tests { assert_counter_changed(before, after, 0); } - #[ignore] #[tokio::test] async fn request_maintenance_with_maintenance() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (events_tx, _events_rx) = mpsc::channel(0); @@ -324,11 +300,10 @@ mod tests { assert_counter_changed(before, after, 1); } - #[ignore] #[tokio::test] async fn perform_maintenance() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); diff --git a/src/metrics.rs b/src/metrics.rs index 382022ffe3daf..32b3c52b5107e 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,34 +1,169 @@ -use crate::event::{Event, Metric}; -use metrics_runtime::{Controller, Receiver}; +use crate::{event::Metric, Event}; +use metrics::{Key, Label, Recorder}; +use metrics_tracing_context::{LabelFilter, TracingContextLayer}; +use metrics_util::layers::Layer; +use metrics_util::{CompositeKey, Handle, MetricKind, Registry}; use once_cell::sync::OnceCell; +use std::sync::Arc; -pub static CONTROLLER: OnceCell = OnceCell::new(); +static CONTROLLER: OnceCell = OnceCell::new(); pub fn init() -> crate::Result<()> { - let receiver = Receiver::builder() - .build() - .expect("failed to create receiver"); + // Prepare the registry. + let registry = Registry::new(); + let registry = Arc::new(registry); + // Initialize the controller. + let controller = Controller { + registry: Arc::clone(®istry), + }; + // Register the controller globally. CONTROLLER - .set(receiver.controller()) - .map_err(|_| "failed to set receiver. metrics system already initialized.")?; + .set(controller) + .map_err(|_| "controller already initialized")?; - receiver.install(); + // Initialize the recorder. + let recorder = VectorRecorder { + registry: Arc::clone(®istry), + }; + // Apply a layer to capture tracing span fields as labels. + let recorder = TracingContextLayer::new(VectorLabelFilter).layer(recorder); + // Register the recorder globally. + metrics::set_boxed_recorder(Box::new(recorder)).map_err(|_| "recorder already initialized")?; + // Done. Ok(()) } -pub fn get_controller() -> crate::Result { +/// [`VectorRecorder`] is a [`metrics::Recorder`] implementation that's suitable +/// for the advanced usage that we have in Vector. +struct VectorRecorder { + registry: Arc>, +} + +impl Recorder for VectorRecorder { + fn register_counter(&self, key: Key, _description: Option<&'static str>) { + let ckey = CompositeKey::new(MetricKind::Counter, key); + self.registry.op(ckey, |_| {}, Handle::counter) + } + fn register_gauge(&self, key: Key, _description: Option<&'static str>) { + let ckey = CompositeKey::new(MetricKind::Gauge, key); + self.registry.op(ckey, |_| {}, Handle::gauge) + } + fn register_histogram(&self, key: Key, _description: Option<&'static str>) { + let ckey = CompositeKey::new(MetricKind::Histogram, key); + self.registry.op(ckey, |_| {}, Handle::histogram) + } + + fn increment_counter(&self, key: Key, value: u64) { + let ckey = CompositeKey::new(MetricKind::Counter, key); + self.registry.op( + ckey, + |handle| handle.increment_counter(value), + Handle::counter, + ) + } + fn update_gauge(&self, key: Key, value: f64) { + let ckey = CompositeKey::new(MetricKind::Gauge, key); + self.registry + .op(ckey, |handle| handle.update_gauge(value), Handle::gauge) + } + fn record_histogram(&self, key: Key, value: u64) { + let ckey = CompositeKey::new(MetricKind::Histogram, key); + self.registry.op( + ckey, + |handle| handle.record_histogram(value), + Handle::histogram, + ) + } +} + +#[derive(Debug, Clone)] +struct VectorLabelFilter; + +impl LabelFilter for VectorLabelFilter { + fn should_include_label(&self, label: &Label) -> bool { + let key = label.key(); + key == "topology_component_name" || key == "topology_component_type" + } +} + +/// Controller allows capturing metric snapshots. +pub struct Controller { + registry: Arc>, +} + +/// Get a handle to the globally registered controller, if it's initialized. +pub fn get_controller() -> crate::Result<&'static Controller> { CONTROLLER .get() - .cloned() .ok_or_else(|| "metrics system not initialized".into()) } -pub fn capture_metrics(controller: &Controller) -> impl Iterator { - controller - .snapshot() - .into_measurements() +fn snapshot(controller: &Controller) -> Vec { + let handles = controller.registry.get_handles(); + handles .into_iter() - .map(|(k, m)| Metric::from_measurement(k, m).into()) + .map(|(ck, m)| { + let (_, k) = ck.into_parts(); + Metric::from_metric_kv(k, m).into() + }) + .collect() +} + +/// Take a snapshot of all gathered metrics and expose them as metric +/// [`Event`]s. +pub fn capture_metrics(controller: &Controller) -> impl Iterator { + snapshot(controller).into_iter() +} + +#[cfg(test)] +mod tests { + use crate::test_util::trace_init; + use metrics::counter; + use tracing::{span, Level}; + + #[ignore] + #[test] + fn test_labels_injection() { + trace_init(); + let _ = super::init(); + + let span = span!( + Level::ERROR, + "my span", + topology_component_name = "my_component_name", + topology_component_type = "my_component_type", + some_other_label = "qwerty" + ); + // See https://github.com/tokio-rs/tracing/issues/978 + if span.is_disabled() { + panic!("test is not configured properly, set TEST_LOG=info env var") + } + let _enter = span.enter(); + + counter!("labels_injected", 1); + + let metric = super::capture_metrics(super::get_controller().unwrap()) + .map(|e| e.into_metric()) + .find(|metric| metric.name == "labels_injected") + .unwrap(); + + let expected_tags = Some( + vec![ + ( + "topology_component_name".to_owned(), + "my_component_name".to_owned(), + ), + ( + "topology_component_type".to_owned(), + "my_component_type".to_owned(), + ), + ] + .into_iter() + .collect(), + ); + + assert_eq!(metric.tags, expected_tags); + } } diff --git a/src/sources/internal_metrics.rs b/src/sources/internal_metrics.rs index cb2b733354af0..9e9449fef1923 100644 --- a/src/sources/internal_metrics.rs +++ b/src/sources/internal_metrics.rs @@ -1,5 +1,6 @@ use crate::{ config::{DataType, GlobalOptions, SourceConfig, SourceDescription}, + metrics::Controller, metrics::{capture_metrics, get_controller}, shutdown::ShutdownSignal, Pipeline, @@ -10,7 +11,6 @@ use futures::{ stream::StreamExt, }; use futures01::Sink; -use metrics_runtime::Controller; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::{select, time::interval}; @@ -45,7 +45,7 @@ impl SourceConfig for InternalMetricsConfig { } async fn run( - controller: Controller, + controller: &Controller, mut out: Pipeline, shutdown: ShutdownSignal, ) -> Result<(), ()> { @@ -60,7 +60,7 @@ async fn run( else => false, }; - let metrics = capture_metrics(&controller); + let metrics = capture_metrics(controller); let (sink, _) = out .send_all(futures01::stream::iter_ok(metrics)) @@ -77,7 +77,7 @@ async fn run( mod tests { use crate::event::metric::{Metric, MetricValue, StatisticKind}; use crate::metrics::{capture_metrics, get_controller}; - use metrics::{counter, gauge, timing, value}; + use metrics::{counter, gauge, histogram}; use std::collections::BTreeMap; #[test] @@ -87,14 +87,14 @@ mod tests { // There *seems* to be a race condition here (CI was flaky), so add a slight delay. std::thread::sleep(std::time::Duration::from_millis(300)); - gauge!("foo", 1); - gauge!("foo", 2); + gauge!("foo", 1.0); + gauge!("foo", 2.0); counter!("bar", 3); counter!("bar", 4); - timing!("baz", 5); - timing!("baz", 6); - value!("quux", 7, "host" => "foo"); - value!("quux", 8, "host" => "foo"); + histogram!("baz", 5); + histogram!("baz", 6); + histogram!("quux", 7, "host" => "foo"); + histogram!("quux", 8, "host" => "foo"); let controller = get_controller().expect("no controller"); diff --git a/src/topology/mod.rs b/src/topology/mod.rs index 516f812f7bf85..89f13e105015c 100644 --- a/src/topology/mod.rs +++ b/src/topology/mod.rs @@ -436,7 +436,11 @@ impl RunningTopology { fn spawn_sink(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!("sink", name = %task.name(), r#type = %task.typetag()); + let span = error_span!( + "sink", + topology_component_name = %task.name(), + topology_component_type = %task.typetag(), + ); let task = handle_errors(task.compat(), self.abort_tx.clone()).instrument(span); let spawned = tokio::spawn(task.compat()); if let Some(previous) = self.tasks.insert(name.to_string(), spawned) { @@ -446,7 +450,11 @@ impl RunningTopology { fn spawn_transform(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!("transform", name = %task.name(), r#type = %task.typetag()); + let span = error_span!( + "transform", + topology_component_name = %task.name(), + topology_component_type = %task.typetag(), + ); let task = handle_errors(task.compat(), self.abort_tx.clone()).instrument(span); let spawned = tokio::spawn(task.compat()); if let Some(previous) = self.tasks.insert(name.to_string(), spawned) { @@ -456,7 +464,11 @@ impl RunningTopology { fn spawn_source(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!("source", name = %task.name(), r#type = %task.typetag()); + let span = error_span!( + "source", + topology_component_name = %task.name(), + topology_component_type = %task.typetag(), + ); let task = handle_errors(task.compat(), self.abort_tx.clone()).instrument(span.clone()); let spawned = tokio::spawn(task.compat()); if let Some(previous) = self.tasks.insert(name.to_string(), spawned) { diff --git a/src/trace.rs b/src/trace.rs index f0c0e07d81ed0..5927a6eebf1e2 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -1,3 +1,4 @@ +use metrics_tracing_context::MetricsLayer; use tracing::{ dispatcher::{set_global_default, Dispatch}, span::Span, @@ -16,7 +17,8 @@ pub fn init(color: bool, json: bool, levels: &str) { .json() .flatten_event(true) .finish() - .with(Limit::default()); + .with(Limit::default()) + .with(MetricsLayer::new()); Dispatch::new(subscriber) } else { @@ -24,7 +26,8 @@ pub fn init(color: bool, json: bool, levels: &str) { .with_ansi(color) .with_env_filter(levels) .finish() - .with(Limit::default()); + .with(Limit::default()) + .with(MetricsLayer::new()); Dispatch::new(subscriber) };