diff --git a/hyperactor_telemetry/Cargo.toml b/hyperactor_telemetry/Cargo.toml index d01e23b0f..82cac324f 100644 --- a/hyperactor_telemetry/Cargo.toml +++ b/hyperactor_telemetry/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry +# @generated by autocargo from //monarch/hyperactor_telemetry:[correctness_test,hyperactor_telemetry,telemetry_benchmark] [package] name = "hyperactor_telemetry" @@ -10,13 +10,23 @@ license = "BSD-3-Clause" [lib] edition = "2024" +[[bench]] +name = "correctness_test" +edition = "2024" + +[[bench]] +name = "telemetry_benchmark" +edition = "2024" + [dependencies] anyhow = "1.0.98" chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false } dashmap = { version = "5.5.3", features = ["rayon", "serde"] } fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } hdrhistogram = "7.5" +indexmap = { version = "2.9.0", features = ["arbitrary", "rayon", "serde"] } lazy_static = "1.5" +libc = "0.2.139" opentelemetry = "0.29" opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] } rand = { version = "0.8", features = ["small_rng"] } diff --git a/hyperactor_telemetry/benches/correctness_test.rs b/hyperactor_telemetry/benches/correctness_test.rs new file mode 100644 index 000000000..a060b82c2 --- /dev/null +++ b/hyperactor_telemetry/benches/correctness_test.rs @@ -0,0 +1,238 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Correctness test harness comparing old vs unified telemetry implementations. +//! +//! This test harness runs identical workloads through both implementations and +//! verifies that the outputs are equivalent across all exporters +//! +//! Usage: +//! buck2 run //monarch/hyperactor_telemetry:correctness_test + +#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor`` just for `hyperactor::clock::Clock` + +use anyhow::Result; +use hyperactor_telemetry::*; + +struct TestResults {} + +struct CorrectnessTestHarness {} + +impl CorrectnessTestHarness { + fn run(&self, workload: F) -> Result + where + F: Fn(), + { + initialize_logging_with_log_prefix( + DefaultTelemetryClock {}, + Some("TEST_LOG_PREFIX".to_string()), + ); + + workload(); + + std::thread::sleep(std::time::Duration::from_millis(300)); + + Ok(TestResults {}) + } +} + +// ============================================================================ +// Test Workloads +// ============================================================================ + +fn workload_simple_info_events() { + for i in 0..100 { + tracing::info!(iteration = i, "simple info event"); + } +} + +fn workload_spans_with_fields() { + for i in 0..50 { + let _span = tracing::info_span!( + "test_span", + iteration = i, + foo = 42, + message_type = "Request" + ) + .entered(); + } +} + +fn workload_nested_spans() { + for i in 0..20 { + let _outer = tracing::info_span!("outer", iteration = i).entered(); + { + let _middle = tracing::info_span!("middle", level = 2).entered(); + { + let _inner = tracing::info_span!("inner", level = 3).entered(); + tracing::info!("inside nested span"); + } + } + } +} + +fn workload_events_with_fields() { + for i in 0..100 { + tracing::info!( + iteration = i, + foo = 42, + message_type = "Request", + status = "ok", + count = 100, + "event with many fields" + ); + } +} + +fn workload_mixed_log_levels() { + for _ in 0..25 { + tracing::trace!("trace event"); + tracing::debug!(value = 1, "debug event"); + tracing::info!(value = 2, "info event"); + tracing::warn!(value = 3, "warn event"); + tracing::error!(value = 4, "error event"); + } +} + +fn workload_events_in_spans() { + for i in 0..30 { + let _span = tracing::info_span!("outer_span", iteration = i).entered(); + tracing::info!(step = "start", "starting work"); + tracing::debug!(step = "middle", "doing work"); + tracing::info!(step = "end", "finished work"); + } +} + +fn main() -> Result<()> { + let args: Vec = std::env::args().collect(); + + // This script will spawn itself into this branch + if args.len() > 2 { + let test_name = &args[1]; + let impl_type = &args[2]; + return run_single_test(test_name, impl_type); + } + + println!("\n\nHyperactor Telemetry Correctness Test Suite"); + println!("Comparing OLD vs UNIFIED implementations\n"); + + let tests = vec![ + "simple_info_events", + "spans_with_fields", + "nested_spans", + "events_with_fields", + "mixed_log_levels", + "events_in_spans", + ]; + + let mut all_passed = true; + + for test_name in tests { + println!("\n{}", "=".repeat(80)); + println!("Running test: {}", test_name_to_display(test_name)); + println!("{}", "=".repeat(80)); + + let mut test_passed = true; + + println!("\n[Running OLD implementation...]"); + let old_status = std::process::Command::new(&args[0]) + .arg(test_name) + .arg("--old") + .env("TEST_LOG_PREFIX", "test") + .status()?; + + if !old_status.success() { + println!("\n✗ OLD implementation FAILED"); + all_passed = false; + test_passed = false; + } + + println!("\n[Running UNIFIED implementation...]"); + let unified_status = std::process::Command::new(&args[0]) + .arg(test_name) + .arg("--unified") + .env("TEST_LOG_PREFIX", "test") + .status()?; + + if !unified_status.success() { + println!("\n✗ UNIFIED implementation FAILED"); + all_passed = false; + test_passed = false; + } + + if test_passed { + println!("\n✓ Test PASSED: {}", test_name_to_display(test_name)); + } else { + println!("\n✗ Test FAILED: {}", test_name_to_display(test_name)); + } + } + + println!("\n\n{}", "=".repeat(80)); + if all_passed { + println!("All tests completed successfully!"); + } else { + println!("Some tests FAILED!"); + return Err(anyhow::anyhow!("Test failures detected")); + } + println!("{}", "=".repeat(80)); + + Ok(()) +} + +/// Called in child process +fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { + let harness = CorrectnessTestHarness {}; + + let workload: fn() = match test_name { + "simple_info_events" => workload_simple_info_events, + "spans_with_fields" => workload_spans_with_fields, + "nested_spans" => workload_nested_spans, + "events_with_fields" => workload_events_with_fields, + "mixed_log_levels" => workload_mixed_log_levels, + "events_in_spans" => workload_events_in_spans, + _ => { + return Err(anyhow::anyhow!("Unknown test: {}", test_name)); + } + }; + + let _results = match impl_type { + "--old" => { + println!("Running with OLD implementation..."); + harness.run(workload)? + } + "--unified" => { + println!("Running with UNIFIED implementation..."); + // Set USE_UNIFIED_LAYER to use unified implementation + // SAFETY: Setting before any telemetry initialization + unsafe { + std::env::set_var("USE_UNIFIED_LAYER", "1"); + } + harness.run(workload)? + } + _ => { + return Err(anyhow::anyhow!( + "Unknown implementation type: {}", + impl_type + )); + } + }; + + Ok(()) +} + +fn test_name_to_display(test_name: &str) -> &str { + match test_name { + "simple_info_events" => "Simple info events", + "spans_with_fields" => "Spans with fields", + "nested_spans" => "Nested spans", + "events_with_fields" => "Events with many fields", + "mixed_log_levels" => "Mixed log levels", + "events_in_spans" => "Events in spans", + _ => test_name, + } +} diff --git a/hyperactor_telemetry/benches/telemetry_benchmark.rs b/hyperactor_telemetry/benches/telemetry_benchmark.rs new file mode 100644 index 000000000..b6653f7a4 --- /dev/null +++ b/hyperactor_telemetry/benches/telemetry_benchmark.rs @@ -0,0 +1,204 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Benchmark comparing old vs unified telemetry implementations. +//! +//! This benchmark simulates a realistic workload with: +//! - Nested spans (simulating actor message processing) +//! - Events at different log levels +//! - Field recording +//! - Multiple iterations +//! +//! Usage: +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --old +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --unified +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --compare + +#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock` + +use std::time::Instant; + +use hyperactor_telemetry::*; + +fn stage_debug_events_only(iterations: usize) { + for i in 0..iterations { + tracing::debug!(iteration = i, "debug event"); + } +} + +fn stage_info_events_only(iterations: usize) { + for i in 0..iterations { + tracing::info!(iteration = i, "info event"); + } +} + +fn stage_trace_events_only(iterations: usize) { + for i in 0..iterations { + tracing::trace!(iteration = i, "trace event"); + } +} + +fn stage_error_events_only(iterations: usize) { + for i in 0..iterations { + tracing::error!(iteration = i, "error event"); + } +} + +fn stage_simple_spans_only(iterations: usize) { + for _ in 0..iterations { + let _span = tracing::info_span!("simple_span").entered(); + } +} + +fn stage_spans_with_fields(iterations: usize) { + for i in 0..iterations { + let _span = tracing::info_span!( + "span_with_fields", + iteration = i, + foo = 42, + message_type = "Request" + ) + .entered(); + } +} + +fn stage_nested_spans(iterations: usize) { + for _ in 0..iterations { + let _outer = tracing::info_span!("outer", level = 1).entered(); + { + let _middle = tracing::info_span!("middle", level = 2).entered(); + { + let _inner = tracing::info_span!("inner", level = 3).entered(); + } + } + } +} + +fn stage_events_with_fields(iterations: usize) { + for i in 0..iterations { + tracing::info!( + iteration = i, + foo = 42, + message_type = "Request", + status = "ok", + count = 100, + "event with fields" + ); + } +} + +fn run_benchmark_stages(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + let stages: Vec<(&'static str, fn(usize))> = vec![ + ("Debug events only", stage_debug_events_only), + ("Info events only", stage_info_events_only), + ("Trace events only", stage_trace_events_only), + ("Error events only", stage_error_events_only), + ("Simple spans only", stage_simple_spans_only), + ("Spans with fields", stage_spans_with_fields), + ("Nested spans (3 levels)", stage_nested_spans), + ("Events with fields", stage_events_with_fields), + ]; + + let mut results = Vec::new(); + + for (name, stage_fn) in stages { + // Warm up + stage_fn(10); + + // Benchmark + let start = Instant::now(); + stage_fn(iterations); + let elapsed = start.elapsed(); + + println!( + " {:30} {} iterations in {:>12?} ({:>10?}/iter)", + format!("{}:", name), + iterations, + elapsed, + elapsed / iterations as u32 + ); + + results.push((name, elapsed)); + } + + results +} + +fn benchmark(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + println!("{}", "=".repeat(100)); + + initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None); + + let results = run_benchmark_stages(iterations); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + results +} + +fn main() { + let args: Vec = std::env::args().collect(); + + let iterations = 1000; + + if args.len() < 2 { + println!("Usage: {} [--old | --unified | --compare]", args[0]); + println!(" --old: Benchmark old implementation only"); + println!(" --unified: Benchmark unified implementation only"); + println!(" --compare: Run both in separate processes and compare"); + return; + } + + match args[1].as_str() { + "--old" => { + println!("Benchmarking OLD implementation..."); + // Don't set USE_UNIFIED_LAYER - uses old implementation + let _results = benchmark(iterations); + println!("\n{}", "=".repeat(100)); + } + "--unified" => { + println!("Benchmarking UNIFIED implementation..."); + // Set USE_UNIFIED_LAYER to use unified implementation + // SAFETY: Setting before any telemetry initialization + unsafe { + std::env::set_var("USE_UNIFIED_LAYER", "1"); + } + let _results = benchmark(iterations); + println!("\n{}", "=".repeat(100)); + } + "--compare" => { + println!( + "Running comparison benchmark with {} iterations...\n", + iterations + ); + + let old_status = std::process::Command::new(&args[0]) + .arg("--old") + .status() + .expect("Failed to spawn old implementation"); + + if !old_status.success() { + eprintln!("\n✗ OLD implementation benchmark FAILED"); + return; + } + + let unified_status = std::process::Command::new(&args[0]) + .arg("--unified") + .status() + .expect("Failed to spawn unified implementation"); + + if !unified_status.success() { + eprintln!("\n✗ UNIFIED implementation benchmark FAILED"); + } + } + _ => { + println!("Unknown option: {}", args[1]); + println!("Use --old, --unified, or --compare"); + } + } +} diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 0210e0a5e..c63c46d3b 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -7,6 +7,7 @@ */ #![allow(internal_features)] +#![allow(clippy::disallowed_methods)] // hyperactor_telemetry can't use hyperactor::clock::Clock (circular dependency) #![feature(assert_matches)] #![feature(sync_unsafe_cell)] #![feature(mpmc_channel)] @@ -36,6 +37,10 @@ const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG"; pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME"; +/// Environment variable to enable the unified layer. +/// Set to "1" to enable. +pub const USE_UNIFIED_LAYER: &str = "USE_UNIFIED_LAYER"; + // Log level constants const LOG_LEVEL_INFO: &str = "info"; const LOG_LEVEL_DEBUG: &str = "debug"; @@ -63,6 +68,7 @@ mod spool; pub mod sqlite; pub mod task; pub mod trace; +pub mod trace_dispatcher; use std::io::Write; use std::str::FromStr; use std::sync::Arc; @@ -549,6 +555,13 @@ pub fn initialize_logging_for_test() { initialize_logging(DefaultTelemetryClock {}); } +fn is_layer_enabled(env_var: &str) -> bool { + std::env::var(env_var).unwrap_or_default() == "1" +} +fn is_layer_disabled(env_var: &str) -> bool { + std::env::var(env_var).unwrap_or_default() == "1" +} + /// Set up logging based on the given execution environment. We specialize logging based on how the /// logs are consumed. The destination scuba table is specialized based on the execution environment. /// mast -> monarch_tracing/prod @@ -567,6 +580,8 @@ pub fn initialize_logging_with_log_prefix( clock: impl TelemetryClock + Send + 'static, prefix_env_var: Option, ) { + let use_unified = std::env::var(USE_UNIFIED_LAYER).unwrap_or_default() == "1"; + swap_telemetry_clock(clock); let file_log_level = match env::Env::current() { env::Env::Local => LOG_LEVEL_INFO, @@ -603,13 +618,32 @@ pub fn initialize_logging_with_log_prefix( #[cfg(fbcode_build)] { use crate::env::Env; - fn is_layer_enabled(env_var: &str) -> bool { - std::env::var(env_var).unwrap_or_default() == "1" - } - fn is_layer_disabled(env_var: &str) -> bool { - std::env::var(env_var).unwrap_or_default() == "1" - } - if let Err(err) = Registry::default() + if use_unified { + if let Err(err) = Registry::default() + .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { + // TODO: get_reloadable_sqlite_layer currently still returns None, + // and some additional work is required to make it work. + Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { + Some(otel::tracing_layer()) + } else { + None + }) + .with(file_layer) + .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }) + .with(trace_dispatcher::TraceEventDispatcher::new(vec![], None)) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else if let Err(err) = Registry::default() .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { // TODO: get_reloadable_sqlite_layer currently still returns None, // and some additional work is required to make it work. @@ -633,20 +667,18 @@ pub fn initialize_logging_with_log_prefix( tracing::debug!("logging already initialized for this process: {}", err); } let exec_id = env::execution_id(); - tracing::info!( - target: "execution", - execution_id = exec_id, - environment = %Env::current(), - args = ?std::env::args(), - build_mode = build_info::BuildInfo::get_build_mode(), - compiler = build_info::BuildInfo::get_compiler(), - compiler_version = build_info::BuildInfo::get_compiler_version(), - buck_rule = build_info::BuildInfo::get_rule(), - package_name = build_info::BuildInfo::get_package_name(), - package_release = build_info::BuildInfo::get_package_release(), - upstream_revision = build_info::BuildInfo::get_upstream_revision(), - revision = build_info::BuildInfo::get_revision(), - "logging_initialized" + meta::log_execution_event( + &exec_id, + &Env::current().to_string(), + std::env::args().collect(), + build_info::BuildInfo::get_build_mode(), + build_info::BuildInfo::get_compiler(), + build_info::BuildInfo::get_compiler_version(), + build_info::BuildInfo::get_rule(), + build_info::BuildInfo::get_package_name(), + build_info::BuildInfo::get_package_release(), + build_info::BuildInfo::get_upstream_revision(), + build_info::BuildInfo::get_revision(), ); if !is_layer_disabled(DISABLE_OTEL_METRICS) { @@ -655,18 +687,25 @@ pub fn initialize_logging_with_log_prefix( } #[cfg(not(fbcode_build))] { - if let Err(err) = Registry::default() - .with(file_layer) - .with( - if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" { - Some(recorder().layer()) - } else { - None - }, - ) - .try_init() - { - tracing::debug!("logging already initialized for this process: {}", err); + let registry = Registry::default().with(file_layer).with( + if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }, + ); + + if use_unified { + if let Err(err) = registry + .with(trace_dispatcher::TraceEventDispatcher::new(vec![], None)) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else { + if let Err(err) = registry.try_init() { + tracing::debug!("logging already initialized for this process: {}", err); + } } } } diff --git a/hyperactor_telemetry/src/trace_dispatcher.rs b/hyperactor_telemetry/src/trace_dispatcher.rs new file mode 100644 index 000000000..5d4726a7c --- /dev/null +++ b/hyperactor_telemetry/src/trace_dispatcher.rs @@ -0,0 +1,439 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Unified telemetry layer that captures trace events once and fans out to multiple exporters +//! on a background thread, eliminating redundant capture and moving work off the application +//! thread. + +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::mpsc; +use std::thread::JoinHandle; +use std::time::Duration; +use std::time::SystemTime; + +use indexmap::IndexMap; +use tracing::Id; +use tracing::Subscriber; +use tracing_subscriber::filter::Targets; +use tracing_subscriber::layer::Context; +use tracing_subscriber::layer::Layer; +use tracing_subscriber::registry::LookupSpan; + +const QUEUE_CAPACITY: usize = 100_000; + +/// Unified representation of a trace event captured from the tracing layer. +/// This is captured once on the application thread, then sent to the background +/// worker for fan-out to multiple exporters. +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) enum TraceEvent { + /// A new span was created (on_new_span) + NewSpan { + id: u64, + name: &'static str, + target: &'static str, + level: tracing::Level, + fields: IndexMap, + timestamp: SystemTime, + parent_id: Option, + thread_name: String, + file: Option<&'static str>, + line: Option, + }, + /// A span was entered (on_enter) + SpanEnter { id: u64, timestamp: SystemTime }, + /// A span was exited (on_exit) + SpanExit { id: u64, timestamp: SystemTime }, + /// A span was closed (dropped) + SpanClose { id: u64, timestamp: SystemTime }, + /// A tracing event occurred (e.g., tracing::info!()) + Event { + name: &'static str, + target: &'static str, + level: tracing::Level, + fields: IndexMap, + timestamp: SystemTime, + parent_span: Option, + thread_id: String, + thread_name: String, + module_path: Option<&'static str>, + file: Option<&'static str>, + line: Option, + }, +} + +/// Simplified field value representation for trace events +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) enum FieldValue { + Bool(bool), + I64(i64), + U64(u64), + F64(f64), + Str(String), + Debug(String), +} + +/// Trait for sinks that receive trace events from the dispatcher. +/// Implementations run on the background worker thread and can perform +/// expensive I/O operations without blocking the application. +pub(crate) trait TraceEventSink: Send + 'static { + /// Consume a single event. Called on background thread. + fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>; + + /// Optional target/level filter for this sink. + /// + /// The worker loop automatically applies this filter before calling `consume()`, + /// so sinks don't need to check target/level in their consume implementation. + /// Only `NewSpan` and `Event` are filtered by target/level; other event types + /// are always passed through. + /// + /// # Returns + /// - `None` - No filtering, all events are consumed (default) + /// - `Some(Targets)` - Only consume events matching the target/level filter + /// + /// # Example + /// ```ignore + /// fn target_filter(&self) -> Option<&Targets> { + /// Some(Targets::new() + /// .with_target("opentelemetry", LevelFilter::OFF) + /// .with_default(LevelFilter::DEBUG)) + /// } + /// ``` + fn target_filter(&self) -> Option<&Targets> { + None + } + + /// Flush any buffered events to the backend. + /// Called periodically and on shutdown. + fn flush(&mut self) -> Result<(), anyhow::Error>; + + /// Optional: return name for debugging/logging + fn name(&self) -> &str { + std::any::type_name::() + } +} + +/// The trace event dispatcher that captures events once and dispatches to multiple sinks +/// on a background thread. +pub struct TraceEventDispatcher { + sender: Option>, + _worker_handle: WorkerHandle, + max_level: Option, + dropped_events: Arc, +} + +struct WorkerHandle { + join_handle: Option>, +} + +impl TraceEventDispatcher { + /// Create a new trace event dispatcher with the given sinks. + /// Uses a bounded channel (capacity 10,000) to ensure telemetry never blocks + /// the application. Events are dropped with a warning if the queue is full. + /// + /// # Arguments + /// * `sinks` - List of sinks to dispatch events to. + /// * `max_level` - Maximum level filter hint (None for no filtering) + pub(crate) fn new( + sinks: Vec>, + max_level: Option, + ) -> Self { + let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY); + let dropped_events = Arc::new(AtomicU64::new(0)); + let dropped_events_worker = Arc::clone(&dropped_events); + + let worker_handle = std::thread::Builder::new() + .name("telemetry-worker".into()) + .spawn(move || { + worker_loop(receiver, sinks, dropped_events_worker); + }) + .expect("failed to spawn telemetry worker thread"); + + Self { + sender: Some(sender), + _worker_handle: WorkerHandle { + join_handle: Some(worker_handle), + }, + max_level, + dropped_events, + } + } + + fn send_event(&self, event: TraceEvent) { + if let Some(sender) = &self.sender { + if let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event) { + let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1; + + if dropped.is_multiple_of(1000) { + eprintln!( + "[telemetry] WARNING: {} events dropped due to full queue (capacity: 10,000). \ + Telemetry worker may be falling behind.", + dropped + ); + } + } + } + } +} + +impl Drop for TraceEventDispatcher { + fn drop(&mut self) { + // Explicitly drop the sender to close the channel. + // The next field to be dropped is `worker_handle` which + // will run it's own drop impl to join the thread and flush + drop(self.sender.take()); + } +} + +impl Layer for TraceEventDispatcher +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let metadata = attrs.metadata(); + let mut fields = IndexMap::new(); + + let mut visitor = FieldVisitor(&mut fields); + attrs.record(&mut visitor); + + let parent_id = if let Some(parent) = attrs.parent() { + Some(parent.into_u64()) + } else { + ctx.current_span().id().map(|id| id.into_u64()) + }; + + let thread_name = std::thread::current() + .name() + .unwrap_or_default() + .to_string(); + + let event = TraceEvent::NewSpan { + id: id.into_u64(), + name: metadata.name(), + target: metadata.target(), + level: *metadata.level(), + fields, + timestamp: SystemTime::now(), + parent_id, + thread_name, + file: metadata.file(), + line: metadata.line(), + }; + + self.send_event(event); + } + + fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanEnter { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanExit { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { + let metadata = event.metadata(); + let mut fields = IndexMap::new(); + let mut visitor = FieldVisitor(&mut fields); + event.record(&mut visitor); + + let parent_span = ctx.event_span(event).map(|span| span.id().into_u64()); + + #[cfg(target_os = "linux")] + let thread_id_num = { + // SAFETY: syscall(SYS_gettid) is always safe to call - it's a read-only + // syscall that returns the current thread's kernel thread ID (TID). + // The cast to u64 is safe because gettid() returns a positive pid_t. + unsafe { libc::syscall(libc::SYS_gettid) as u64 } + }; + #[cfg(not(target_os = "linux"))] + let thread_id_num = { + let tid = std::thread::current().id(); + // SAFETY: ThreadId is a newtype wrapper around a u64 counter. + // This transmute relies on the internal representation of ThreadId, + // which is stable in practice but not guaranteed by Rust's API. + // On non-Linux platforms this is a best-effort approximation. + // See: https://doc.rust-lang.org/std/thread/struct.ThreadId.html + unsafe { std::mem::transmute::(tid) } + }; + let thread_id_str = thread_id_num.to_string(); + + let thread_name = std::thread::current() + .name() + .unwrap_or_default() + .to_string(); + + let trace_event = TraceEvent::Event { + name: metadata.name(), + target: metadata.target(), + level: *metadata.level(), + fields, + timestamp: SystemTime::now(), + parent_span, + thread_id: thread_id_str, + thread_name, + module_path: metadata.module_path(), + file: metadata.file(), + line: metadata.line(), + }; + + self.send_event(trace_event); + } + + fn on_close(&self, id: Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanClose { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn max_level_hint(&self) -> Option { + self.max_level + } +} + +struct FieldVisitor<'a>(&'a mut IndexMap); + +impl<'a> tracing::field::Visit for FieldVisitor<'a> { + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.0 + .insert(field.name().to_string(), FieldValue::Bool(value)); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.0 + .insert(field.name().to_string(), FieldValue::I64(value)); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.0 + .insert(field.name().to_string(), FieldValue::U64(value)); + } + + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + self.0 + .insert(field.name().to_string(), FieldValue::F64(value)); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.0 + .insert(field.name().to_string(), FieldValue::Str(value.to_string())); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert( + field.name().to_string(), + FieldValue::Debug(format!("{:?}", value)), + ); + } +} + +/// Background worker loop that receives events and dispatches them to sinks. +/// Runs until the sender is dropped +fn worker_loop( + receiver: mpsc::Receiver, + mut sinks: Vec>, + dropped_events: Arc, +) { + const FLUSH_INTERVAL: Duration = Duration::from_millis(100); + const FLUSH_EVENT_COUNT: usize = 1000; + let mut last_flush = std::time::Instant::now(); + let mut events_since_flush = 0; + + fn flush_sinks(sinks: &mut [Box]) { + for sink in sinks { + if let Err(e) = sink.flush() { + eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e); + } + } + } + + fn dispatch_to_sinks(sinks: &mut [Box], event: TraceEvent) { + for sink in sinks { + if match &event { + TraceEvent::NewSpan { target, level, .. } + | TraceEvent::Event { target, level, .. } => match sink.target_filter() { + Some(targets) => targets.would_enable(target, level), + None => true, + }, + _ => true, + } { + if let Err(e) = sink.consume(&event) { + eprintln!( + "[telemetry] sink {} failed to consume event: {}", + sink.name(), + e + ); + } + } + } + } + + loop { + match receiver.recv_timeout(FLUSH_INTERVAL) { + Ok(event) => { + dispatch_to_sinks(&mut sinks, event); + events_since_flush += 1; + + if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL + { + flush_sinks(&mut sinks); + last_flush = std::time::Instant::now(); + events_since_flush = 0; + } + } + Err(mpsc::RecvTimeoutError::Timeout) => { + flush_sinks(&mut sinks); + last_flush = std::time::Instant::now(); + events_since_flush = 0; + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + break; + } + } + } + + while let Ok(event) = receiver.try_recv() { + dispatch_to_sinks(&mut sinks, event); + } + + flush_sinks(&mut sinks); + + let total_dropped = dropped_events.load(Ordering::Relaxed); + if total_dropped > 0 { + eprintln!( + "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}", + total_dropped + ); + } +} + +impl Drop for WorkerHandle { + fn drop(&mut self) { + if let Some(handle) = self.join_handle.take() { + if let Err(e) = handle.join() { + eprintln!("[telemetry] worker thread panicked: {:?}", e); + } + } + } +}