From 9a8a98181f8efe2e819f3cda30870b3e73bf4a3b Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 15 Sep 2020 19:45:19 +0300 Subject: [PATCH 01/13] Update the metrics crate to 0.13.0-alpha Signed-off-by: MOZGIII --- Cargo.lock | 239 ++++++------------------ Cargo.toml | 5 +- src/event/metric.rs | 23 ++- src/internal_events/auto_concurrency.rs | 10 +- src/internal_events/heartbeat.rs | 2 +- src/internal_events/lua.rs | 2 +- src/internal_events/prometheus.rs | 4 +- src/kubernetes/state/instrumenting.rs | 76 +++----- src/metrics.rs | 89 +++++++-- src/sources/internal_metrics.rs | 20 +- 10 files changed, 183 insertions(+), 287 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2512cfc1ccdd8..c7b53f065a27f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,15 @@ dependencies = [ "const-random", ] +[[package]] +name = "ahash" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" +dependencies = [ + "const-random", +] + [[package]] name = "aho-corasick" version = "0.7.13" @@ -148,15 +157,6 @@ dependencies = [ "syn 1.0.39", ] -[[package]] -name = "atomic-shim" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d20fdac7156779a1a30d970e838195558b4810dd06aa69e7c7461bdc518edf9b" -dependencies = [ - "crossbeam", -] - [[package]] name = "atty" version = "0.2.14" @@ -277,15 +277,6 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" -[[package]] -name = "bitmaps" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "031043d04099746d8db04daf1fa424b2bc8bd69d92b25962dcde24da39ab64a2" -dependencies = [ - "typenum", -] - [[package]] name = "blake2b_simd" version = "0.5.10" @@ -804,20 +795,6 @@ dependencies = [ "itertools 0.9.0", ] -[[package]] -name = "crossbeam" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.4.3" @@ -854,17 +831,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "crossbeam-queue" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "maybe-uninit", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -946,6 +912,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "dashmap" +version = "3.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5" +dependencies = [ + "ahash 0.3.8", + "cfg-if", + "num_cpus", +] + [[package]] name = "db-key" version = "0.0.5" @@ -1590,7 +1567,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead" dependencies = [ - "ahash", + "ahash 0.2.18", "autocfg 0.1.7", ] @@ -1603,16 +1580,6 @@ dependencies = [ "autocfg 1.0.1", ] -[[package]] -name = "hdrhistogram" -version = "6.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d331ebcdbca4acbefe5da8c3299b2e246f198a8294cc5163354e743398b89d" -dependencies = [ - "byteorder", - "num-traits", -] - [[package]] name = "headers" version = "0.3.2" @@ -1843,20 +1810,6 @@ dependencies = [ "unicode-normalization", ] -[[package]] -name = "im" -version = "15.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "111c1983f3c5bb72732df25cddacee9b546d08325fb584b5ebd38148be7b0246" -dependencies = [ - "bitmaps", - "rand_core 0.5.1", - "rand_xoshiro", - "sized-chunks", - "typenum", - "version_check 0.9.2", -] - [[package]] name = "indexmap" version = "1.5.1" @@ -2442,105 +2395,38 @@ dependencies = [ [[package]] name = "metrics" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b70227ece8711a1aa2f99655efd795d0cff297a5b9fe39645a93aacf6ad39d" -dependencies = [ - "metrics-core", -] - -[[package]] -name = "metrics-core" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c064b3a1ff41f4bf6c91185c8a0caeccf8a8a27e9d0f92cc54cf3dbec812f48" - -[[package]] -name = "metrics-exporter-http" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14017d204ae062dc5c68a321e3dbdcd9b30181305cb6b067932f7f03f754e27" -dependencies = [ - "hyper", - "log", - "metrics-core", -] - -[[package]] -name = "metrics-exporter-log" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3fc63816bd5f8bde5eb31ce471f9633adc69ba1c55b44191b4d5fc7e263e8ab" -dependencies = [ - "log", - "metrics-core", - "tokio", -] - -[[package]] -name = "metrics-observer-json" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe930460a6c336b8f873dcfb28da3f805fd0dbadbea7beaf3042c7fb1d9fcd3" +version = "0.13.0-alpha.3" +source = "git+https://github.com/MOZGIII/metrics?branch=refactor/metrics-v2-tracing-context#339dc25b45968424d566dd798588df042e03359a" dependencies = [ - "hdrhistogram", - "metrics-core", - "metrics-util", - "serde_json", -] - -[[package]] -name = "metrics-observer-prometheus" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bfe24ad8285ef8b239232135a65f89cc5fa4690bbfaf8907f4bef38f8b08eba" -dependencies = [ - "hdrhistogram", - "metrics-core", - "metrics-util", + "metrics-macros", + "once_cell", + "proc-macro-hack", ] [[package]] -name = "metrics-observer-yaml" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83f66811013592560efc75d75a92d6e2f415a11b52f085e51d9fb4d1edec6335" +name = "metrics-macros" +version = "0.1.0-alpha.2" +source = "git+https://github.com/MOZGIII/metrics?branch=refactor/metrics-v2-tracing-context#339dc25b45968424d566dd798588df042e03359a" dependencies = [ - "hdrhistogram", - "metrics-core", - "metrics-util", - "serde_yaml", + "proc-macro-hack", + "proc-macro2 1.0.19", + "quote 1.0.7", + "rustversion", + "syn 1.0.39", ] [[package]] -name = "metrics-runtime" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce0e4f69639ccc0c6b2f0612164f9817349eb25545ed1ffb5ef3e1e1c1d220b4" +name = "metrics-util" +version = "0.4.0-alpha.2" +source = "git+https://github.com/MOZGIII/metrics?branch=refactor/metrics-v2-tracing-context#339dc25b45968424d566dd798588df042e03359a" dependencies = [ + "aho-corasick", "arc-swap", - "atomic-shim", + "crossbeam-epoch", "crossbeam-utils", - "im", + "dashmap", "metrics", - "metrics-core", - "metrics-exporter-http", - "metrics-exporter-log", - "metrics-observer-json", - "metrics-observer-prometheus", - "metrics-observer-yaml", - "metrics-util", "parking_lot 0.10.2", - "quanta", -] - -[[package]] -name = "metrics-util" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d11f8090a8886339f9468a04eeea0711e4cf27538b134014664308041307a1c5" -dependencies = [ - "crossbeam-epoch", "serde", ] @@ -3377,18 +3263,6 @@ dependencies = [ "url", ] -[[package]] -name = "quanta" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21484fda3d8ad7affee37755c77a5d0da527543f0af0c7f731c14e2215645d39" -dependencies = [ - "atomic-shim", - "ctor", - "libc", - "winapi 0.3.9", -] - [[package]] name = "quick-error" version = "1.2.3" @@ -3603,15 +3477,6 @@ dependencies = [ "rand_core 0.3.1", ] -[[package]] -name = "rand_xoshiro" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9fcdd2e881d02f1d9390ae47ad8e5696a9e4be7b547a1da2afbc61973217004" -dependencies = [ - "rand_core 0.5.1", -] - [[package]] name = "raw-cpuid" version = "6.1.0" @@ -4055,6 +3920,17 @@ dependencies = [ "security-framework 1.0.0", ] +[[package]] +name = "rustversion" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9bdc5e856e51e685846fb6c13a1f5e5432946c2c90501bdc76a1319f19e29da" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.7", + "syn 1.0.39", +] + [[package]] name = "ryu" version = "1.0.5" @@ -4387,16 +4263,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac" -[[package]] -name = "sized-chunks" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec31ceca5644fa6d444cc77548b88b67f46db6f7c71683b0f9336e671830d2f" -dependencies = [ - "bitmaps", - "typenum", -] - [[package]] name = "slab" version = "0.4.2" @@ -5514,8 +5380,7 @@ dependencies = [ "matches", "maxminddb", "metrics", - "metrics-core", - "metrics-runtime", + "metrics-util", "nix 0.16.1", "nom 5.1.2", "notify", diff --git a/Cargo.toml b/Cargo.toml index 945541c39d477..d0ecc025f2516 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,9 +66,8 @@ tracing-log = "0.1.0" tracing-tower = { git = "https://github.com/tokio-rs/tracing", rev = "f470db1b0354b368f62f9ee4d763595d16373231" } # Metrics -metrics = "0.12.1" -metrics-core = "0.5.2" -metrics-runtime = "0.13.0" +metrics = { version = "0.13.0-alpha", git = "https://github.com/MOZGIII/metrics", branch = "refactor/metrics-v2-tracing-context" } +metrics-util = { version = "0.4.0-alpha", git = "https://github.com/MOZGIII/metrics", branch = "refactor/metrics-v2-tracing-context" } # Aws rusoto_core = { version = "0.45.0", features = ["encoding"], optional = true } diff --git a/src/event/metric.rs b/src/event/metric.rs index f53ffb9851238..180fca1fdf400 100644 --- a/src/event/metric.rs +++ b/src/event/metric.rs @@ -1,7 +1,5 @@ use chrono::{DateTime, Utc}; use derive_is_enum_variant::is_enum_variant; -use metrics_core::Key; -use metrics_runtime::Measurement; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{self, Display, Formatter}; @@ -178,16 +176,17 @@ impl Metric { } } - pub fn from_measurement(key: Key, measurement: Measurement) -> Self { - let value = match measurement { - Measurement::Counter(v) => MetricValue::Counter { value: v as f64 }, - Measurement::Gauge(v) => MetricValue::Gauge { value: v as f64 }, - Measurement::Histogram(packed) => { - let values = packed - .decompress() - .into_iter() - .map(|i| i as f64) - .collect::>(); + pub fn from_metric_kv(key: metrics::Key, handle: metrics_util::Handle) -> Self { + let value = match handle { + metrics_util::Handle::Counter(_) => MetricValue::Counter { + value: handle.read_counter() as f64, + }, + metrics_util::Handle::Gauge(_) => MetricValue::Gauge { + value: handle.read_gauge() as f64, + }, + metrics_util::Handle::Histogram(_) => { + let values = handle.read_histogram(); + let values = values.into_iter().map(|i| i as f64).collect::>(); let sample_rates = vec![1; values.len()]; MetricValue::Distribution { values, diff --git a/src/internal_events/auto_concurrency.rs b/src/internal_events/auto_concurrency.rs index 03410696b3518..411c9ee4b7939 100644 --- a/src/internal_events/auto_concurrency.rs +++ b/src/internal_events/auto_concurrency.rs @@ -1,5 +1,5 @@ use super::InternalEvent; -use metrics::value; +use metrics::histogram; use std::time::Duration; #[derive(Debug)] @@ -24,7 +24,7 @@ impl InternalEvent for AutoConcurrencyLimit { } fn emit_metrics(&self) { - value!("auto_concurrency_limit", self.concurrency); + histogram!("auto_concurrency_limit", self.concurrency); } } @@ -35,7 +35,7 @@ pub struct AutoConcurrencyInFlight { impl InternalEvent for AutoConcurrencyInFlight { fn emit_metrics(&self) { - value!("auto_concurrency_in_flight", self.in_flight); + histogram!("auto_concurrency_in_flight", self.in_flight); } } @@ -46,7 +46,7 @@ pub struct AutoConcurrencyObservedRtt { impl InternalEvent for AutoConcurrencyObservedRtt { fn emit_metrics(&self) { - value!("auto_concurrency_observed_rtt", self.rtt); + histogram!("auto_concurrency_observed_rtt", self.rtt); } } @@ -57,6 +57,6 @@ pub struct AutoConcurrencyAveragedRtt { impl InternalEvent for AutoConcurrencyAveragedRtt { fn emit_metrics(&self) { - value!("auto_concurrency_averaged_rtt", self.rtt); + histogram!("auto_concurrency_averaged_rtt", self.rtt); } } diff --git a/src/internal_events/heartbeat.rs b/src/internal_events/heartbeat.rs index 8086bc10d51ae..386fc59c30d36 100644 --- a/src/internal_events/heartbeat.rs +++ b/src/internal_events/heartbeat.rs @@ -13,6 +13,6 @@ impl InternalEvent for Heartbeat { } fn emit_metrics(&self) { - gauge!("uptime_seconds", self.since.elapsed().as_secs() as i64); + gauge!("uptime_seconds", self.since.elapsed().as_secs() as f64); } } diff --git a/src/internal_events/lua.rs b/src/internal_events/lua.rs index 5997b5b2d5d1e..7488386b9ce7a 100644 --- a/src/internal_events/lua.rs +++ b/src/internal_events/lua.rs @@ -20,7 +20,7 @@ pub struct LuaGcTriggered { impl InternalEvent for LuaGcTriggered { fn emit_metrics(&self) { - gauge!("memory_used", self.used_memory as i64, + gauge!("memory_used", self.used_memory as f64, "component_kind" => "transform", "component_type" => "lua", ); diff --git a/src/internal_events/prometheus.rs b/src/internal_events/prometheus.rs index 68e5e9cecfd45..80f0d3dcbdc57 100644 --- a/src/internal_events/prometheus.rs +++ b/src/internal_events/prometheus.rs @@ -1,6 +1,6 @@ use super::InternalEvent; use crate::sources::prometheus::parser::ParserError; -use metrics::{counter, timing}; +use metrics::{counter, histogram}; use std::borrow::Cow; use std::time::Instant; @@ -45,7 +45,7 @@ impl InternalEvent for PrometheusRequestCompleted { "component_kind" => "source", "component_type" => "prometheus", ); - timing!("request_duration_nanoseconds", self.start, self.end, + histogram!("request_duration_nanoseconds", self.end - self.start, "component_kind" => "source", "component_type" => "prometheus", ); diff --git a/src/kubernetes/state/instrumenting.rs b/src/kubernetes/state/instrumenting.rs index bc2a795cf30c8..4b975b7cf1ade 100644 --- a/src/kubernetes/state/instrumenting.rs +++ b/src/kubernetes/state/instrumenting.rs @@ -67,7 +67,7 @@ where mod tests { use super::super::{mock, MaintainedWrite, Write}; use super::*; - use crate::test_util::trace_init; + use crate::{event::metric::MetricValue, test_util::trace_init}; use futures::{channel::mpsc, SinkExt, StreamExt}; use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta}; use once_cell::sync::OnceCell; @@ -96,50 +96,44 @@ mod tests { } } - fn get_metric_value(op_kind: &'static str) -> Option { - let controller = crate::metrics::CONTROLLER.get().unwrap_or_else(|| { - crate::metrics::init().unwrap(); - crate::metrics::CONTROLLER - .get() - .expect("failed to init metric container") + fn get_metric_value(op_kind: &'static str) -> Option { + let controller = crate::metrics::get_controller().unwrap_or_else(|_| { + crate::metrics::init().expect("unable to init metrics"); + crate::metrics::get_controller().expect("failed to init metric container") }); - let key = metrics_core::Key::from_name_and_labels( - "k8s_state_ops", - vec![metrics_core::Label::new("op_kind", op_kind)], + let tags_to_lookup = Some( + vec![("op_kind".to_owned(), op_kind.to_owned())] + .into_iter() + .collect(), ); - controller - .snapshot() - .into_measurements() - .into_iter() - .find_map(|(candidate_key, measurement)| { - if candidate_key == key { - Some(measurement) - } else { - None - } + + crate::metrics::capture_metrics(controller) + .find(|event| { + let metric = event.as_metric(); + metric.name == "k8s_state_ops" && metric.tags == tags_to_lookup }) + .map(|event| event.into_metric().value) } fn assert_counter_changed( - before: Option, - after: Option, + before: Option, + after: Option, expected_difference: u64, ) { - let before = before.unwrap_or_else(|| metrics_runtime::Measurement::Counter(0)); - let after = after.unwrap_or_else(|| metrics_runtime::Measurement::Counter(0)); + let before = before.unwrap_or_else(|| MetricValue::Counter { value: 0.0 }); + let after = after.unwrap_or_else(|| MetricValue::Counter { value: 0.0 }); let (before, after) = match (before, after) { - ( - metrics_runtime::Measurement::Counter(before), - metrics_runtime::Measurement::Counter(after), - ) => (before, after), + (MetricValue::Counter { value: before }, MetricValue::Counter { value: after }) => { + (before, after) + } _ => panic!("Metrics kind mismatch"), }; let difference = after - before; - assert_eq!(difference, expected_difference); + assert_eq!(difference, expected_difference as f64); } /// Guarantees only one test will run at a time. @@ -147,22 +141,12 @@ mod tests { /// want interference. fn tests_lock() -> MutexGuard<'static, ()> { static INSTANCE: OnceCell> = OnceCell::new(); - INSTANCE.get_or_init(|| Mutex::new(())).lock().unwrap() + INSTANCE + .get_or_init(|| Mutex::new(())) + .lock() + .unwrap_or_else(|err| err.into_inner()) } - // TODO: tests here are ignored because they cause interference with - // the metrics tests. - // There is no way to assert individual emits, and asserting metrics - // directly causes issues: - // - these tests break the internal tests at the metrics implementation - // itself, since we end up initializing the metrics controller twice; - // - testing metrics introduces unintended coupling between subsystems, - // ideally we only need to assert that we emit, but avoid assumptions on - // what the results of that emit are. - // Un-ignore them and/or properly reimplement once the issues above are - // resolved. - - #[ignore] #[tokio::test] async fn add() { trace_init(); @@ -194,7 +178,6 @@ mod tests { join.await.unwrap(); } - #[ignore] #[tokio::test] async fn update() { trace_init(); @@ -226,7 +209,6 @@ mod tests { join.await.unwrap(); } - #[ignore] #[tokio::test] async fn delete() { trace_init(); @@ -258,7 +240,6 @@ mod tests { join.await.unwrap(); } - #[ignore] #[tokio::test] async fn resync() { trace_init(); @@ -286,7 +267,6 @@ mod tests { join.await.unwrap(); } - #[ignore] #[tokio::test] async fn request_maintenance_without_maintenance() { trace_init(); @@ -300,7 +280,6 @@ mod tests { assert_counter_changed(before, after, 0); } - #[ignore] #[tokio::test] async fn request_maintenance_with_maintenance() { trace_init(); @@ -324,7 +303,6 @@ mod tests { assert_counter_changed(before, after, 1); } - #[ignore] #[tokio::test] async fn perform_maintenance() { trace_init(); diff --git a/src/metrics.rs b/src/metrics.rs index 382022ffe3daf..d73d0bad2fb61 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,34 +1,89 @@ -use crate::event::{Event, Metric}; -use metrics_runtime::{Controller, Receiver}; +use crate::{event::Metric, Event}; +use metrics::{Key, Recorder}; +use metrics_util::{CompositeKey, Handle, MetricKind, Registry}; use once_cell::sync::OnceCell; -pub static CONTROLLER: OnceCell = OnceCell::new(); +static CONTROLLER: OnceCell = OnceCell::new(); pub fn init() -> crate::Result<()> { - let receiver = Receiver::builder() - .build() - .expect("failed to create receiver"); - CONTROLLER - .set(receiver.controller()) - .map_err(|_| "failed to set receiver. metrics system already initialized.")?; + .set(Controller::new()) + .map_err(|_| "controller already initialized")?; - receiver.install(); + metrics::set_recorder(CONTROLLER.get().unwrap()).map_err(|_| "recorder already initialized")?; Ok(()) } -pub fn get_controller() -> crate::Result { +pub struct Controller { + registry: Registry, +} + +impl Controller { + pub fn new() -> Self { + Self { + registry: Registry::new(), + } + } +} + +impl Recorder for Controller { + fn register_counter(&self, key: Key, _description: Option<&'static str>) { + let ckey = CompositeKey::new(MetricKind::Counter, key); + self.registry.op(ckey, |_| {}, || Handle::counter()) + } + fn register_gauge(&self, key: Key, _description: Option<&'static str>) { + let ckey = CompositeKey::new(MetricKind::Gauge, key); + self.registry.op(ckey, |_| {}, || Handle::gauge()) + } + fn register_histogram(&self, key: Key, _description: Option<&'static str>) { + let ckey = CompositeKey::new(MetricKind::Histogram, key); + self.registry.op(ckey, |_| {}, || Handle::histogram()) + } + + fn increment_counter(&self, key: Key, value: u64) { + let ckey = CompositeKey::new(MetricKind::Counter, key); + self.registry.op( + ckey, + |handle| handle.increment_counter(value), + || Handle::counter(), + ) + } + fn update_gauge(&self, key: Key, value: f64) { + let ckey = CompositeKey::new(MetricKind::Gauge, key); + self.registry.op( + ckey, + |handle| handle.update_gauge(value), + || Handle::gauge(), + ) + } + fn record_histogram(&self, key: Key, value: u64) { + let ckey = CompositeKey::new(MetricKind::Histogram, key); + self.registry.op( + ckey, + |handle| handle.record_histogram(value), + || Handle::histogram(), + ) + } +} + +pub fn get_controller() -> crate::Result<&'static Controller> { CONTROLLER .get() - .cloned() .ok_or_else(|| "metrics system not initialized".into()) } -pub fn capture_metrics(controller: &Controller) -> impl Iterator { - controller - .snapshot() - .into_measurements() +pub fn snapshot(controller: &Controller) -> Vec { + let handles = controller.registry.get_handles(); + handles .into_iter() - .map(|(k, m)| Metric::from_measurement(k, m).into()) + .map(|(ck, m)| { + let (_, k) = ck.into_parts(); + Metric::from_metric_kv(k, m).into() + }) + .collect() +} + +pub fn capture_metrics(controller: &Controller) -> impl Iterator { + snapshot(controller).into_iter() } diff --git a/src/sources/internal_metrics.rs b/src/sources/internal_metrics.rs index cb2b733354af0..9e9449fef1923 100644 --- a/src/sources/internal_metrics.rs +++ b/src/sources/internal_metrics.rs @@ -1,5 +1,6 @@ use crate::{ config::{DataType, GlobalOptions, SourceConfig, SourceDescription}, + metrics::Controller, metrics::{capture_metrics, get_controller}, shutdown::ShutdownSignal, Pipeline, @@ -10,7 +11,6 @@ use futures::{ stream::StreamExt, }; use futures01::Sink; -use metrics_runtime::Controller; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::{select, time::interval}; @@ -45,7 +45,7 @@ impl SourceConfig for InternalMetricsConfig { } async fn run( - controller: Controller, + controller: &Controller, mut out: Pipeline, shutdown: ShutdownSignal, ) -> Result<(), ()> { @@ -60,7 +60,7 @@ async fn run( else => false, }; - let metrics = capture_metrics(&controller); + let metrics = capture_metrics(controller); let (sink, _) = out .send_all(futures01::stream::iter_ok(metrics)) @@ -77,7 +77,7 @@ async fn run( mod tests { use crate::event::metric::{Metric, MetricValue, StatisticKind}; use crate::metrics::{capture_metrics, get_controller}; - use metrics::{counter, gauge, timing, value}; + use metrics::{counter, gauge, histogram}; use std::collections::BTreeMap; #[test] @@ -87,14 +87,14 @@ mod tests { // There *seems* to be a race condition here (CI was flaky), so add a slight delay. std::thread::sleep(std::time::Duration::from_millis(300)); - gauge!("foo", 1); - gauge!("foo", 2); + gauge!("foo", 1.0); + gauge!("foo", 2.0); counter!("bar", 3); counter!("bar", 4); - timing!("baz", 5); - timing!("baz", 6); - value!("quux", 7, "host" => "foo"); - value!("quux", 8, "host" => "foo"); + histogram!("baz", 5); + histogram!("baz", 6); + histogram!("quux", 7, "host" => "foo"); + histogram!("quux", 8, "host" => "foo"); let controller = get_controller().expect("no controller"); From 67d471b4cd80928a24a67fbda2add25f009327d1 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 15 Sep 2020 20:19:21 +0300 Subject: [PATCH 02/13] Enable capturing tracing spans as metric labels Signed-off-by: MOZGIII --- Cargo.lock | 13 +++++++++++++ Cargo.toml | 1 + src/metrics.rs | 50 ++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7b53f065a27f..0a9c07d1f7582 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2415,6 +2415,18 @@ dependencies = [ "syn 1.0.39", ] +[[package]] +name = "metrics-tracing-context" +version = "0.1.0-alpha1" +source = "git+https://github.com/MOZGIII/metrics?branch=refactor/metrics-v2-tracing-context#339dc25b45968424d566dd798588df042e03359a" +dependencies = [ + "metrics", + "metrics-util", + "tracing 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing-core 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing-subscriber", +] + [[package]] name = "metrics-util" version = "0.4.0-alpha.2" @@ -5380,6 +5392,7 @@ dependencies = [ "matches", "maxminddb", "metrics", + "metrics-tracing-context", "metrics-util", "nix 0.16.1", "nom 5.1.2", diff --git a/Cargo.toml b/Cargo.toml index d0ecc025f2516..efbc36b0c27a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ tracing-tower = { git = "https://github.com/tokio-rs/tracing", rev = "f470db1b03 # Metrics metrics = { version = "0.13.0-alpha", git = "https://github.com/MOZGIII/metrics", branch = "refactor/metrics-v2-tracing-context" } metrics-util = { version = "0.4.0-alpha", git = "https://github.com/MOZGIII/metrics", branch = "refactor/metrics-v2-tracing-context" } +metrics-tracing-context = { version = "0.1.0-alpha", git = "https://github.com/MOZGIII/metrics", branch = "refactor/metrics-v2-tracing-context" } # Aws rusoto_core = { version = "0.45.0", features = ["encoding"], optional = true } diff --git a/src/metrics.rs b/src/metrics.rs index d73d0bad2fb61..f1b7737fe4d19 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,33 +1,47 @@ use crate::{event::Metric, Event}; use metrics::{Key, Recorder}; +use metrics_tracing_context::TracingContextLayer; +use metrics_util::layers::Layer; use metrics_util::{CompositeKey, Handle, MetricKind, Registry}; use once_cell::sync::OnceCell; +use std::sync::Arc; static CONTROLLER: OnceCell = OnceCell::new(); pub fn init() -> crate::Result<()> { + // Prepare the registry. + let registry = Registry::new(); + let registry = Arc::new(registry); + + // Initialize the controller. + let controller = Controller { + registry: Arc::clone(®istry), + }; + // Register the controller globally. CONTROLLER - .set(Controller::new()) + .set(controller) .map_err(|_| "controller already initialized")?; - metrics::set_recorder(CONTROLLER.get().unwrap()).map_err(|_| "recorder already initialized")?; + // Initialize the recorder. + let recorder = VectorRecorder { + registry: Arc::clone(®istry), + }; + // Apply a layer to capture tracing span fields as labels. + let recorder = TracingContextLayer::all().layer(recorder); + // Register the recorder globally. + metrics::set_boxed_recorder(Box::new(recorder)).map_err(|_| "recorder already initialized")?; + // Done. Ok(()) } -pub struct Controller { - registry: Registry, -} - -impl Controller { - pub fn new() -> Self { - Self { - registry: Registry::new(), - } - } +/// [`VectorRecorder`] is a [`metrics::Recorder`] implementation that's suitable +/// for the advanced usage that we have in Vector. +struct VectorRecorder { + registry: Arc>, } -impl Recorder for Controller { +impl Recorder for VectorRecorder { fn register_counter(&self, key: Key, _description: Option<&'static str>) { let ckey = CompositeKey::new(MetricKind::Counter, key); self.registry.op(ckey, |_| {}, || Handle::counter()) @@ -67,13 +81,19 @@ impl Recorder for Controller { } } +/// Controller allows capturing metric snapshots. +pub struct Controller { + registry: Arc>, +} + +/// Get a handle to the globally registered controller, if it's initialized. pub fn get_controller() -> crate::Result<&'static Controller> { CONTROLLER .get() .ok_or_else(|| "metrics system not initialized".into()) } -pub fn snapshot(controller: &Controller) -> Vec { +fn snapshot(controller: &Controller) -> Vec { let handles = controller.registry.get_handles(); handles .into_iter() @@ -84,6 +104,8 @@ pub fn snapshot(controller: &Controller) -> Vec { .collect() } +/// Take a snapshot of all gathered metrics and expose them as metric +/// [`Event`]s. pub fn capture_metrics(controller: &Controller) -> impl Iterator { snapshot(controller).into_iter() } From 59c9cc13d22f5674ef15c333ea2ba2fb96b0aaf9 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 15 Sep 2020 20:22:22 +0300 Subject: [PATCH 03/13] Implement label filtering Signed-off-by: MOZGIII --- src/metrics.rs | 13 +++++++++++-- src/trace.rs | 7 +++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index f1b7737fe4d19..19808353f5532 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,6 +1,6 @@ use crate::{event::Metric, Event}; -use metrics::{Key, Recorder}; -use metrics_tracing_context::TracingContextLayer; +use metrics::{Key, Label, Recorder}; +use metrics_tracing_context::{LabelFilter, TracingContextLayer}; use metrics_util::layers::Layer; use metrics_util::{CompositeKey, Handle, MetricKind, Registry}; use once_cell::sync::OnceCell; @@ -81,6 +81,15 @@ impl Recorder for VectorRecorder { } } +#[derive(Debug, Clone)] +struct ComponentNameFilter; + +impl LabelFilter for ComponentNameFilter { + fn should_include_label(&self, label: &Label) -> bool { + label.key() == "component_name" + } +} + /// Controller allows capturing metric snapshots. pub struct Controller { registry: Arc>, diff --git a/src/trace.rs b/src/trace.rs index f0c0e07d81ed0..5927a6eebf1e2 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -1,3 +1,4 @@ +use metrics_tracing_context::MetricsLayer; use tracing::{ dispatcher::{set_global_default, Dispatch}, span::Span, @@ -16,7 +17,8 @@ pub fn init(color: bool, json: bool, levels: &str) { .json() .flatten_event(true) .finish() - .with(Limit::default()); + .with(Limit::default()) + .with(MetricsLayer::new()); Dispatch::new(subscriber) } else { @@ -24,7 +26,8 @@ pub fn init(color: bool, json: bool, levels: &str) { .with_ansi(color) .with_env_filter(levels) .finish() - .with(Limit::default()); + .with(Limit::default()) + .with(MetricsLayer::new()); Dispatch::new(subscriber) }; From 95d857d8db9325f1af19f8901bb377dbc5206691 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 15 Sep 2020 20:51:00 +0300 Subject: [PATCH 04/13] Add a test for label injection Signed-off-by: MOZGIII --- src/metrics.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/metrics.rs b/src/metrics.rs index 19808353f5532..44e22f3a6e347 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -118,3 +118,34 @@ fn snapshot(controller: &Controller) -> Vec { pub fn capture_metrics(controller: &Controller) -> impl Iterator { snapshot(controller).into_iter() } + +#[cfg(test)] +mod tests { + use crate::test_util::trace_init; + use metrics::counter; + use tracing::{span, Level}; + + #[test] + fn test_labels_injection() { + trace_init(); + let _ = super::init(); + + let span = span!(Level::INFO, "my span", component_name = "foobar"); + let _enter = span.enter(); + + counter!("labels_injected", 1); + + let metric = super::capture_metrics(super::get_controller().unwrap()) + .map(|e| e.into_metric()) + .find(|metric| metric.name == "labels_injected") + .unwrap(); + + let expected_tags = Some( + vec![("component_name".to_owned(), "foobar".to_owned())] + .into_iter() + .collect(), + ); + + assert_eq!(metric.tags, expected_tags); + } +} From 2fa73cd55ff0fe0666c53a2285bc135e16cee95f Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 15 Sep 2020 21:25:49 +0300 Subject: [PATCH 05/13] Add a workaround for disabled spans in tests Signed-off-by: MOZGIII --- src/metrics.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/metrics.rs b/src/metrics.rs index 44e22f3a6e347..873300a8fb3cf 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -125,12 +125,17 @@ mod tests { use metrics::counter; use tracing::{span, Level}; + #[ignore] #[test] fn test_labels_injection() { trace_init(); let _ = super::init(); - let span = span!(Level::INFO, "my span", component_name = "foobar"); + let span = span!(Level::ERROR, "my span", component_name = "foobar"); + // See https://github.com/tokio-rs/tracing/issues/978 + if span.is_disabled() { + panic!("test is not configured properly, set TEST_LOG=info env var") + } let _enter = span.enter(); counter!("labels_injected", 1); From c909e62ebd51d35dc1b05aacb96b23793571f34f Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Thu, 17 Sep 2020 13:22:47 +0300 Subject: [PATCH 06/13] Switch to published crates Signed-off-by: MOZGIII --- Cargo.lock | 20 ++++++++++++-------- Cargo.toml | 6 +++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a9c07d1f7582..7a4403b6cd6c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2395,8 +2395,9 @@ dependencies = [ [[package]] name = "metrics" -version = "0.13.0-alpha.3" -source = "git+https://github.com/MOZGIII/metrics?branch=refactor/metrics-v2-tracing-context#339dc25b45968424d566dd798588df042e03359a" +version = "0.13.0-alpha.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de7e68e5386ca1883535512584aa1f41bd8129fc38f6ce6b252eb4325f0ed5f6" dependencies = [ "metrics-macros", "once_cell", @@ -2405,8 +2406,9 @@ dependencies = [ [[package]] name = "metrics-macros" -version = "0.1.0-alpha.2" -source = "git+https://github.com/MOZGIII/metrics?branch=refactor/metrics-v2-tracing-context#339dc25b45968424d566dd798588df042e03359a" +version = "0.1.0-alpha.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0837df974417cbf0c476c54883e5f42fdaa5fa04cabe6dd6f30f58103c4b9bfc" dependencies = [ "proc-macro-hack", "proc-macro2 1.0.19", @@ -2417,8 +2419,9 @@ dependencies = [ [[package]] name = "metrics-tracing-context" -version = "0.1.0-alpha1" -source = "git+https://github.com/MOZGIII/metrics?branch=refactor/metrics-v2-tracing-context#339dc25b45968424d566dd798588df042e03359a" +version = "0.1.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c67257e8274a6bbd93597f5b8e3bd11d4c1e46fe516f459c0974246845fadb15" dependencies = [ "metrics", "metrics-util", @@ -2429,8 +2432,9 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.4.0-alpha.2" -source = "git+https://github.com/MOZGIII/metrics?branch=refactor/metrics-v2-tracing-context#339dc25b45968424d566dd798588df042e03359a" +version = "0.4.0-alpha.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0640e284e8696d7880ccc330c463bbe5af72be8c50e137998297b7989022484" dependencies = [ "aho-corasick", "arc-swap", diff --git a/Cargo.toml b/Cargo.toml index efbc36b0c27a3..42233646b495a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,9 +66,9 @@ tracing-log = "0.1.0" tracing-tower = { git = "https://github.com/tokio-rs/tracing", rev = "f470db1b0354b368f62f9ee4d763595d16373231" } # Metrics -metrics = { version = "0.13.0-alpha", git = "https://github.com/MOZGIII/metrics", branch = "refactor/metrics-v2-tracing-context" } -metrics-util = { version = "0.4.0-alpha", git = "https://github.com/MOZGIII/metrics", branch = "refactor/metrics-v2-tracing-context" } -metrics-tracing-context = { version = "0.1.0-alpha", git = "https://github.com/MOZGIII/metrics", branch = "refactor/metrics-v2-tracing-context" } +metrics = { version = "0.13.0-alpha" } +metrics-util = { version = "0.4.0-alpha" } +metrics-tracing-context = { version = "0.1.0-alpha" } # Aws rusoto_core = { version = "0.45.0", features = ["encoding"], optional = true } From 414f67b0442c5906bf72aa3649ae4c52bc2cf4d7 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Thu, 17 Sep 2020 13:28:15 +0300 Subject: [PATCH 07/13] Fix clippy offences Signed-off-by: MOZGIII --- src/metrics.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 873300a8fb3cf..e7bb6d2f4ae0d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -44,15 +44,15 @@ struct VectorRecorder { impl Recorder for VectorRecorder { fn register_counter(&self, key: Key, _description: Option<&'static str>) { let ckey = CompositeKey::new(MetricKind::Counter, key); - self.registry.op(ckey, |_| {}, || Handle::counter()) + self.registry.op(ckey, |_| {}, Handle::counter) } fn register_gauge(&self, key: Key, _description: Option<&'static str>) { let ckey = CompositeKey::new(MetricKind::Gauge, key); - self.registry.op(ckey, |_| {}, || Handle::gauge()) + self.registry.op(ckey, |_| {}, Handle::gauge) } fn register_histogram(&self, key: Key, _description: Option<&'static str>) { let ckey = CompositeKey::new(MetricKind::Histogram, key); - self.registry.op(ckey, |_| {}, || Handle::histogram()) + self.registry.op(ckey, |_| {}, Handle::histogram) } fn increment_counter(&self, key: Key, value: u64) { @@ -60,7 +60,7 @@ impl Recorder for VectorRecorder { self.registry.op( ckey, |handle| handle.increment_counter(value), - || Handle::counter(), + Handle::counter, ) } fn update_gauge(&self, key: Key, value: f64) { @@ -68,7 +68,7 @@ impl Recorder for VectorRecorder { self.registry.op( ckey, |handle| handle.update_gauge(value), - || Handle::gauge(), + Handle::gauge, ) } fn record_histogram(&self, key: Key, value: u64) { @@ -76,7 +76,7 @@ impl Recorder for VectorRecorder { self.registry.op( ckey, |handle| handle.record_histogram(value), - || Handle::histogram(), + Handle::histogram, ) } } From 78cd17593368af08c5d9468a44bab2dac028fd8a Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 25 Sep 2020 16:57:22 +0300 Subject: [PATCH 08/13] Fix the autoformatting Signed-off-by: MOZGIII --- src/metrics.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index e7bb6d2f4ae0d..10241828c01f2 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -65,11 +65,8 @@ impl Recorder for VectorRecorder { } fn update_gauge(&self, key: Key, value: f64) { let ckey = CompositeKey::new(MetricKind::Gauge, key); - self.registry.op( - ckey, - |handle| handle.update_gauge(value), - Handle::gauge, - ) + self.registry + .op(ckey, |handle| handle.update_gauge(value), Handle::gauge) } fn record_histogram(&self, key: Key, value: u64) { let ckey = CompositeKey::new(MetricKind::Histogram, key); From bf47b699c6c15b2a377855ef58808420c20403b2 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 25 Sep 2020 16:58:08 +0300 Subject: [PATCH 09/13] Rename the ComponentNameFilter to VectorLabelFilter Signed-off-by: MOZGIII --- src/metrics.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 10241828c01f2..44ad6435e3dd1 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -79,9 +79,9 @@ impl Recorder for VectorRecorder { } #[derive(Debug, Clone)] -struct ComponentNameFilter; +struct VectorLabelFilter; -impl LabelFilter for ComponentNameFilter { +impl LabelFilter for VectorLabelFilter { fn should_include_label(&self, label: &Label) -> bool { label.key() == "component_name" } From b1d1c61ac8cad2a841d93416922e831a4d9340e0 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 25 Sep 2020 16:58:48 +0300 Subject: [PATCH 10/13] Actually use the VectorLabelFilter for TracingContextLayer initialization Signed-off-by: MOZGIII --- src/metrics.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 44ad6435e3dd1..2421f72899ccd 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -27,7 +27,7 @@ pub fn init() -> crate::Result<()> { registry: Arc::clone(®istry), }; // Apply a layer to capture tracing span fields as labels. - let recorder = TracingContextLayer::all().layer(recorder); + let recorder = TracingContextLayer::new(VectorLabelFilter).layer(recorder); // Register the recorder globally. metrics::set_boxed_recorder(Box::new(recorder)).map_err(|_| "recorder already initialized")?; @@ -128,7 +128,12 @@ mod tests { trace_init(); let _ = super::init(); - let span = span!(Level::ERROR, "my span", component_name = "foobar"); + let span = span!( + Level::ERROR, + "my span", + component_name = "foo", + some_other_label = "bar" + ); // See https://github.com/tokio-rs/tracing/issues/978 if span.is_disabled() { panic!("test is not configured properly, set TEST_LOG=info env var") @@ -143,7 +148,7 @@ mod tests { .unwrap(); let expected_tags = Some( - vec![("component_name".to_owned(), "foobar".to_owned())] + vec![("component_name".to_owned(), "foo".to_owned())] .into_iter() .collect(), ); From 99b32c6053fddb9aedb87b5b9f18f5f05d2047a8 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 25 Sep 2020 17:03:32 +0300 Subject: [PATCH 11/13] Adjust the actual spans and included label names Signed-off-by: MOZGIII --- src/metrics.rs | 23 +++++++++++++++++------ src/topology/mod.rs | 18 +++++++++++++++--- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 2421f72899ccd..32b3c52b5107e 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -83,7 +83,8 @@ struct VectorLabelFilter; impl LabelFilter for VectorLabelFilter { fn should_include_label(&self, label: &Label) -> bool { - label.key() == "component_name" + let key = label.key(); + key == "topology_component_name" || key == "topology_component_type" } } @@ -131,8 +132,9 @@ mod tests { let span = span!( Level::ERROR, "my span", - component_name = "foo", - some_other_label = "bar" + topology_component_name = "my_component_name", + topology_component_type = "my_component_type", + some_other_label = "qwerty" ); // See https://github.com/tokio-rs/tracing/issues/978 if span.is_disabled() { @@ -148,9 +150,18 @@ mod tests { .unwrap(); let expected_tags = Some( - vec![("component_name".to_owned(), "foo".to_owned())] - .into_iter() - .collect(), + vec![ + ( + "topology_component_name".to_owned(), + "my_component_name".to_owned(), + ), + ( + "topology_component_type".to_owned(), + "my_component_type".to_owned(), + ), + ] + .into_iter() + .collect(), ); assert_eq!(metric.tags, expected_tags); diff --git a/src/topology/mod.rs b/src/topology/mod.rs index 516f812f7bf85..ab1d7d09959c3 100644 --- a/src/topology/mod.rs +++ b/src/topology/mod.rs @@ -436,7 +436,11 @@ impl RunningTopology { fn spawn_sink(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!("sink", name = %task.name(), r#type = %task.typetag()); + let span = info_span!( + "sink", + topology_component_name = %task.name(), + topology_component_type = %task.typetag(), + ); let task = handle_errors(task.compat(), self.abort_tx.clone()).instrument(span); let spawned = tokio::spawn(task.compat()); if let Some(previous) = self.tasks.insert(name.to_string(), spawned) { @@ -446,7 +450,11 @@ impl RunningTopology { fn spawn_transform(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!("transform", name = %task.name(), r#type = %task.typetag()); + let span = info_span!( + "transform", + topology_component_name = %task.name(), + topology_component_type = %task.typetag(), + ); let task = handle_errors(task.compat(), self.abort_tx.clone()).instrument(span); let spawned = tokio::spawn(task.compat()); if let Some(previous) = self.tasks.insert(name.to_string(), spawned) { @@ -456,7 +464,11 @@ impl RunningTopology { fn spawn_source(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!("source", name = %task.name(), r#type = %task.typetag()); + let span = info_span!( + "source", + topology_component_name = %task.name(), + topology_component_type = %task.typetag(), + ); let task = handle_errors(task.compat(), self.abort_tx.clone()).instrument(span.clone()); let spawned = tokio::spawn(task.compat()); if let Some(previous) = self.tasks.insert(name.to_string(), spawned) { From e0a18935b1ce5f076cd7a3d082fca8e1d33232c5 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 25 Sep 2020 17:04:39 +0300 Subject: [PATCH 12/13] Use the highest span level to avoid being affected by the log levels Signed-off-by: MOZGIII --- src/topology/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/topology/mod.rs b/src/topology/mod.rs index ab1d7d09959c3..89f13e105015c 100644 --- a/src/topology/mod.rs +++ b/src/topology/mod.rs @@ -436,7 +436,7 @@ impl RunningTopology { fn spawn_sink(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!( + let span = error_span!( "sink", topology_component_name = %task.name(), topology_component_type = %task.typetag(), @@ -450,7 +450,7 @@ impl RunningTopology { fn spawn_transform(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!( + let span = error_span!( "transform", topology_component_name = %task.name(), topology_component_type = %task.typetag(), @@ -464,7 +464,7 @@ impl RunningTopology { fn spawn_source(&mut self, name: &str, new_pieces: &mut builder::Pieces) { let task = new_pieces.tasks.remove(name).unwrap(); - let span = info_span!( + let span = error_span!( "source", topology_component_name = %task.name(), topology_component_type = %task.typetag(), From 2676cd12ddf59243d2e1284fe4826083a64933f2 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 25 Sep 2020 21:19:44 +0300 Subject: [PATCH 13/13] Correct the metrics initialization at k8s instrumenting state tests Signed-off-by: MOZGIII --- src/kubernetes/state/instrumenting.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/kubernetes/state/instrumenting.rs b/src/kubernetes/state/instrumenting.rs index 4b975b7cf1ade..e5af68e47e8d4 100644 --- a/src/kubernetes/state/instrumenting.rs +++ b/src/kubernetes/state/instrumenting.rs @@ -97,10 +97,7 @@ mod tests { } fn get_metric_value(op_kind: &'static str) -> Option { - let controller = crate::metrics::get_controller().unwrap_or_else(|_| { - crate::metrics::init().expect("unable to init metrics"); - crate::metrics::get_controller().expect("failed to init metric container") - }); + let controller = crate::metrics::get_controller().expect("failed to init metric container"); let tags_to_lookup = Some( vec![("op_kind".to_owned(), op_kind.to_owned())] @@ -150,7 +147,7 @@ mod tests { #[tokio::test] async fn add() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); @@ -181,7 +178,7 @@ mod tests { #[tokio::test] async fn update() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); @@ -212,7 +209,7 @@ mod tests { #[tokio::test] async fn delete() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); @@ -243,7 +240,7 @@ mod tests { #[tokio::test] async fn resync() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); @@ -270,7 +267,7 @@ mod tests { #[tokio::test] async fn request_maintenance_without_maintenance() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, _events_rx, _actions_tx) = prepare_test(); @@ -283,7 +280,7 @@ mod tests { #[tokio::test] async fn request_maintenance_with_maintenance() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (events_tx, _events_rx) = mpsc::channel(0); @@ -306,7 +303,7 @@ mod tests { #[tokio::test] async fn perform_maintenance() { trace_init(); - + let _ = crate::metrics::init(); let _guard = tests_lock(); let (mut writer, mut events_rx, mut actions_tx) = prepare_test();