diff --git a/hyperactor_telemetry/Cargo.toml b/hyperactor_telemetry/Cargo.toml index d01e23b0f..82cac324f 100644 --- a/hyperactor_telemetry/Cargo.toml +++ b/hyperactor_telemetry/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry +# @generated by autocargo from //monarch/hyperactor_telemetry:[correctness_test,hyperactor_telemetry,telemetry_benchmark] [package] name = "hyperactor_telemetry" @@ -10,13 +10,23 @@ license = "BSD-3-Clause" [lib] edition = "2024" +[[bench]] +name = "correctness_test" +edition = "2024" + +[[bench]] +name = "telemetry_benchmark" +edition = "2024" + [dependencies] anyhow = "1.0.98" chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false } dashmap = { version = "5.5.3", features = ["rayon", "serde"] } fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } hdrhistogram = "7.5" +indexmap = { version = "2.9.0", features = ["arbitrary", "rayon", "serde"] } lazy_static = "1.5" +libc = "0.2.139" opentelemetry = "0.29" opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] } rand = { version = "0.8", features = ["small_rng"] } diff --git a/hyperactor_telemetry/benches/correctness_test.rs b/hyperactor_telemetry/benches/correctness_test.rs new file mode 100644 index 000000000..978a36aa2 --- /dev/null +++ b/hyperactor_telemetry/benches/correctness_test.rs @@ -0,0 +1,418 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Correctness test harness comparing old vs unified telemetry implementations. +//! +//! This test harness runs identical workloads through both implementations and +//! verifies that the outputs are equivalent across all exporters: +//! - Glog: Read log files and compare lines +//! +//! 
Usage:
+//!   buck2 run //monarch/hyperactor_telemetry:correctness_test
+
+#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock`
+
+use std::path::PathBuf;
+
+use anyhow::Result;
+use hyperactor_telemetry::*;
+
+struct TestResults {
+    glog_path: Option<PathBuf>,
+}
+
+struct CorrectnessTestHarness {}
+
+impl CorrectnessTestHarness {
+    fn run<F>(&self, workload: F) -> Result<TestResults>
+    where
+        F: Fn(),
+    {
+        initialize_logging_with_log_prefix(
+            DefaultTelemetryClock {},
+            Some("TEST_LOG_PREFIX".to_string()),
+        );
+
+        workload();
+
+        std::thread::sleep(std::time::Duration::from_millis(300));
+
+        Ok(TestResults {
+            glog_path: Self::find_glog_path(),
+        })
+    }
+
+    fn find_glog_path() -> Option<PathBuf> {
+        let username = whoami::username();
+        let suffix = std::env::var(hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV)
+            .map(|s| format!("_{}", s))
+            .unwrap_or_default();
+        let possible_paths = vec![
+            format!("/tmp/{}/monarch_log{}.log", username, suffix),
+            format!("/tmp/monarch_log{}.log", suffix),
+            format!("/logs/dedicated_log_monarch{}.log", suffix),
+        ];
+
+        for path in possible_paths {
+            if std::path::Path::new(&path).exists() {
+                return Some(PathBuf::from(path));
+            }
+        }
+        None
+    }
+
+    /// Normalize a glog line by removing timestamp, thread ID, file:line, and prefix for comparison.
+    /// Both old and unified implementations should now use the same format:
+    /// "[prefix]Lmmdd HH:MM:SS.ffffff thread_id file:line] message, fields"
+    ///
+    /// Normalized to: "L] message, fields" (prefix removed)
+    fn normalize_glog_line(line: &str) -> String {
+        // Find the level character position
+        if let Some(level_pos) = line
+            .chars()
+            .position(|c| matches!(c, 'I' | 'D' | 'E' | 'W' | 'T'))
+        {
+            // Find the closing bracket that comes AFTER the level character (not the one in the prefix)
+            if let Some(close_bracket) = line[level_pos..].find(']') {
+                let actual_bracket_pos = level_pos + close_bracket;
+                let level = &line[level_pos..=level_pos]; // e.g., "I"
+                let rest = &line[actual_bracket_pos + 1..].trim_start(); // Everything after the real "]"
+                // Don't include prefix - just level + content
+                return format!("{}] {}", level, rest);
+            }
+        }
+
+        line.to_string()
+    }
+
+    fn compare_glog_files(&self, old_file: &PathBuf, unified_file: &PathBuf) -> Result<()> {
+        println!("\n[Comparing Glog Files]");
+        println!(" Old: {}", old_file.display());
+        println!(" Unified: {}", unified_file.display());
+
+        let old_content = std::fs::read_to_string(old_file)?;
+        let unified_content = std::fs::read_to_string(unified_file)?;
+
+        println!(" Old lines: {}", old_content.lines().count());
+        println!(" Unified lines: {}", unified_content.lines().count());
+
+        let old_lines: Vec<String> = old_content.lines().map(Self::normalize_glog_line).collect();
+
+        let unified_lines: Vec<String> = unified_content
+            .lines()
+            .map(Self::normalize_glog_line)
+            .collect();
+
+        if old_lines.len() != unified_lines.len() {
+            return Err(anyhow::anyhow!(
+                "Line count mismatch: old={} unified={}",
+                old_lines.len(),
+                unified_lines.len()
+            ));
+        }
+
+        let skip_lines = 1;
+
+        for (i, (old_line, unified_line)) in old_lines
+            .iter()
+            .zip(unified_lines.iter())
+            .enumerate()
+            .skip(skip_lines)
+        {
+            if old_line != unified_line {
+                return Err(anyhow::anyhow!(
+                    "Line #{} mismatch:\n old: {}\n unified: {}",
+
i, + old_line, + unified_line + )); + } + } + + println!( + " ✓ All {} lines match (skipped {} initialization lines)!", + old_lines.len() - skip_lines, + skip_lines + ); + Ok(()) + } +} + +// ============================================================================ +// Test Workloads +// ============================================================================ + +fn workload_simple_info_events() { + for i in 0..100 { + tracing::info!(iteration = i, "simple info event"); + } +} + +fn workload_spans_with_fields() { + for i in 0..50 { + let _span = tracing::info_span!( + "test_span", + iteration = i, + foo = 42, + message_type = "Request" + ) + .entered(); + } +} + +fn workload_nested_spans() { + for i in 0..20 { + let _outer = tracing::info_span!("outer", iteration = i).entered(); + { + let _middle = tracing::info_span!("middle", level = 2).entered(); + { + let _inner = tracing::info_span!("inner", level = 3).entered(); + tracing::info!("inside nested span"); + } + } + } +} + +fn workload_events_with_fields() { + for i in 0..100 { + tracing::info!( + iteration = i, + foo = 42, + message_type = "Request", + status = "ok", + count = 100, + "event with many fields" + ); + } +} + +fn workload_mixed_log_levels() { + for _ in 0..25 { + tracing::trace!("trace event"); + tracing::debug!(value = 1, "debug event"); + tracing::info!(value = 2, "info event"); + tracing::warn!(value = 3, "warn event"); + tracing::error!(value = 4, "error event"); + } +} + +fn workload_events_in_spans() { + for i in 0..30 { + let _span = tracing::info_span!("outer_span", iteration = i).entered(); + tracing::info!(step = "start", "starting work"); + tracing::debug!(step = "middle", "doing work"); + tracing::info!(step = "end", "finished work"); + } +} + +fn main() -> Result<()> { + let args: Vec = std::env::args().collect(); + + // This script will spawn itself into this branch + if args.len() > 2 { + let test_name = &args[1]; + let impl_type = &args[2]; + return run_single_test(test_name, 
impl_type); + } + + println!("\n\nHyperactor Telemetry Correctness Test Suite"); + println!("Comparing OLD vs UNIFIED implementations\n"); + + let tests = vec![ + "simple_info_events", + "spans_with_fields", + "nested_spans", + "events_with_fields", + "mixed_log_levels", + "events_in_spans", + ]; + + let mut all_passed = true; + + for test_name in tests { + println!("\n{}", "=".repeat(80)); + println!("Running test: {}", test_name_to_display(test_name)); + println!("{}", "=".repeat(80)); + + let mut test_passed = true; + + println!("\n[Running OLD implementation...]"); + let old_log_suffix = format!("{}_old", test_name); + let old_status = std::process::Command::new(&args[0]) + .arg(test_name) + .arg("--old") + .env("TEST_LOG_PREFIX", "test") + .env( + hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV, + &old_log_suffix, + ) + .status()?; + + if !old_status.success() { + println!("\n✗ OLD implementation FAILED"); + all_passed = false; + test_passed = false; + } + + println!("\n[Running UNIFIED implementation...]"); + let unified_log_suffix = format!("{}_unified", test_name); + let unified_status = std::process::Command::new(&args[0]) + .arg(test_name) + .arg("--unified") + .env("TEST_LOG_PREFIX", "test") + .env( + hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV, + &unified_log_suffix, + ) + .status()?; + + if !unified_status.success() { + println!("\n✗ UNIFIED implementation FAILED"); + all_passed = false; + test_passed = false; + } + + let username = whoami::username(); + let harness = CorrectnessTestHarness {}; + + // Compare glog files + let old_log = PathBuf::from(format!("/tmp/{}/test_{}_old.log", username, test_name)); + let unified_log = + PathBuf::from(format!("/tmp/{}/test_{}_unified.log", username, test_name)); + + if !old_log.exists() || !unified_log.exists() { + println!("\n⚠ Glog files not found, skipping comparison"); + if !old_log.exists() { + println!(" Missing: {}", old_log.display()); + } + if !unified_log.exists() { + println!(" Missing: {}", 
unified_log.display()); + } + all_passed = false; + test_passed = false; + } else { + match harness.compare_glog_files(&old_log, &unified_log) { + Ok(()) => { + println!("\n✓ Glog files match"); + } + Err(e) => { + println!("\n✗ Glog comparison FAILED: {}", e); + all_passed = false; + test_passed = false; + } + } + } + + if test_passed { + println!("\n✓ Test PASSED: {}", test_name_to_display(test_name)); + } else { + println!("\n✗ Test FAILED: {}", test_name_to_display(test_name)); + } + + // Clean up test files + let _ = std::fs::remove_file(&old_log); + let _ = std::fs::remove_file(&unified_log); + } + + println!("\n\n{}", "=".repeat(80)); + if all_passed { + println!("All tests completed successfully!"); + } else { + println!("Some tests FAILED!"); + return Err(anyhow::anyhow!("Test failures detected")); + } + println!("{}", "=".repeat(80)); + + Ok(()) +} + +/// Called in child process +fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { + let impl_suffix = if impl_type == "--old" { + "old" + } else { + "unified" + }; + let log_suffix = format!("{}_{}", test_name, impl_suffix); + let username = whoami::username(); + let possible_log_paths = vec![ + format!("/tmp/{}/monarch_log_{}.log", username, log_suffix), + format!("/tmp/monarch_log_{}.log", log_suffix), + format!("/logs/dedicated_log_monarch_{}.log", log_suffix), + ]; + + for path in &possible_log_paths { + if std::path::Path::new(path).exists() { + let _ = std::fs::remove_file(path); + println!("Cleaned up existing log file: {}", path); + } + } + + let target_log_copy = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix); + if std::path::Path::new(&target_log_copy).exists() { + let _ = std::fs::remove_file(&target_log_copy); + println!("Cleaned up existing copy file: {}", target_log_copy); + } + + let harness = CorrectnessTestHarness {}; + + let workload: fn() = match test_name { + "simple_info_events" => workload_simple_info_events, + "spans_with_fields" => 
workload_spans_with_fields, + "nested_spans" => workload_nested_spans, + "events_with_fields" => workload_events_with_fields, + "mixed_log_levels" => workload_mixed_log_levels, + "events_in_spans" => workload_events_in_spans, + _ => { + return Err(anyhow::anyhow!("Unknown test: {}", test_name)); + } + }; + + let results = match impl_type { + "--old" => { + println!("Running with OLD implementation..."); + harness.run(workload)? + } + "--unified" => { + println!("Running with UNIFIED implementation..."); + // Set USE_UNIFIED_LAYER to use unified implementation + // SAFETY: Setting before any telemetry initialization + unsafe { + std::env::set_var("USE_UNIFIED_LAYER", "1"); + } + harness.run(workload)? + } + _ => { + return Err(anyhow::anyhow!( + "Unknown implementation type: {}", + impl_type + )); + } + }; + + if let Some(glog_path) = results.glog_path { + let target_path = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix); + + std::fs::copy(&glog_path, &target_path)?; + println!("Glog file copied to: {}", target_path); + } + + Ok(()) +} + +fn test_name_to_display(test_name: &str) -> &str { + match test_name { + "simple_info_events" => "Simple info events", + "spans_with_fields" => "Spans with fields", + "nested_spans" => "Nested spans", + "events_with_fields" => "Events with many fields", + "mixed_log_levels" => "Mixed log levels", + "events_in_spans" => "Events in spans", + _ => test_name, + } +} diff --git a/hyperactor_telemetry/benches/telemetry_benchmark.rs b/hyperactor_telemetry/benches/telemetry_benchmark.rs new file mode 100644 index 000000000..b6653f7a4 --- /dev/null +++ b/hyperactor_telemetry/benches/telemetry_benchmark.rs @@ -0,0 +1,204 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Benchmark comparing old vs unified telemetry implementations. +//! 
+//! This benchmark simulates a realistic workload with: +//! - Nested spans (simulating actor message processing) +//! - Events at different log levels +//! - Field recording +//! - Multiple iterations +//! +//! Usage: +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --old +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --unified +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --compare + +#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock` + +use std::time::Instant; + +use hyperactor_telemetry::*; + +fn stage_debug_events_only(iterations: usize) { + for i in 0..iterations { + tracing::debug!(iteration = i, "debug event"); + } +} + +fn stage_info_events_only(iterations: usize) { + for i in 0..iterations { + tracing::info!(iteration = i, "info event"); + } +} + +fn stage_trace_events_only(iterations: usize) { + for i in 0..iterations { + tracing::trace!(iteration = i, "trace event"); + } +} + +fn stage_error_events_only(iterations: usize) { + for i in 0..iterations { + tracing::error!(iteration = i, "error event"); + } +} + +fn stage_simple_spans_only(iterations: usize) { + for _ in 0..iterations { + let _span = tracing::info_span!("simple_span").entered(); + } +} + +fn stage_spans_with_fields(iterations: usize) { + for i in 0..iterations { + let _span = tracing::info_span!( + "span_with_fields", + iteration = i, + foo = 42, + message_type = "Request" + ) + .entered(); + } +} + +fn stage_nested_spans(iterations: usize) { + for _ in 0..iterations { + let _outer = tracing::info_span!("outer", level = 1).entered(); + { + let _middle = tracing::info_span!("middle", level = 2).entered(); + { + let _inner = tracing::info_span!("inner", level = 3).entered(); + } + } + } +} + +fn stage_events_with_fields(iterations: usize) { + for i in 0..iterations { + tracing::info!( + iteration = i, + foo = 42, + message_type = "Request", + status = 
"ok", + count = 100, + "event with fields" + ); + } +} + +fn run_benchmark_stages(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + let stages: Vec<(&'static str, fn(usize))> = vec![ + ("Debug events only", stage_debug_events_only), + ("Info events only", stage_info_events_only), + ("Trace events only", stage_trace_events_only), + ("Error events only", stage_error_events_only), + ("Simple spans only", stage_simple_spans_only), + ("Spans with fields", stage_spans_with_fields), + ("Nested spans (3 levels)", stage_nested_spans), + ("Events with fields", stage_events_with_fields), + ]; + + let mut results = Vec::new(); + + for (name, stage_fn) in stages { + // Warm up + stage_fn(10); + + // Benchmark + let start = Instant::now(); + stage_fn(iterations); + let elapsed = start.elapsed(); + + println!( + " {:30} {} iterations in {:>12?} ({:>10?}/iter)", + format!("{}:", name), + iterations, + elapsed, + elapsed / iterations as u32 + ); + + results.push((name, elapsed)); + } + + results +} + +fn benchmark(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + println!("{}", "=".repeat(100)); + + initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None); + + let results = run_benchmark_stages(iterations); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + results +} + +fn main() { + let args: Vec = std::env::args().collect(); + + let iterations = 1000; + + if args.len() < 2 { + println!("Usage: {} [--old | --unified | --compare]", args[0]); + println!(" --old: Benchmark old implementation only"); + println!(" --unified: Benchmark unified implementation only"); + println!(" --compare: Run both in separate processes and compare"); + return; + } + + match args[1].as_str() { + "--old" => { + println!("Benchmarking OLD implementation..."); + // Don't set USE_UNIFIED_LAYER - uses old implementation + let _results = benchmark(iterations); + println!("\n{}", "=".repeat(100)); + } + "--unified" => { + println!("Benchmarking 
UNIFIED implementation..."); + // Set USE_UNIFIED_LAYER to use unified implementation + // SAFETY: Setting before any telemetry initialization + unsafe { + std::env::set_var("USE_UNIFIED_LAYER", "1"); + } + let _results = benchmark(iterations); + println!("\n{}", "=".repeat(100)); + } + "--compare" => { + println!( + "Running comparison benchmark with {} iterations...\n", + iterations + ); + + let old_status = std::process::Command::new(&args[0]) + .arg("--old") + .status() + .expect("Failed to spawn old implementation"); + + if !old_status.success() { + eprintln!("\n✗ OLD implementation benchmark FAILED"); + return; + } + + let unified_status = std::process::Command::new(&args[0]) + .arg("--unified") + .status() + .expect("Failed to spawn unified implementation"); + + if !unified_status.success() { + eprintln!("\n✗ UNIFIED implementation benchmark FAILED"); + } + } + _ => { + println!("Unknown option: {}", args[1]); + println!("Use --old, --unified, or --compare"); + } + } +} diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 0210e0a5e..3ddd82ebd 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -7,6 +7,7 @@ */ #![allow(internal_features)] +#![allow(clippy::disallowed_methods)] // hyperactor_telemetry can't use hyperactor::clock::Clock (circular dependency) #![feature(assert_matches)] #![feature(sync_unsafe_cell)] #![feature(mpmc_channel)] @@ -33,9 +34,15 @@ pub const ENABLE_SQLITE_TRACING: &str = "ENABLE_SQLITE_TRACING"; /// Environment variable constants // Log level (debug, info, warn, error, critical) to capture for Monarch traces on dedicated log file (changes based on environment, see `log_file_path`). const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG"; +// Suffix to append to log filenames for test isolation +pub const MONARCH_LOG_SUFFIX_ENV: &str = "MONARCH_LOG_SUFFIX"; pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME"; +/// Environment variable to enable the unified layer. 
+/// Set to "1" to enable. +pub const USE_UNIFIED_LAYER: &str = "USE_UNIFIED_LAYER"; + // Log level constants const LOG_LEVEL_INFO: &str = "info"; const LOG_LEVEL_DEBUG: &str = "debug"; @@ -59,10 +66,12 @@ mod meta; mod otel; mod pool; pub mod recorder; +pub mod sinks; mod spool; pub mod sqlite; pub mod task; pub mod trace; +pub mod trace_dispatcher; use std::io::Write; use std::str::FromStr; use std::sync::Arc; @@ -170,7 +179,8 @@ fn writer() -> Box { match env::Env::current() { env::Env::Test => Box::new(std::io::stderr()), env::Env::Local | env::Env::MastEmulator | env::Env::Mast => { - let (path, filename) = log_file_path(env::Env::current(), None).unwrap(); + let suffix = std::env::var(MONARCH_LOG_SUFFIX_ENV).ok(); + let (path, filename) = log_file_path(env::Env::current(), suffix.as_deref()).unwrap(); match try_create_appender(&path, &filename, true) { Ok(file_appender) => Box::new(file_appender), Err(e) => { @@ -549,6 +559,13 @@ pub fn initialize_logging_for_test() { initialize_logging(DefaultTelemetryClock {}); } +fn is_layer_enabled(env_var: &str) -> bool { + std::env::var(env_var).unwrap_or_default() == "1" +} +fn is_layer_disabled(env_var: &str) -> bool { + std::env::var(env_var).unwrap_or_default() == "1" +} + /// Set up logging based on the given execution environment. We specialize logging based on how the /// logs are consumed. The destination scuba table is specialized based on the execution environment. 
/// mast -> monarch_tracing/prod @@ -567,6 +584,8 @@ pub fn initialize_logging_with_log_prefix( clock: impl TelemetryClock + Send + 'static, prefix_env_var: Option, ) { + let use_unified = std::env::var(USE_UNIFIED_LAYER).unwrap_or_default() == "1"; + swap_telemetry_clock(clock); let file_log_level = match env::Env::current() { env::Env::Local => LOG_LEVEL_INFO, @@ -574,27 +593,6 @@ pub fn initialize_logging_with_log_prefix( env::Env::Mast => LOG_LEVEL_INFO, env::Env::Test => LOG_LEVEL_DEBUG, }; - let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default() - .lossy(false) - .finish(writer()); - let writer_guard = Arc::new((non_blocking, guard)); - let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); - - let file_layer = fmt::Layer::default() - .with_writer(writer_guard.0.clone()) - .event_format(PrefixedFormatter::new(prefix_env_var.clone())) - .fmt_fields(GlogFields::default().compact()) - .with_ansi(false) - .with_filter( - Targets::new() - .with_default(LevelFilter::from_level( - tracing::Level::from_str( - &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()), - ) - .expect("Invalid log level"), - )) - .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about - ); use tracing_subscriber::Registry; use tracing_subscriber::layer::SubscriberExt; @@ -603,50 +601,100 @@ pub fn initialize_logging_with_log_prefix( #[cfg(fbcode_build)] { use crate::env::Env; - fn is_layer_enabled(env_var: &str) -> bool { - std::env::var(env_var).unwrap_or_default() == "1" - } - fn is_layer_disabled(env_var: &str) -> bool { - std::env::var(env_var).unwrap_or_default() == "1" - } - if let Err(err) = Registry::default() - .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { - // TODO: get_reloadable_sqlite_layer currently still returns None, - // and some additional work is required to make it work. 
- Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) - } else { - None - }) - .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { - Some(otel::tracing_layer()) - } else { - None - }) - .with(file_layer) - .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { - Some(recorder().layer()) - } else { - None - }) - .try_init() - { - tracing::debug!("logging already initialized for this process: {}", err); + if use_unified { + let mut sinks: Vec> = Vec::new(); + sinks.push(Box::new(sinks::glog::GlogSink::new( + writer(), + prefix_env_var.clone(), + file_log_level, + ))); + + if let Err(err) = Registry::default() + .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { + // TODO: get_reloadable_sqlite_layer currently still returns None, + // and some additional work is required to make it work. + Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { + Some(otel::tracing_layer()) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }) + .with(trace_dispatcher::TraceEventDispatcher::new(sinks, None)) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else { + // For file_layer, use NonBlocking + let (non_blocking, guard) = + tracing_appender::non_blocking::NonBlockingBuilder::default() + .lossy(false) + .finish(writer()); + let writer_guard = Arc::new((non_blocking, guard)); + let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); + + let file_layer = fmt::Layer::default() + .with_writer(writer_guard.0.clone()) + .event_format(PrefixedFormatter::new(prefix_env_var.clone())) + .fmt_fields(GlogFields::default().compact()) + .with_ansi(false) + .with_filter( + Targets::new() + .with_default(LevelFilter::from_level( + tracing::Level::from_str( + &std::env::var(MONARCH_FILE_LOG_ENV) + .unwrap_or(file_log_level.to_string()), + ) + 
.expect("Invalid log level"), + )) + .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about + ); + + if let Err(err) = Registry::default() + .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { + // TODO: get_reloadable_sqlite_layer currently still returns None, + // and some additional work is required to make it work. + Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { + Some(otel::tracing_layer()) + } else { + None + }) + .with(file_layer) + .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } } let exec_id = env::execution_id(); - tracing::info!( - target: "execution", - execution_id = exec_id, - environment = %Env::current(), - args = ?std::env::args(), - build_mode = build_info::BuildInfo::get_build_mode(), - compiler = build_info::BuildInfo::get_compiler(), - compiler_version = build_info::BuildInfo::get_compiler_version(), - buck_rule = build_info::BuildInfo::get_rule(), - package_name = build_info::BuildInfo::get_package_name(), - package_release = build_info::BuildInfo::get_package_release(), - upstream_revision = build_info::BuildInfo::get_upstream_revision(), - revision = build_info::BuildInfo::get_revision(), - "logging_initialized" + meta::log_execution_event( + &exec_id, + &Env::current().to_string(), + std::env::args().collect(), + build_info::BuildInfo::get_build_mode(), + build_info::BuildInfo::get_compiler(), + build_info::BuildInfo::get_compiler_version(), + build_info::BuildInfo::get_rule(), + build_info::BuildInfo::get_package_name(), + build_info::BuildInfo::get_package_release(), + build_info::BuildInfo::get_upstream_revision(), + build_info::BuildInfo::get_revision(), ); if !is_layer_disabled(DISABLE_OTEL_METRICS) { @@ -655,18 +703,54 @@ 
pub fn initialize_logging_with_log_prefix( } #[cfg(not(fbcode_build))] { - if let Err(err) = Registry::default() - .with(file_layer) - .with( - if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" { - Some(recorder().layer()) - } else { - None - }, - ) - .try_init() - { - tracing::debug!("logging already initialized for this process: {}", err); + let registry = Registry::default().with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }); + + if use_unified { + let mut sinks: Vec> = Vec::new(); + sinks.push(Box::new(sinks::glog::GlogSink::new( + writer(), + prefix_env_var.clone(), + file_log_level, + ))); + + if let Err(err) = registry + .with(trace_dispatcher::TraceEventDispatcher::new(sinks, None)) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else { + let (non_blocking, guard) = + tracing_appender::non_blocking::NonBlockingBuilder::default() + .lossy(false) + .finish(writer()); + let writer_guard = Arc::new((non_blocking, guard)); + let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); + + let file_layer = fmt::Layer::default() + .with_writer(writer_guard.0.clone()) + .event_format(PrefixedFormatter::new(prefix_env_var.clone())) + .fmt_fields(GlogFields::default().compact()) + .with_ansi(false) + .with_filter( + Targets::new() + .with_default(LevelFilter::from_level( + tracing::Level::from_str( + &std::env::var(MONARCH_FILE_LOG_ENV) + .unwrap_or(file_log_level.to_string()), + ) + .expect("Invalid log level"), + )) + .with_target("opentelemetry", LevelFilter::OFF), + ); + + if let Err(err) = registry.with(file_layer).try_init() { + tracing::debug!("logging already initialized for this process: {}", err); + } } } } diff --git a/hyperactor_telemetry/src/sinks/glog.rs b/hyperactor_telemetry/src/sinks/glog.rs new file mode 100644 index 000000000..c65803df9 --- /dev/null +++ b/hyperactor_telemetry/src/sinks/glog.rs @@ -0,0 +1,257 @@ +/* + * Copyright 
(c) Meta Platforms, Inc. and affiliates.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+//! Glog-formatted text sink for trace events.
+//! Replicates the behavior of the fmt::Layer with glog formatting.
+
+use std::collections::HashMap;
+use std::io::BufWriter;
+use std::io::Write;
+use std::str::FromStr;
+
+use anyhow::Result;
+use indexmap::IndexMap;
+use tracing_core::LevelFilter;
+use tracing_subscriber::filter::Targets;
+
+use crate::MONARCH_FILE_LOG_ENV;
+use crate::trace_dispatcher::FieldValue;
+use crate::trace_dispatcher::TraceEvent;
+use crate::trace_dispatcher::TraceEventSink;
+
+/// Glog sink that writes events in glog format to a file.
+/// This replaces the fmt::Layer that was previously used for text logging.
+///
+/// This only logs Events, not Spans (matching old fmt::Layer behavior).
+pub struct GlogSink {
+    writer: BufWriter<Box<dyn Write + Send>>,
+    prefix: Option<String>,
+    /// Track active spans by ID with (name, fields, parent_id) to show span context in event logs
+    active_spans: HashMap<u64, (String, IndexMap<String, FieldValue>, Option<u64>)>,
+    targets: Targets,
+}
+
+impl GlogSink {
+    /// Create a new glog sink with the given writer.
+    ///
+    /// # Arguments
+    /// * `writer` - Writer to write log events to (will be buffered)
+    /// * `prefix_env_var` - Optional environment variable name to read prefix from (matching old impl)
+    /// * `file_log_level` - Minimum log level to capture (e.g., INFO, DEBUG)
+    pub fn new(
+        writer: Box<dyn Write + Send>,
+        prefix_env_var: Option<String>,
+        file_log_level: &str,
+    ) -> Self {
+        let prefix = if let Some(ref env_var_name) = prefix_env_var {
+            std::env::var(env_var_name).ok()
+        } else {
+            None
+        };
+
+        Self {
+            writer: BufWriter::new(writer),
+            prefix,
+            active_spans: HashMap::new(),
+            targets: Targets::new()
+                .with_default(LevelFilter::from_level(
+                    tracing::Level::from_str(
+                        &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()),
+                    )
+                    .expect("Invalid log level"),
+                ))
+                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
+        }
+    }
+
+    fn write_event(&mut self, event: &TraceEvent) -> Result<()> {
+        let timestamp_str = match event {
+            TraceEvent::Event { timestamp, .. } => {
+                let datetime: chrono::DateTime<chrono::Local> = (*timestamp).into();
+                datetime.format("%m%d %H:%M:%S%.6f").to_string()
+            }
+            // write_event is only called for Events, but keep this for safety
+            _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(),
+        };
+
+        let prefix_str = if let Some(ref p) = self.prefix {
+            format!("[{}]", p)
+        } else {
+            "[-]".to_string()
+        };
+
+        match event {
+            TraceEvent::Event {
+                level,
+                fields,
+                parent_span,
+                thread_id,
+                file,
+                line,
+                ..
+ } => { + let level_char = match *level { + tracing::Level::ERROR => 'E', + tracing::Level::WARN => 'W', + tracing::Level::INFO => 'I', + tracing::Level::DEBUG => 'D', + tracing::Level::TRACE => 'T', + }; + + // [prefix]LMMDD HH:MM:SS.ffffff thread_id file:line] message, key:value, key:value + write!( + self.writer, + "{}{}{} {} ", + prefix_str, level_char, timestamp_str, thread_id + )?; + + if let (Some(f), Some(l)) = (file, line) { + write!(self.writer, "{}:{}] ", f, l)?; + } else { + write!(self.writer, "unknown:0] ")?; + } + + if let Some(parent_id) = parent_span { + self.write_span_context(*parent_id)?; + } + + if let Some(v) = fields.get("message") { + match v { + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + _ => write!(self.writer, "event")?, + } + } else { + write!(self.writer, "event")?; + } + + for (k, v) in fields.iter() { + if k != "message" { + write!(self.writer, ", {}:", k)?; + match v { + FieldValue::Bool(b) => write!(self.writer, "{}", b)?, + FieldValue::I64(i) => write!(self.writer, "{}", i)?, + FieldValue::U64(u) => write!(self.writer, "{}", u)?, + FieldValue::F64(f) => write!(self.writer, "{}", f)?, + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + } + } + } + + writeln!(self.writer)?; + } + + // write_event should only be called for Events, but handle gracefully + _ => { + writeln!( + self.writer, + "{}I{} - unknown:0] unexpected event type", + prefix_str, timestamp_str + )?; + } + } + + Ok(()) + } + + /// Writes span context: "[outer{field:value}, inner{field:value}] " + fn write_span_context(&mut self, span_id: u64) -> Result<()> { + let mut span_ids = Vec::new(); + let mut current_id = Some(span_id); + + while let Some(id) = current_id { + if let Some((_, _, parent_id)) = self.active_spans.get(&id) { + span_ids.push(id); + current_id = *parent_id; + } else { + break; + } + } + if span_ids.is_empty() { + return 
Ok(()); + } + + write!(self.writer, "[")?; + + for (i, id) in span_ids.iter().rev().enumerate() { + if i > 0 { + write!(self.writer, ", ")?; + } + + if let Some((name, fields, _)) = self.active_spans.get(id) { + write!(self.writer, "{}", name)?; + if !fields.is_empty() { + write!(self.writer, "{{")?; + + let mut first = true; + for (k, v) in fields.iter() { + if !first { + write!(self.writer, ", ")?; + } + first = false; + write!(self.writer, "{}:", k)?; + + match v { + FieldValue::Bool(b) => write!(self.writer, "{}", b)?, + FieldValue::I64(i) => write!(self.writer, "{}", i)?, + FieldValue::U64(u) => write!(self.writer, "{}", u)?, + FieldValue::F64(f) => write!(self.writer, "{}", f)?, + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + } + } + + write!(self.writer, "}}")?; + } + } + } + + write!(self.writer, "] ")?; + Ok(()) + } +} + +impl TraceEventSink for GlogSink { + fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> { + match event { + // Track span lifecycle for context display (must happen even if we don't export spans) + TraceEvent::NewSpan { + id, + name, + fields, + parent_id, + .. + } => { + self.active_spans + .insert(*id, (name.to_string(), fields.clone(), *parent_id)); + } + TraceEvent::SpanClose { id, .. } => { + self.active_spans.remove(id); + } + TraceEvent::Event { .. } => { + self.write_event(event)?; + } + _ => {} + } + Ok(()) + } + + fn flush(&mut self) -> Result<(), anyhow::Error> { + self.writer.flush()?; + Ok(()) + } + + fn name(&self) -> &str { + "GlogSink" + } + + fn target_filter(&self) -> Option<&Targets> { + Some(&self.targets) + } +} diff --git a/hyperactor_telemetry/src/sinks/mod.rs b/hyperactor_telemetry/src/sinks/mod.rs new file mode 100644 index 000000000..3e8569d67 --- /dev/null +++ b/hyperactor_telemetry/src/sinks/mod.rs @@ -0,0 +1,13 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. 
+ * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Exporters for the unified telemetry layer. +//! Each exporter implements the TraceExporter trait and handles +//! writing events to a specific backend (SQLite, Scuba, glog, etc). + +pub mod glog; diff --git a/hyperactor_telemetry/src/trace_dispatcher.rs b/hyperactor_telemetry/src/trace_dispatcher.rs new file mode 100644 index 000000000..5d4726a7c --- /dev/null +++ b/hyperactor_telemetry/src/trace_dispatcher.rs @@ -0,0 +1,439 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Unified telemetry layer that captures trace events once and fans out to multiple exporters +//! on a background thread, eliminating redundant capture and moving work off the application +//! thread. + +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::mpsc; +use std::thread::JoinHandle; +use std::time::Duration; +use std::time::SystemTime; + +use indexmap::IndexMap; +use tracing::Id; +use tracing::Subscriber; +use tracing_subscriber::filter::Targets; +use tracing_subscriber::layer::Context; +use tracing_subscriber::layer::Layer; +use tracing_subscriber::registry::LookupSpan; + +const QUEUE_CAPACITY: usize = 100_000; + +/// Unified representation of a trace event captured from the tracing layer. +/// This is captured once on the application thread, then sent to the background +/// worker for fan-out to multiple exporters. 
+#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) enum TraceEvent { + /// A new span was created (on_new_span) + NewSpan { + id: u64, + name: &'static str, + target: &'static str, + level: tracing::Level, + fields: IndexMap, + timestamp: SystemTime, + parent_id: Option, + thread_name: String, + file: Option<&'static str>, + line: Option, + }, + /// A span was entered (on_enter) + SpanEnter { id: u64, timestamp: SystemTime }, + /// A span was exited (on_exit) + SpanExit { id: u64, timestamp: SystemTime }, + /// A span was closed (dropped) + SpanClose { id: u64, timestamp: SystemTime }, + /// A tracing event occurred (e.g., tracing::info!()) + Event { + name: &'static str, + target: &'static str, + level: tracing::Level, + fields: IndexMap, + timestamp: SystemTime, + parent_span: Option, + thread_id: String, + thread_name: String, + module_path: Option<&'static str>, + file: Option<&'static str>, + line: Option, + }, +} + +/// Simplified field value representation for trace events +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) enum FieldValue { + Bool(bool), + I64(i64), + U64(u64), + F64(f64), + Str(String), + Debug(String), +} + +/// Trait for sinks that receive trace events from the dispatcher. +/// Implementations run on the background worker thread and can perform +/// expensive I/O operations without blocking the application. +pub(crate) trait TraceEventSink: Send + 'static { + /// Consume a single event. Called on background thread. + fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>; + + /// Optional target/level filter for this sink. + /// + /// The worker loop automatically applies this filter before calling `consume()`, + /// so sinks don't need to check target/level in their consume implementation. + /// Only `NewSpan` and `Event` are filtered by target/level; other event types + /// are always passed through. 
+ /// + /// # Returns + /// - `None` - No filtering, all events are consumed (default) + /// - `Some(Targets)` - Only consume events matching the target/level filter + /// + /// # Example + /// ```ignore + /// fn target_filter(&self) -> Option<&Targets> { + /// Some(Targets::new() + /// .with_target("opentelemetry", LevelFilter::OFF) + /// .with_default(LevelFilter::DEBUG)) + /// } + /// ``` + fn target_filter(&self) -> Option<&Targets> { + None + } + + /// Flush any buffered events to the backend. + /// Called periodically and on shutdown. + fn flush(&mut self) -> Result<(), anyhow::Error>; + + /// Optional: return name for debugging/logging + fn name(&self) -> &str { + std::any::type_name::() + } +} + +/// The trace event dispatcher that captures events once and dispatches to multiple sinks +/// on a background thread. +pub struct TraceEventDispatcher { + sender: Option>, + _worker_handle: WorkerHandle, + max_level: Option, + dropped_events: Arc, +} + +struct WorkerHandle { + join_handle: Option>, +} + +impl TraceEventDispatcher { + /// Create a new trace event dispatcher with the given sinks. + /// Uses a bounded channel (capacity 10,000) to ensure telemetry never blocks + /// the application. Events are dropped with a warning if the queue is full. + /// + /// # Arguments + /// * `sinks` - List of sinks to dispatch events to. 
+ /// * `max_level` - Maximum level filter hint (None for no filtering) + pub(crate) fn new( + sinks: Vec>, + max_level: Option, + ) -> Self { + let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY); + let dropped_events = Arc::new(AtomicU64::new(0)); + let dropped_events_worker = Arc::clone(&dropped_events); + + let worker_handle = std::thread::Builder::new() + .name("telemetry-worker".into()) + .spawn(move || { + worker_loop(receiver, sinks, dropped_events_worker); + }) + .expect("failed to spawn telemetry worker thread"); + + Self { + sender: Some(sender), + _worker_handle: WorkerHandle { + join_handle: Some(worker_handle), + }, + max_level, + dropped_events, + } + } + + fn send_event(&self, event: TraceEvent) { + if let Some(sender) = &self.sender { + if let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event) { + let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1; + + if dropped.is_multiple_of(1000) { + eprintln!( + "[telemetry] WARNING: {} events dropped due to full queue (capacity: 10,000). \ + Telemetry worker may be falling behind.", + dropped + ); + } + } + } + } +} + +impl Drop for TraceEventDispatcher { + fn drop(&mut self) { + // Explicitly drop the sender to close the channel. 
+ // The next field to be dropped is `worker_handle` which + // will run it's own drop impl to join the thread and flush + drop(self.sender.take()); + } +} + +impl Layer for TraceEventDispatcher +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let metadata = attrs.metadata(); + let mut fields = IndexMap::new(); + + let mut visitor = FieldVisitor(&mut fields); + attrs.record(&mut visitor); + + let parent_id = if let Some(parent) = attrs.parent() { + Some(parent.into_u64()) + } else { + ctx.current_span().id().map(|id| id.into_u64()) + }; + + let thread_name = std::thread::current() + .name() + .unwrap_or_default() + .to_string(); + + let event = TraceEvent::NewSpan { + id: id.into_u64(), + name: metadata.name(), + target: metadata.target(), + level: *metadata.level(), + fields, + timestamp: SystemTime::now(), + parent_id, + thread_name, + file: metadata.file(), + line: metadata.line(), + }; + + self.send_event(event); + } + + fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanEnter { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanExit { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { + let metadata = event.metadata(); + let mut fields = IndexMap::new(); + let mut visitor = FieldVisitor(&mut fields); + event.record(&mut visitor); + + let parent_span = ctx.event_span(event).map(|span| span.id().into_u64()); + + #[cfg(target_os = "linux")] + let thread_id_num = { + // SAFETY: syscall(SYS_gettid) is always safe to call - it's a read-only + // syscall that returns the current thread's kernel thread ID (TID). + // The cast to u64 is safe because gettid() returns a positive pid_t. 
+ unsafe { libc::syscall(libc::SYS_gettid) as u64 } + }; + #[cfg(not(target_os = "linux"))] + let thread_id_num = { + let tid = std::thread::current().id(); + // SAFETY: ThreadId is a newtype wrapper around a u64 counter. + // This transmute relies on the internal representation of ThreadId, + // which is stable in practice but not guaranteed by Rust's API. + // On non-Linux platforms this is a best-effort approximation. + // See: https://doc.rust-lang.org/std/thread/struct.ThreadId.html + unsafe { std::mem::transmute::(tid) } + }; + let thread_id_str = thread_id_num.to_string(); + + let thread_name = std::thread::current() + .name() + .unwrap_or_default() + .to_string(); + + let trace_event = TraceEvent::Event { + name: metadata.name(), + target: metadata.target(), + level: *metadata.level(), + fields, + timestamp: SystemTime::now(), + parent_span, + thread_id: thread_id_str, + thread_name, + module_path: metadata.module_path(), + file: metadata.file(), + line: metadata.line(), + }; + + self.send_event(trace_event); + } + + fn on_close(&self, id: Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanClose { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn max_level_hint(&self) -> Option { + self.max_level + } +} + +struct FieldVisitor<'a>(&'a mut IndexMap); + +impl<'a> tracing::field::Visit for FieldVisitor<'a> { + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.0 + .insert(field.name().to_string(), FieldValue::Bool(value)); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.0 + .insert(field.name().to_string(), FieldValue::I64(value)); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.0 + .insert(field.name().to_string(), FieldValue::U64(value)); + } + + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + self.0 + .insert(field.name().to_string(), FieldValue::F64(value)); + } + + fn 
record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.0 + .insert(field.name().to_string(), FieldValue::Str(value.to_string())); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert( + field.name().to_string(), + FieldValue::Debug(format!("{:?}", value)), + ); + } +} + +/// Background worker loop that receives events and dispatches them to sinks. +/// Runs until the sender is dropped +fn worker_loop( + receiver: mpsc::Receiver, + mut sinks: Vec>, + dropped_events: Arc, +) { + const FLUSH_INTERVAL: Duration = Duration::from_millis(100); + const FLUSH_EVENT_COUNT: usize = 1000; + let mut last_flush = std::time::Instant::now(); + let mut events_since_flush = 0; + + fn flush_sinks(sinks: &mut [Box]) { + for sink in sinks { + if let Err(e) = sink.flush() { + eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e); + } + } + } + + fn dispatch_to_sinks(sinks: &mut [Box], event: TraceEvent) { + for sink in sinks { + if match &event { + TraceEvent::NewSpan { target, level, .. } + | TraceEvent::Event { target, level, .. 
} => match sink.target_filter() { + Some(targets) => targets.would_enable(target, level), + None => true, + }, + _ => true, + } { + if let Err(e) = sink.consume(&event) { + eprintln!( + "[telemetry] sink {} failed to consume event: {}", + sink.name(), + e + ); + } + } + } + } + + loop { + match receiver.recv_timeout(FLUSH_INTERVAL) { + Ok(event) => { + dispatch_to_sinks(&mut sinks, event); + events_since_flush += 1; + + if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL + { + flush_sinks(&mut sinks); + last_flush = std::time::Instant::now(); + events_since_flush = 0; + } + } + Err(mpsc::RecvTimeoutError::Timeout) => { + flush_sinks(&mut sinks); + last_flush = std::time::Instant::now(); + events_since_flush = 0; + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + break; + } + } + } + + while let Ok(event) = receiver.try_recv() { + dispatch_to_sinks(&mut sinks, event); + } + + flush_sinks(&mut sinks); + + let total_dropped = dropped_events.load(Ordering::Relaxed); + if total_dropped > 0 { + eprintln!( + "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}", + total_dropped + ); + } +} + +impl Drop for WorkerHandle { + fn drop(&mut self) { + if let Some(handle) = self.join_handle.take() { + if let Err(e) = handle.join() { + eprintln!("[telemetry] worker thread panicked: {:?}", e); + } + } + } +}