From 21b90365dc953eeec3dd04c89123117ac64f4112 Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Tue, 25 Nov 2025 09:41:26 -0800 Subject: [PATCH 1/6] Allow disallowed time methods (#1974) Summary: We disallow methods relating to time to ensure that we use `hyperactor::clock`, but need to make an exception for this for telemetry because we would create a circular dependency if we tried to use `hyperactor::clock` Reviewed By: shayne-fletcher, dulinriley Differential Revision: D87664116 --- hyperactor_telemetry/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 0210e0a5e..9af458162 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -7,6 +7,7 @@ */ #![allow(internal_features)] +#![allow(clippy::disallowed_methods)] // hyperactor_telemetry can't use hyperactor::clock::Clock (circular dependency) #![feature(assert_matches)] #![feature(sync_unsafe_cell)] #![feature(mpmc_channel)] From 1d6cd13e413911d0aaff3230f42e306777af2710 Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Tue, 25 Nov 2025 09:41:26 -0800 Subject: [PATCH 2/6] Log to monarch_executions directly (#1988) Summary: Each process only logs to monarch_executions once at the beginning of the execution so there is no need to add a scuba client that logs to this table into our tracing subscriber Reviewed By: vidhyav Differential Revision: D87664117 --- hyperactor_telemetry/src/lib.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 9af458162..9bfbc71c0 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -634,20 +634,18 @@ pub fn initialize_logging_with_log_prefix( tracing::debug!("logging already initialized for this process: {}", err); } let exec_id = env::execution_id(); - tracing::info!( - target: "execution", - execution_id = exec_id, - environment = 
%Env::current(), - args = ?std::env::args(), - build_mode = build_info::BuildInfo::get_build_mode(), - compiler = build_info::BuildInfo::get_compiler(), - compiler_version = build_info::BuildInfo::get_compiler_version(), - buck_rule = build_info::BuildInfo::get_rule(), - package_name = build_info::BuildInfo::get_package_name(), - package_release = build_info::BuildInfo::get_package_release(), - upstream_revision = build_info::BuildInfo::get_upstream_revision(), - revision = build_info::BuildInfo::get_revision(), - "logging_initialized" + meta::log_execution_event( + &exec_id, + &Env::current().to_string(), + std::env::args().collect(), + build_info::BuildInfo::get_build_mode(), + build_info::BuildInfo::get_compiler(), + build_info::BuildInfo::get_compiler_version(), + build_info::BuildInfo::get_rule(), + build_info::BuildInfo::get_package_name(), + build_info::BuildInfo::get_package_release(), + build_info::BuildInfo::get_upstream_revision(), + build_info::BuildInfo::get_revision(), ); if !is_layer_disabled(DISABLE_OTEL_METRICS) { From 1f99975d3e8538ae5151325250c6753bfb236429 Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Tue, 25 Nov 2025 09:41:26 -0800 Subject: [PATCH 3/6] Create TraceDispatcher (#1928) Summary: Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this, is to create a `UnifiedLayer` that just sends everything into a background worker, that then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. 
To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results Differential Revision: D87363773 --- hyperactor_telemetry/Cargo.toml | 12 +- .../benches/correctness_test.rs | 238 ++++++++++ .../benches/telemetry_benchmark.rs | 204 ++++++++ hyperactor_telemetry/src/lib.rs | 78 +++- hyperactor_telemetry/src/trace_dispatcher.rs | 439 ++++++++++++++++++ 5 files changed, 951 insertions(+), 20 deletions(-) create mode 100644 hyperactor_telemetry/benches/correctness_test.rs create mode 100644 hyperactor_telemetry/benches/telemetry_benchmark.rs create mode 100644 hyperactor_telemetry/src/trace_dispatcher.rs diff --git a/hyperactor_telemetry/Cargo.toml b/hyperactor_telemetry/Cargo.toml index d01e23b0f..82cac324f 100644 --- a/hyperactor_telemetry/Cargo.toml +++ b/hyperactor_telemetry/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry +# @generated by autocargo from //monarch/hyperactor_telemetry:[correctness_test,hyperactor_telemetry,telemetry_benchmark] [package] name = "hyperactor_telemetry" @@ -10,13 +10,23 @@ license = "BSD-3-Clause" [lib] edition = "2024" +[[bench]] +name = "correctness_test" +edition = "2024" + +[[bench]] +name = "telemetry_benchmark" +edition = "2024" + [dependencies] anyhow = "1.0.98" chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false } dashmap = { version = "5.5.3", features = ["rayon", "serde"] } fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" } hdrhistogram = "7.5" +indexmap = { version = "2.9.0", features = ["arbitrary", "rayon", "serde"] } lazy_static = "1.5" +libc = "0.2.139" opentelemetry = "0.29" opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] } rand = { version = "0.8", features = ["small_rng"] } diff --git 
#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock`
level = 2).entered(); + { + let _inner = tracing::info_span!("inner", level = 3).entered(); + tracing::info!("inside nested span"); + } + } + } +} + +fn workload_events_with_fields() { + for i in 0..100 { + tracing::info!( + iteration = i, + foo = 42, + message_type = "Request", + status = "ok", + count = 100, + "event with many fields" + ); + } +} + +fn workload_mixed_log_levels() { + for _ in 0..25 { + tracing::trace!("trace event"); + tracing::debug!(value = 1, "debug event"); + tracing::info!(value = 2, "info event"); + tracing::warn!(value = 3, "warn event"); + tracing::error!(value = 4, "error event"); + } +} + +fn workload_events_in_spans() { + for i in 0..30 { + let _span = tracing::info_span!("outer_span", iteration = i).entered(); + tracing::info!(step = "start", "starting work"); + tracing::debug!(step = "middle", "doing work"); + tracing::info!(step = "end", "finished work"); + } +} + +fn main() -> Result<()> { + let args: Vec = std::env::args().collect(); + + // This script will spawn itself into this branch + if args.len() > 2 { + let test_name = &args[1]; + let impl_type = &args[2]; + return run_single_test(test_name, impl_type); + } + + println!("\n\nHyperactor Telemetry Correctness Test Suite"); + println!("Comparing OLD vs UNIFIED implementations\n"); + + let tests = vec![ + "simple_info_events", + "spans_with_fields", + "nested_spans", + "events_with_fields", + "mixed_log_levels", + "events_in_spans", + ]; + + let mut all_passed = true; + + for test_name in tests { + println!("\n{}", "=".repeat(80)); + println!("Running test: {}", test_name_to_display(test_name)); + println!("{}", "=".repeat(80)); + + let mut test_passed = true; + + println!("\n[Running OLD implementation...]"); + let old_status = std::process::Command::new(&args[0]) + .arg(test_name) + .arg("--old") + .env("TEST_LOG_PREFIX", "test") + .status()?; + + if !old_status.success() { + println!("\n✗ OLD implementation FAILED"); + all_passed = false; + test_passed = false; + } + + 
println!("\n[Running UNIFIED implementation...]"); + let unified_status = std::process::Command::new(&args[0]) + .arg(test_name) + .arg("--unified") + .env("TEST_LOG_PREFIX", "test") + .status()?; + + if !unified_status.success() { + println!("\n✗ UNIFIED implementation FAILED"); + all_passed = false; + test_passed = false; + } + + if test_passed { + println!("\n✓ Test PASSED: {}", test_name_to_display(test_name)); + } else { + println!("\n✗ Test FAILED: {}", test_name_to_display(test_name)); + } + } + + println!("\n\n{}", "=".repeat(80)); + if all_passed { + println!("All tests completed successfully!"); + } else { + println!("Some tests FAILED!"); + return Err(anyhow::anyhow!("Test failures detected")); + } + println!("{}", "=".repeat(80)); + + Ok(()) +} + +/// Called in child process +fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { + let harness = CorrectnessTestHarness {}; + + let workload: fn() = match test_name { + "simple_info_events" => workload_simple_info_events, + "spans_with_fields" => workload_spans_with_fields, + "nested_spans" => workload_nested_spans, + "events_with_fields" => workload_events_with_fields, + "mixed_log_levels" => workload_mixed_log_levels, + "events_in_spans" => workload_events_in_spans, + _ => { + return Err(anyhow::anyhow!("Unknown test: {}", test_name)); + } + }; + + let _results = match impl_type { + "--old" => { + println!("Running with OLD implementation..."); + harness.run(workload)? + } + "--unified" => { + println!("Running with UNIFIED implementation..."); + // Set USE_UNIFIED_LAYER to use unified implementation + // SAFETY: Setting before any telemetry initialization + unsafe { + std::env::set_var("USE_UNIFIED_LAYER", "1"); + } + harness.run(workload)? 
+ } + _ => { + return Err(anyhow::anyhow!( + "Unknown implementation type: {}", + impl_type + )); + } + }; + + Ok(()) +} + +fn test_name_to_display(test_name: &str) -> &str { + match test_name { + "simple_info_events" => "Simple info events", + "spans_with_fields" => "Spans with fields", + "nested_spans" => "Nested spans", + "events_with_fields" => "Events with many fields", + "mixed_log_levels" => "Mixed log levels", + "events_in_spans" => "Events in spans", + _ => test_name, + } +} diff --git a/hyperactor_telemetry/benches/telemetry_benchmark.rs b/hyperactor_telemetry/benches/telemetry_benchmark.rs new file mode 100644 index 000000000..b6653f7a4 --- /dev/null +++ b/hyperactor_telemetry/benches/telemetry_benchmark.rs @@ -0,0 +1,204 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Benchmark comparing old vs unified telemetry implementations. +//! +//! This benchmark simulates a realistic workload with: +//! - Nested spans (simulating actor message processing) +//! - Events at different log levels +//! - Field recording +//! - Multiple iterations +//! +//! Usage: +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --old +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --unified +//! 
buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --compare + +#![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor` just for `hyperactor::clock::Clock` + +use std::time::Instant; + +use hyperactor_telemetry::*; + +fn stage_debug_events_only(iterations: usize) { + for i in 0..iterations { + tracing::debug!(iteration = i, "debug event"); + } +} + +fn stage_info_events_only(iterations: usize) { + for i in 0..iterations { + tracing::info!(iteration = i, "info event"); + } +} + +fn stage_trace_events_only(iterations: usize) { + for i in 0..iterations { + tracing::trace!(iteration = i, "trace event"); + } +} + +fn stage_error_events_only(iterations: usize) { + for i in 0..iterations { + tracing::error!(iteration = i, "error event"); + } +} + +fn stage_simple_spans_only(iterations: usize) { + for _ in 0..iterations { + let _span = tracing::info_span!("simple_span").entered(); + } +} + +fn stage_spans_with_fields(iterations: usize) { + for i in 0..iterations { + let _span = tracing::info_span!( + "span_with_fields", + iteration = i, + foo = 42, + message_type = "Request" + ) + .entered(); + } +} + +fn stage_nested_spans(iterations: usize) { + for _ in 0..iterations { + let _outer = tracing::info_span!("outer", level = 1).entered(); + { + let _middle = tracing::info_span!("middle", level = 2).entered(); + { + let _inner = tracing::info_span!("inner", level = 3).entered(); + } + } + } +} + +fn stage_events_with_fields(iterations: usize) { + for i in 0..iterations { + tracing::info!( + iteration = i, + foo = 42, + message_type = "Request", + status = "ok", + count = 100, + "event with fields" + ); + } +} + +fn run_benchmark_stages(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + let stages: Vec<(&'static str, fn(usize))> = vec![ + ("Debug events only", stage_debug_events_only), + ("Info events only", stage_info_events_only), + ("Trace events only", stage_trace_events_only), + ("Error events only", 
stage_error_events_only), + ("Simple spans only", stage_simple_spans_only), + ("Spans with fields", stage_spans_with_fields), + ("Nested spans (3 levels)", stage_nested_spans), + ("Events with fields", stage_events_with_fields), + ]; + + let mut results = Vec::new(); + + for (name, stage_fn) in stages { + // Warm up + stage_fn(10); + + // Benchmark + let start = Instant::now(); + stage_fn(iterations); + let elapsed = start.elapsed(); + + println!( + " {:30} {} iterations in {:>12?} ({:>10?}/iter)", + format!("{}:", name), + iterations, + elapsed, + elapsed / iterations as u32 + ); + + results.push((name, elapsed)); + } + + results +} + +fn benchmark(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + println!("{}", "=".repeat(100)); + + initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None); + + let results = run_benchmark_stages(iterations); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + results +} + +fn main() { + let args: Vec = std::env::args().collect(); + + let iterations = 1000; + + if args.len() < 2 { + println!("Usage: {} [--old | --unified | --compare]", args[0]); + println!(" --old: Benchmark old implementation only"); + println!(" --unified: Benchmark unified implementation only"); + println!(" --compare: Run both in separate processes and compare"); + return; + } + + match args[1].as_str() { + "--old" => { + println!("Benchmarking OLD implementation..."); + // Don't set USE_UNIFIED_LAYER - uses old implementation + let _results = benchmark(iterations); + println!("\n{}", "=".repeat(100)); + } + "--unified" => { + println!("Benchmarking UNIFIED implementation..."); + // Set USE_UNIFIED_LAYER to use unified implementation + // SAFETY: Setting before any telemetry initialization + unsafe { + std::env::set_var("USE_UNIFIED_LAYER", "1"); + } + let _results = benchmark(iterations); + println!("\n{}", "=".repeat(100)); + } + "--compare" => { + println!( + "Running comparison benchmark with {} iterations...\n", + 
iterations + ); + + let old_status = std::process::Command::new(&args[0]) + .arg("--old") + .status() + .expect("Failed to spawn old implementation"); + + if !old_status.success() { + eprintln!("\n✗ OLD implementation benchmark FAILED"); + return; + } + + let unified_status = std::process::Command::new(&args[0]) + .arg("--unified") + .status() + .expect("Failed to spawn unified implementation"); + + if !unified_status.success() { + eprintln!("\n✗ UNIFIED implementation benchmark FAILED"); + } + } + _ => { + println!("Unknown option: {}", args[1]); + println!("Use --old, --unified, or --compare"); + } + } +} diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 9bfbc71c0..c63c46d3b 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -37,6 +37,10 @@ const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG"; pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME"; +/// Environment variable to enable the unified layer. +/// Set to "1" to enable. +pub const USE_UNIFIED_LAYER: &str = "USE_UNIFIED_LAYER"; + // Log level constants const LOG_LEVEL_INFO: &str = "info"; const LOG_LEVEL_DEBUG: &str = "debug"; @@ -64,6 +68,7 @@ mod spool; pub mod sqlite; pub mod task; pub mod trace; +pub mod trace_dispatcher; use std::io::Write; use std::str::FromStr; use std::sync::Arc; @@ -550,6 +555,13 @@ pub fn initialize_logging_for_test() { initialize_logging(DefaultTelemetryClock {}); } +fn is_layer_enabled(env_var: &str) -> bool { + std::env::var(env_var).unwrap_or_default() == "1" +} +fn is_layer_disabled(env_var: &str) -> bool { + std::env::var(env_var).unwrap_or_default() == "1" +} + /// Set up logging based on the given execution environment. We specialize logging based on how the /// logs are consumed. The destination scuba table is specialized based on the execution environment. 
/// mast -> monarch_tracing/prod @@ -568,6 +580,8 @@ pub fn initialize_logging_with_log_prefix( clock: impl TelemetryClock + Send + 'static, prefix_env_var: Option, ) { + let use_unified = std::env::var(USE_UNIFIED_LAYER).unwrap_or_default() == "1"; + swap_telemetry_clock(clock); let file_log_level = match env::Env::current() { env::Env::Local => LOG_LEVEL_INFO, @@ -604,13 +618,32 @@ pub fn initialize_logging_with_log_prefix( #[cfg(fbcode_build)] { use crate::env::Env; - fn is_layer_enabled(env_var: &str) -> bool { - std::env::var(env_var).unwrap_or_default() == "1" - } - fn is_layer_disabled(env_var: &str) -> bool { - std::env::var(env_var).unwrap_or_default() == "1" - } - if let Err(err) = Registry::default() + if use_unified { + if let Err(err) = Registry::default() + .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { + // TODO: get_reloadable_sqlite_layer currently still returns None, + // and some additional work is required to make it work. + Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { + Some(otel::tracing_layer()) + } else { + None + }) + .with(file_layer) + .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }) + .with(trace_dispatcher::TraceEventDispatcher::new(vec![], None)) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else if let Err(err) = Registry::default() .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { // TODO: get_reloadable_sqlite_layer currently still returns None, // and some additional work is required to make it work. 
@@ -654,18 +687,25 @@ pub fn initialize_logging_with_log_prefix( } #[cfg(not(fbcode_build))] { - if let Err(err) = Registry::default() - .with(file_layer) - .with( - if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" { - Some(recorder().layer()) - } else { - None - }, - ) - .try_init() - { - tracing::debug!("logging already initialized for this process: {}", err); + let registry = Registry::default().with(file_layer).with( + if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }, + ); + + if use_unified { + if let Err(err) = registry + .with(trace_dispatcher::TraceEventDispatcher::new(vec![], None)) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else { + if let Err(err) = registry.try_init() { + tracing::debug!("logging already initialized for this process: {}", err); + } } } } diff --git a/hyperactor_telemetry/src/trace_dispatcher.rs b/hyperactor_telemetry/src/trace_dispatcher.rs new file mode 100644 index 000000000..5d4726a7c --- /dev/null +++ b/hyperactor_telemetry/src/trace_dispatcher.rs @@ -0,0 +1,439 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Unified telemetry layer that captures trace events once and fans out to multiple exporters +//! on a background thread, eliminating redundant capture and moving work off the application +//! thread. 
+ +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::mpsc; +use std::thread::JoinHandle; +use std::time::Duration; +use std::time::SystemTime; + +use indexmap::IndexMap; +use tracing::Id; +use tracing::Subscriber; +use tracing_subscriber::filter::Targets; +use tracing_subscriber::layer::Context; +use tracing_subscriber::layer::Layer; +use tracing_subscriber::registry::LookupSpan; + +const QUEUE_CAPACITY: usize = 100_000; + +/// Unified representation of a trace event captured from the tracing layer. +/// This is captured once on the application thread, then sent to the background +/// worker for fan-out to multiple exporters. +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) enum TraceEvent { + /// A new span was created (on_new_span) + NewSpan { + id: u64, + name: &'static str, + target: &'static str, + level: tracing::Level, + fields: IndexMap, + timestamp: SystemTime, + parent_id: Option, + thread_name: String, + file: Option<&'static str>, + line: Option, + }, + /// A span was entered (on_enter) + SpanEnter { id: u64, timestamp: SystemTime }, + /// A span was exited (on_exit) + SpanExit { id: u64, timestamp: SystemTime }, + /// A span was closed (dropped) + SpanClose { id: u64, timestamp: SystemTime }, + /// A tracing event occurred (e.g., tracing::info!()) + Event { + name: &'static str, + target: &'static str, + level: tracing::Level, + fields: IndexMap, + timestamp: SystemTime, + parent_span: Option, + thread_id: String, + thread_name: String, + module_path: Option<&'static str>, + file: Option<&'static str>, + line: Option, + }, +} + +/// Simplified field value representation for trace events +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) enum FieldValue { + Bool(bool), + I64(i64), + U64(u64), + F64(f64), + Str(String), + Debug(String), +} + +/// Trait for sinks that receive trace events from the dispatcher. 
+    /// Uses a bounded channel (capacity 100,000) to ensure telemetry never blocks
+                        "[telemetry] WARNING: {} events dropped due to full queue (capacity: 100,000). \
+        // will run its own drop impl to join the thread and flush
+ unsafe { libc::syscall(libc::SYS_gettid) as u64 } + }; + #[cfg(not(target_os = "linux"))] + let thread_id_num = { + let tid = std::thread::current().id(); + // SAFETY: ThreadId is a newtype wrapper around a u64 counter. + // This transmute relies on the internal representation of ThreadId, + // which is stable in practice but not guaranteed by Rust's API. + // On non-Linux platforms this is a best-effort approximation. + // See: https://doc.rust-lang.org/std/thread/struct.ThreadId.html + unsafe { std::mem::transmute::(tid) } + }; + let thread_id_str = thread_id_num.to_string(); + + let thread_name = std::thread::current() + .name() + .unwrap_or_default() + .to_string(); + + let trace_event = TraceEvent::Event { + name: metadata.name(), + target: metadata.target(), + level: *metadata.level(), + fields, + timestamp: SystemTime::now(), + parent_span, + thread_id: thread_id_str, + thread_name, + module_path: metadata.module_path(), + file: metadata.file(), + line: metadata.line(), + }; + + self.send_event(trace_event); + } + + fn on_close(&self, id: Id, _ctx: Context<'_, S>) { + let event = TraceEvent::SpanClose { + id: id.into_u64(), + timestamp: SystemTime::now(), + }; + + self.send_event(event); + } + + fn max_level_hint(&self) -> Option { + self.max_level + } +} + +struct FieldVisitor<'a>(&'a mut IndexMap); + +impl<'a> tracing::field::Visit for FieldVisitor<'a> { + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.0 + .insert(field.name().to_string(), FieldValue::Bool(value)); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.0 + .insert(field.name().to_string(), FieldValue::I64(value)); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.0 + .insert(field.name().to_string(), FieldValue::U64(value)); + } + + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + self.0 + .insert(field.name().to_string(), FieldValue::F64(value)); + } + + fn 
record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.0 + .insert(field.name().to_string(), FieldValue::Str(value.to_string())); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert( + field.name().to_string(), + FieldValue::Debug(format!("{:?}", value)), + ); + } +} + +/// Background worker loop that receives events and dispatches them to sinks. +/// Runs until the sender is dropped +fn worker_loop( + receiver: mpsc::Receiver, + mut sinks: Vec>, + dropped_events: Arc, +) { + const FLUSH_INTERVAL: Duration = Duration::from_millis(100); + const FLUSH_EVENT_COUNT: usize = 1000; + let mut last_flush = std::time::Instant::now(); + let mut events_since_flush = 0; + + fn flush_sinks(sinks: &mut [Box]) { + for sink in sinks { + if let Err(e) = sink.flush() { + eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e); + } + } + } + + fn dispatch_to_sinks(sinks: &mut [Box], event: TraceEvent) { + for sink in sinks { + if match &event { + TraceEvent::NewSpan { target, level, .. } + | TraceEvent::Event { target, level, .. 
} => match sink.target_filter() { + Some(targets) => targets.would_enable(target, level), + None => true, + }, + _ => true, + } { + if let Err(e) = sink.consume(&event) { + eprintln!( + "[telemetry] sink {} failed to consume event: {}", + sink.name(), + e + ); + } + } + } + } + + loop { + match receiver.recv_timeout(FLUSH_INTERVAL) { + Ok(event) => { + dispatch_to_sinks(&mut sinks, event); + events_since_flush += 1; + + if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL + { + flush_sinks(&mut sinks); + last_flush = std::time::Instant::now(); + events_since_flush = 0; + } + } + Err(mpsc::RecvTimeoutError::Timeout) => { + flush_sinks(&mut sinks); + last_flush = std::time::Instant::now(); + events_since_flush = 0; + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + break; + } + } + } + + while let Ok(event) = receiver.try_recv() { + dispatch_to_sinks(&mut sinks, event); + } + + flush_sinks(&mut sinks); + + let total_dropped = dropped_events.load(Ordering::Relaxed); + if total_dropped > 0 { + eprintln!( + "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}", + total_dropped + ); + } +} + +impl Drop for WorkerHandle { + fn drop(&mut self) { + if let Some(handle) = self.join_handle.take() { + if let Err(e) = handle.join() { + eprintln!("[telemetry] worker thread panicked: {:?}", e); + } + } + } +} From bed5f93579bc78fed99e5d8f93c4be66188854a7 Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Tue, 25 Nov 2025 09:41:26 -0800 Subject: [PATCH 4/6] Create Glog Exporter (#1929) Summary: Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. 
The solution to this, is to create a `UnifiedLayer` that just sends everything into a background worker, that then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results ``` In this diff we will create an `Exporter` meant to replace `file_layer` and update our correctness test to ensure that files produced by both the existing and the new implementation are the same Differential Revision: D87363775 --- .../benches/correctness_test.rs | 188 ++++++++++++- hyperactor_telemetry/src/lib.rs | 153 +++++++---- hyperactor_telemetry/src/sinks/glog.rs | 257 ++++++++++++++++++ hyperactor_telemetry/src/sinks/mod.rs | 13 + 4 files changed, 553 insertions(+), 58 deletions(-) create mode 100644 hyperactor_telemetry/src/sinks/glog.rs create mode 100644 hyperactor_telemetry/src/sinks/mod.rs diff --git a/hyperactor_telemetry/benches/correctness_test.rs b/hyperactor_telemetry/benches/correctness_test.rs index a060b82c2..978a36aa2 100644 --- a/hyperactor_telemetry/benches/correctness_test.rs +++ b/hyperactor_telemetry/benches/correctness_test.rs @@ -9,17 +9,22 @@ //! Correctness test harness comparing old vs unified telemetry implementations. //! //! This test harness runs identical workloads through both implementations and -//! verifies that the outputs are equivalent across all exporters +//! verifies that the outputs are equivalent across all exporters: +//! - Glog: Read log files and compare lines //! //! Usage: //! 
buck2 run //monarch/hyperactor_telemetry:correctness_test #![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor`` just for `hyperactor::clock::Clock` +use std::path::PathBuf; + use anyhow::Result; use hyperactor_telemetry::*; -struct TestResults {} +struct TestResults { + glog_path: Option, +} struct CorrectnessTestHarness {} @@ -37,7 +42,104 @@ impl CorrectnessTestHarness { std::thread::sleep(std::time::Duration::from_millis(300)); - Ok(TestResults {}) + Ok(TestResults { + glog_path: Self::find_glog_path(), + }) + } + + fn find_glog_path() -> Option { + let username = whoami::username(); + let suffix = std::env::var(hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV) + .map(|s| format!("_{}", s)) + .unwrap_or_default(); + let possible_paths = vec![ + format!("/tmp/{}/monarch_log{}.log", username, suffix), + format!("/tmp/monarch_log{}.log", suffix), + format!("/logs/dedicated_log_monarch{}.log", suffix), + ]; + + for path in possible_paths { + if std::path::Path::new(&path).exists() { + return Some(PathBuf::from(path)); + } + } + None + } + + /// Normalize a glog line by removing timestamp, thread ID, file:line, and prefix for comparison. 
+ /// Both old and unified implementations should now use the same format: + /// "[prefix]Lmmdd HH:MM:SS.ffffff thread_id file:line] message, fields" + /// + /// Normalized to: "L] message, fields" (prefix removed) + fn normalize_glog_line(line: &str) -> String { + // Find the level character position + if let Some(level_pos) = line + .chars() + .position(|c| matches!(c, 'I' | 'D' | 'E' | 'W' | 'T')) + { + // Find the closing bracket that comes AFTER the level character (not the one in the prefix) + if let Some(close_bracket) = line[level_pos..].find(']') { + let actual_bracket_pos = level_pos + close_bracket; + let level = &line[level_pos..=level_pos]; // e.g., "I" + let rest = &line[actual_bracket_pos + 1..].trim_start(); // Everything after the real "]" + // Don't include prefix - just level + content + return format!("{}] {}", level, rest); + } + } + + line.to_string() + } + + fn compare_glog_files(&self, old_file: &PathBuf, unified_file: &PathBuf) -> Result<()> { + println!("\n[Comparing Glog Files]"); + println!(" Old: {}", old_file.display()); + println!(" Unified: {}", unified_file.display()); + + let old_content = std::fs::read_to_string(old_file)?; + let unified_content = std::fs::read_to_string(unified_file)?; + + println!(" Old lines: {}", old_content.lines().count()); + println!(" Unified lines: {}", unified_content.lines().count()); + + let old_lines: Vec = old_content.lines().map(Self::normalize_glog_line).collect(); + + let unified_lines: Vec = unified_content + .lines() + .map(Self::normalize_glog_line) + .collect(); + + if old_lines.len() != unified_lines.len() { + return Err(anyhow::anyhow!( + "Line count mismatch: old={} unified={}", + old_lines.len(), + unified_lines.len() + )); + } + + let skip_lines = 1; + + for (i, (old_line, unified_line)) in old_lines + .iter() + .zip(unified_lines.iter()) + .enumerate() + .skip(skip_lines) + { + if old_line != unified_line { + return Err(anyhow::anyhow!( + "Line #{} mismatch:\n old: {}\n unified: {}", + 
i, + old_line, + unified_line + )); + } + } + + println!( + " ✓ All {} lines match (skipped {} initialization lines)!", + old_lines.len() - skip_lines, + skip_lines + ); + Ok(()) } } @@ -140,10 +242,15 @@ fn main() -> Result<()> { let mut test_passed = true; println!("\n[Running OLD implementation...]"); + let old_log_suffix = format!("{}_old", test_name); let old_status = std::process::Command::new(&args[0]) .arg(test_name) .arg("--old") .env("TEST_LOG_PREFIX", "test") + .env( + hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV, + &old_log_suffix, + ) .status()?; if !old_status.success() { @@ -153,10 +260,15 @@ fn main() -> Result<()> { } println!("\n[Running UNIFIED implementation...]"); + let unified_log_suffix = format!("{}_unified", test_name); let unified_status = std::process::Command::new(&args[0]) .arg(test_name) .arg("--unified") .env("TEST_LOG_PREFIX", "test") + .env( + hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV, + &unified_log_suffix, + ) .status()?; if !unified_status.success() { @@ -165,11 +277,46 @@ fn main() -> Result<()> { test_passed = false; } + let username = whoami::username(); + let harness = CorrectnessTestHarness {}; + + // Compare glog files + let old_log = PathBuf::from(format!("/tmp/{}/test_{}_old.log", username, test_name)); + let unified_log = + PathBuf::from(format!("/tmp/{}/test_{}_unified.log", username, test_name)); + + if !old_log.exists() || !unified_log.exists() { + println!("\n⚠ Glog files not found, skipping comparison"); + if !old_log.exists() { + println!(" Missing: {}", old_log.display()); + } + if !unified_log.exists() { + println!(" Missing: {}", unified_log.display()); + } + all_passed = false; + test_passed = false; + } else { + match harness.compare_glog_files(&old_log, &unified_log) { + Ok(()) => { + println!("\n✓ Glog files match"); + } + Err(e) => { + println!("\n✗ Glog comparison FAILED: {}", e); + all_passed = false; + test_passed = false; + } + } + } + if test_passed { println!("\n✓ Test PASSED: {}", 
test_name_to_display(test_name)); } else { println!("\n✗ Test FAILED: {}", test_name_to_display(test_name)); } + + // Clean up test files + let _ = std::fs::remove_file(&old_log); + let _ = std::fs::remove_file(&unified_log); } println!("\n\n{}", "=".repeat(80)); @@ -186,6 +333,32 @@ fn main() -> Result<()> { /// Called in child process fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { + let impl_suffix = if impl_type == "--old" { + "old" + } else { + "unified" + }; + let log_suffix = format!("{}_{}", test_name, impl_suffix); + let username = whoami::username(); + let possible_log_paths = vec![ + format!("/tmp/{}/monarch_log_{}.log", username, log_suffix), + format!("/tmp/monarch_log_{}.log", log_suffix), + format!("/logs/dedicated_log_monarch_{}.log", log_suffix), + ]; + + for path in &possible_log_paths { + if std::path::Path::new(path).exists() { + let _ = std::fs::remove_file(path); + println!("Cleaned up existing log file: {}", path); + } + } + + let target_log_copy = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix); + if std::path::Path::new(&target_log_copy).exists() { + let _ = std::fs::remove_file(&target_log_copy); + println!("Cleaned up existing copy file: {}", target_log_copy); + } + let harness = CorrectnessTestHarness {}; let workload: fn() = match test_name { @@ -200,7 +373,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { } }; - let _results = match impl_type { + let results = match impl_type { "--old" => { println!("Running with OLD implementation..."); harness.run(workload)? 
@@ -222,6 +395,13 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { } }; + if let Some(glog_path) = results.glog_path { + let target_path = format!("/tmp/{}/test_{}_{}.log", username, test_name, impl_suffix); + + std::fs::copy(&glog_path, &target_path)?; + println!("Glog file copied to: {}", target_path); + } + Ok(()) } diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index c63c46d3b..3ddd82ebd 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -34,6 +34,8 @@ pub const ENABLE_SQLITE_TRACING: &str = "ENABLE_SQLITE_TRACING"; /// Environment variable constants // Log level (debug, info, warn, error, critical) to capture for Monarch traces on dedicated log file (changes based on environment, see `log_file_path`). const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG"; +// Suffix to append to log filenames for test isolation +pub const MONARCH_LOG_SUFFIX_ENV: &str = "MONARCH_LOG_SUFFIX"; pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME"; @@ -64,6 +66,7 @@ mod meta; mod otel; mod pool; pub mod recorder; +pub mod sinks; mod spool; pub mod sqlite; pub mod task; @@ -176,7 +179,8 @@ fn writer() -> Box { match env::Env::current() { env::Env::Test => Box::new(std::io::stderr()), env::Env::Local | env::Env::MastEmulator | env::Env::Mast => { - let (path, filename) = log_file_path(env::Env::current(), None).unwrap(); + let suffix = std::env::var(MONARCH_LOG_SUFFIX_ENV).ok(); + let (path, filename) = log_file_path(env::Env::current(), suffix.as_deref()).unwrap(); match try_create_appender(&path, &filename, true) { Ok(file_appender) => Box::new(file_appender), Err(e) => { @@ -589,27 +593,6 @@ pub fn initialize_logging_with_log_prefix( env::Env::Mast => LOG_LEVEL_INFO, env::Env::Test => LOG_LEVEL_DEBUG, }; - let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default() - .lossy(false) - .finish(writer()); - let writer_guard = Arc::new((non_blocking, guard)); - 
let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); - - let file_layer = fmt::Layer::default() - .with_writer(writer_guard.0.clone()) - .event_format(PrefixedFormatter::new(prefix_env_var.clone())) - .fmt_fields(GlogFields::default().compact()) - .with_ansi(false) - .with_filter( - Targets::new() - .with_default(LevelFilter::from_level( - tracing::Level::from_str( - &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()), - ) - .expect("Invalid log level"), - )) - .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about - ); use tracing_subscriber::Registry; use tracing_subscriber::layer::SubscriberExt; @@ -619,6 +602,62 @@ pub fn initialize_logging_with_log_prefix( { use crate::env::Env; if use_unified { + let mut sinks: Vec> = Vec::new(); + sinks.push(Box::new(sinks::glog::GlogSink::new( + writer(), + prefix_env_var.clone(), + file_log_level, + ))); + + if let Err(err) = Registry::default() + .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { + // TODO: get_reloadable_sqlite_layer currently still returns None, + // and some additional work is required to make it work. 
+ Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { + Some(otel::tracing_layer()) + } else { + None + }) + .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }) + .with(trace_dispatcher::TraceEventDispatcher::new(sinks, None)) + .try_init() + { + tracing::debug!("logging already initialized for this process: {}", err); + } + } else { + // For file_layer, use NonBlocking + let (non_blocking, guard) = + tracing_appender::non_blocking::NonBlockingBuilder::default() + .lossy(false) + .finish(writer()); + let writer_guard = Arc::new((non_blocking, guard)); + let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); + + let file_layer = fmt::Layer::default() + .with_writer(writer_guard.0.clone()) + .event_format(PrefixedFormatter::new(prefix_env_var.clone())) + .fmt_fields(GlogFields::default().compact()) + .with_ansi(false) + .with_filter( + Targets::new() + .with_default(LevelFilter::from_level( + tracing::Level::from_str( + &std::env::var(MONARCH_FILE_LOG_ENV) + .unwrap_or(file_log_level.to_string()), + ) + .expect("Invalid log level"), + )) + .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about + ); + if let Err(err) = Registry::default() .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { // TODO: get_reloadable_sqlite_layer currently still returns None, @@ -638,33 +677,10 @@ pub fn initialize_logging_with_log_prefix( } else { None }) - .with(trace_dispatcher::TraceEventDispatcher::new(vec![], None)) .try_init() { tracing::debug!("logging already initialized for this process: {}", err); } - } else if let Err(err) = Registry::default() - .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { - // TODO: get_reloadable_sqlite_layer currently still returns None, - // and some additional work is required to make it work. 
- Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) - } else { - None - }) - .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { - Some(otel::tracing_layer()) - } else { - None - }) - .with(file_layer) - .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { - Some(recorder().layer()) - } else { - None - }) - .try_init() - { - tracing::debug!("logging already initialized for this process: {}", err); } let exec_id = env::execution_id(); meta::log_execution_event( @@ -687,23 +703,52 @@ pub fn initialize_logging_with_log_prefix( } #[cfg(not(fbcode_build))] { - let registry = Registry::default().with(file_layer).with( - if !is_layer_disabled(DISABLE_RECORDER_TRACING) { - Some(recorder().layer()) - } else { - None - }, - ); + let registry = Registry::default().with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { + Some(recorder().layer()) + } else { + None + }); if use_unified { + let mut sinks: Vec> = Vec::new(); + sinks.push(Box::new(sinks::glog::GlogSink::new( + writer(), + prefix_env_var.clone(), + file_log_level, + ))); + if let Err(err) = registry - .with(trace_dispatcher::TraceEventDispatcher::new(vec![], None)) + .with(trace_dispatcher::TraceEventDispatcher::new(sinks, None)) .try_init() { tracing::debug!("logging already initialized for this process: {}", err); } } else { - if let Err(err) = registry.try_init() { + let (non_blocking, guard) = + tracing_appender::non_blocking::NonBlockingBuilder::default() + .lossy(false) + .finish(writer()); + let writer_guard = Arc::new((non_blocking, guard)); + let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); + + let file_layer = fmt::Layer::default() + .with_writer(writer_guard.0.clone()) + .event_format(PrefixedFormatter::new(prefix_env_var.clone())) + .fmt_fields(GlogFields::default().compact()) + .with_ansi(false) + .with_filter( + Targets::new() + .with_default(LevelFilter::from_level( + tracing::Level::from_str( + &std::env::var(MONARCH_FILE_LOG_ENV) + 
.unwrap_or(file_log_level.to_string()), + ) + .expect("Invalid log level"), + )) + .with_target("opentelemetry", LevelFilter::OFF), + ); + + if let Err(err) = registry.with(file_layer).try_init() { tracing::debug!("logging already initialized for this process: {}", err); } } diff --git a/hyperactor_telemetry/src/sinks/glog.rs b/hyperactor_telemetry/src/sinks/glog.rs new file mode 100644 index 000000000..c65803df9 --- /dev/null +++ b/hyperactor_telemetry/src/sinks/glog.rs @@ -0,0 +1,257 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Glog-formatted text sink for trace events. +//! Replicates the behavior of the fmt::Layer with glog formatting. + +use std::collections::HashMap; +use std::io::BufWriter; +use std::io::Write; +use std::str::FromStr; + +use anyhow::Result; +use indexmap::IndexMap; +use tracing_core::LevelFilter; +use tracing_subscriber::filter::Targets; + +use crate::MONARCH_FILE_LOG_ENV; +use crate::trace_dispatcher::FieldValue; +use crate::trace_dispatcher::TraceEvent; +use crate::trace_dispatcher::TraceEventSink; + +/// Glog sink that writes events in glog format to a file. +/// This replaces the fmt::Layer that was previously used for text logging. +/// +/// This only logs Events, not Spans (matching old fmt::Layer behavior). +pub struct GlogSink { + writer: BufWriter>, + prefix: Option, + /// Track active spans by ID with (name, fields, parent_id) to show span context in event logs + active_spans: HashMap, Option)>, + targets: Targets, +} + +impl GlogSink { + /// Create a new glog sink with the given writer. 
+ /// + /// # Arguments + /// * `writer` - Writer to write log events to (will be buffered) + /// * `prefix_env_var` - Optional environment variable name to read prefix from (matching old impl) + /// * `min_level` - Minimum log level to capture (e.g., INFO, DEBUG) + pub fn new( + writer: Box, + prefix_env_var: Option, + file_log_level: &str, + ) -> Self { + let prefix = if let Some(ref env_var_name) = prefix_env_var { + std::env::var(env_var_name).ok() + } else { + None + }; + + Self { + writer: BufWriter::new(writer), + prefix, + active_spans: HashMap::new(), + targets: Targets::new() + .with_default(LevelFilter::from_level( + tracing::Level::from_str( + &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()), + ) + .expect("Invalid log level"), + )) + .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about + } + } + + fn write_event(&mut self, event: &TraceEvent) -> Result<()> { + let timestamp_str = match event { + TraceEvent::Event { timestamp, .. } => { + let datetime: chrono::DateTime = (*timestamp).into(); + datetime.format("%m%d %H:%M:%S%.6f").to_string() + } + // write_event is only called for Events, but keep this for safety + _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(), + }; + + let prefix_str = if let Some(ref p) = self.prefix { + format!("[{}]", p) + } else { + "[-]".to_string() + }; + + match event { + TraceEvent::Event { + level, + fields, + parent_span, + thread_id, + file, + line, + .. 
+ } => { + let level_char = match *level { + tracing::Level::ERROR => 'E', + tracing::Level::WARN => 'W', + tracing::Level::INFO => 'I', + tracing::Level::DEBUG => 'D', + tracing::Level::TRACE => 'T', + }; + + // [prefix]LMMDD HH:MM:SS.ffffff thread_id file:line] message, key:value, key:value + write!( + self.writer, + "{}{}{} {} ", + prefix_str, level_char, timestamp_str, thread_id + )?; + + if let (Some(f), Some(l)) = (file, line) { + write!(self.writer, "{}:{}] ", f, l)?; + } else { + write!(self.writer, "unknown:0] ")?; + } + + if let Some(parent_id) = parent_span { + self.write_span_context(*parent_id)?; + } + + if let Some(v) = fields.get("message") { + match v { + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + _ => write!(self.writer, "event")?, + } + } else { + write!(self.writer, "event")?; + } + + for (k, v) in fields.iter() { + if k != "message" { + write!(self.writer, ", {}:", k)?; + match v { + FieldValue::Bool(b) => write!(self.writer, "{}", b)?, + FieldValue::I64(i) => write!(self.writer, "{}", i)?, + FieldValue::U64(u) => write!(self.writer, "{}", u)?, + FieldValue::F64(f) => write!(self.writer, "{}", f)?, + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + } + } + } + + writeln!(self.writer)?; + } + + // write_event should only be called for Events, but handle gracefully + _ => { + writeln!( + self.writer, + "{}I{} - unknown:0] unexpected event type", + prefix_str, timestamp_str + )?; + } + } + + Ok(()) + } + + /// Writes span context: "[outer{field:value}, inner{field:value}] " + fn write_span_context(&mut self, span_id: u64) -> Result<()> { + let mut span_ids = Vec::new(); + let mut current_id = Some(span_id); + + while let Some(id) = current_id { + if let Some((_, _, parent_id)) = self.active_spans.get(&id) { + span_ids.push(id); + current_id = *parent_id; + } else { + break; + } + } + if span_ids.is_empty() { + return 
Ok(()); + } + + write!(self.writer, "[")?; + + for (i, id) in span_ids.iter().rev().enumerate() { + if i > 0 { + write!(self.writer, ", ")?; + } + + if let Some((name, fields, _)) = self.active_spans.get(id) { + write!(self.writer, "{}", name)?; + if !fields.is_empty() { + write!(self.writer, "{{")?; + + let mut first = true; + for (k, v) in fields.iter() { + if !first { + write!(self.writer, ", ")?; + } + first = false; + write!(self.writer, "{}:", k)?; + + match v { + FieldValue::Bool(b) => write!(self.writer, "{}", b)?, + FieldValue::I64(i) => write!(self.writer, "{}", i)?, + FieldValue::U64(u) => write!(self.writer, "{}", u)?, + FieldValue::F64(f) => write!(self.writer, "{}", f)?, + FieldValue::Str(s) => write!(self.writer, "{}", s)?, + FieldValue::Debug(s) => write!(self.writer, "{}", s)?, + } + } + + write!(self.writer, "}}")?; + } + } + } + + write!(self.writer, "] ")?; + Ok(()) + } +} + +impl TraceEventSink for GlogSink { + fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> { + match event { + // Track span lifecycle for context display (must happen even if we don't export spans) + TraceEvent::NewSpan { + id, + name, + fields, + parent_id, + .. + } => { + self.active_spans + .insert(*id, (name.to_string(), fields.clone(), *parent_id)); + } + TraceEvent::SpanClose { id, .. } => { + self.active_spans.remove(id); + } + TraceEvent::Event { .. } => { + self.write_event(event)?; + } + _ => {} + } + Ok(()) + } + + fn flush(&mut self) -> Result<(), anyhow::Error> { + self.writer.flush()?; + Ok(()) + } + + fn name(&self) -> &str { + "GlogSink" + } + + fn target_filter(&self) -> Option<&Targets> { + Some(&self.targets) + } +} diff --git a/hyperactor_telemetry/src/sinks/mod.rs b/hyperactor_telemetry/src/sinks/mod.rs new file mode 100644 index 000000000..3e8569d67 --- /dev/null +++ b/hyperactor_telemetry/src/sinks/mod.rs @@ -0,0 +1,13 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. 
+ * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Exporters for the unified telemetry layer. +//! Each exporter implements the TraceExporter trait and handles +//! writing events to a specific backend (SQLite, Scuba, glog, etc). + +pub mod glog; From 0f37fe1e45fd467026539340723a7bbc4e8d41fd Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Tue, 25 Nov 2025 09:41:26 -0800 Subject: [PATCH 5/6] Create Sqlite Exporter (#1930) Summary: Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this, is to create a `UnifiedLayer` that just sends everything into a background worker, that then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results ``` In this diff, we will implement an `Exporter` meant to replace `get_reloadable_sqlite_layer()`. 
We will update our correctness test to query the tables at the end of each workload and ensure all rows are the same Reviewed By: mariusae Differential Revision: D87363774 --- .../benches/correctness_test.rs | 209 +++++++++++++++++- .../benches/telemetry_benchmark.rs | 96 ++++++-- hyperactor_telemetry/src/lib.rs | 35 ++- hyperactor_telemetry/src/sinks/mod.rs | 1 + hyperactor_telemetry/src/sinks/sqlite.rs | 203 +++++++++++++++++ hyperactor_telemetry/src/sqlite.rs | 15 +- 6 files changed, 525 insertions(+), 34 deletions(-) create mode 100644 hyperactor_telemetry/src/sinks/sqlite.rs diff --git a/hyperactor_telemetry/benches/correctness_test.rs b/hyperactor_telemetry/benches/correctness_test.rs index 978a36aa2..b41babdbc 100644 --- a/hyperactor_telemetry/benches/correctness_test.rs +++ b/hyperactor_telemetry/benches/correctness_test.rs @@ -11,6 +11,7 @@ //! This test harness runs identical workloads through both implementations and //! verifies that the outputs are equivalent across all exporters: //! - Glog: Read log files and compare lines +//! - SQLite: Query database and compare rows //! //! Usage: //! 
buck2 run //monarch/hyperactor_telemetry:correctness_test @@ -24,12 +25,15 @@ use hyperactor_telemetry::*; struct TestResults { glog_path: Option, + sqlite_path: Option, + #[allow(dead_code)] + _sqlite_tracing: Option, } struct CorrectnessTestHarness {} impl CorrectnessTestHarness { - fn run(&self, workload: F) -> Result + fn run(&self, workload: F, unified: bool) -> Result where F: Fn(), { @@ -38,12 +42,45 @@ impl CorrectnessTestHarness { Some("TEST_LOG_PREFIX".to_string()), ); + let sqlite_tracing = if unified { + None + } else { + let sqlite_tracing = hyperactor_telemetry::sqlite::SqliteTracing::new() + .expect("Failed to create SqliteTracing"); + let db_path = sqlite_tracing.db_path().expect("No db_path"); + println!("SqliteTracing created successfully, db_path: {:?}", db_path); + println!("Database exists: {}", db_path.exists()); + Some(sqlite_tracing) + }; + workload(); std::thread::sleep(std::time::Duration::from_millis(300)); + let username = whoami::username(); + let possible_paths = vec![ + format!( + "/tmp/{}/hyperactor_trace_{}.db", + username, + std::process::id() + ), + format!("/tmp/hyperactor_trace_{}.db", std::process::id()), + format!("/tmp/traces/hyperactor_trace_{}.db", std::process::id()), + format!("./hyperactor_trace_{}.db", std::process::id()), + ]; + + let mut sqlite_path = None; + for path in possible_paths { + if std::path::Path::new(&path).exists() { + sqlite_path = Some(PathBuf::from(path)); + break; + } + } + Ok(TestResults { + sqlite_path, glog_path: Self::find_glog_path(), + _sqlite_tracing: sqlite_tracing, }) } @@ -141,6 +178,107 @@ impl CorrectnessTestHarness { ); Ok(()) } + + fn compare_sqlite_databases(&self, old_db: &PathBuf, unified_db: &PathBuf) -> Result<()> { + println!("\n[Comparing SQLite Databases]"); + println!(" Old: {}", old_db.display()); + println!(" Unified: {}", unified_db.display()); + + let old_conn = rusqlite::Connection::open(old_db)?; + + old_conn.execute(&format!("ATTACH '{}' AS unified", 
unified_db.display()), [])?; + + let tables = vec!["log_events", "messages", "actor_lifecycle"]; + + for table in tables { + println!("\n Comparing table: {}", table); + + let old_count: i64 = + old_conn.query_row(&format!("SELECT COUNT(*) FROM main.{}", table), [], |row| { + row.get(0) + })?; + let unified_count: i64 = old_conn.query_row( + &format!("SELECT COUNT(*) FROM unified.{}", table), + [], + |row| row.get(0), + )?; + + println!(" Old rows: {}", old_count); + println!(" Unified rows: {}", unified_count); + + if old_count != unified_count { + return Err(anyhow::anyhow!( + "Table {} row count mismatch: old={} unified={}", + table, + old_count, + unified_count + )); + } + + let mut stmt = old_conn.prepare(&format!("PRAGMA table_info({})", table))?; + let columns: Vec = stmt + .query_map([], |row| { + let name: String = row.get(1)?; + Ok(name) + })? + .collect::, _>>()? + .into_iter() + .filter(|col| col != "time_us") // Ignore time_us column + .collect(); + + if columns.is_empty() { + continue; + } + + let col_list = columns.join(", "); + + let diff_query = format!( + "SELECT '-' as diff_type, {cols} FROM main.{table} + EXCEPT + SELECT '-' as diff_type, {cols} FROM unified.{table} + UNION ALL + SELECT '+' as diff_type, {cols} FROM unified.{table} + EXCEPT + SELECT '+' as diff_type, {cols} FROM main.{table}", + table = table, + cols = col_list + ); + + let mut stmt = old_conn.prepare(&diff_query)?; + let mut rows = stmt.query([])?; + + let mut diffs = Vec::new(); + while let Some(row) = rows.next()? 
{ + let diff_type: String = row.get(0)?; + let color = if diff_type == "-" { + "\x1b[31m" // red + } else { + "\x1b[32m" // green + }; + let mut row_str = format!("{}{} ", color, diff_type); + for i in 1..row.as_ref().column_count() { + let col_name = row.as_ref().column_name(i)?; + let val: Option = row.get(i).ok(); + row_str.push_str(&format!("{}={:?}, ", col_name, val)); + } + row_str.push_str("\x1b[0m"); // reset color + diffs.push(row_str); + } + + if !diffs.is_empty() { + return Err(anyhow::anyhow!( + "Table {} has differences:\n{}", + table, + diffs.join("\n") + )); + } + + println!(" ✓ {} rows match", old_count); + } + + println!("\n ✓ All tables match!"); + Ok(()) + } } // ============================================================================ @@ -251,6 +389,7 @@ fn main() -> Result<()> { hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV, &old_log_suffix, ) + .env("ENABLE_SQLITE_TRACING", "1") .status()?; if !old_status.success() { @@ -269,6 +408,7 @@ fn main() -> Result<()> { hyperactor_telemetry::MONARCH_LOG_SUFFIX_ENV, &unified_log_suffix, ) + .env("ENABLE_SQLITE_TRACING", "1") .status()?; if !unified_status.success() { @@ -308,6 +448,32 @@ fn main() -> Result<()> { } } + // Compare SQLite databases + let old_db = PathBuf::from(format!("/tmp/{}/test_{}_old.db", username, test_name)); + let unified_db = PathBuf::from(format!("/tmp/{}/test_{}_unified.db", username, test_name)); + + // SQLite databases are now required - both implementations should create them + if !old_db.exists() { + println!("\n✗ OLD database not found: {}", old_db.display()); + all_passed = false; + test_passed = false; + } else if !unified_db.exists() { + println!("\n✗ UNIFIED database not found: {}", unified_db.display()); + all_passed = false; + test_passed = false; + } else { + match harness.compare_sqlite_databases(&old_db, &unified_db) { + Ok(()) => { + println!("\n✓ SQLite databases match"); + } + Err(e) => { + println!("\n✗ SQLite comparison FAILED: {}", e); + all_passed = 
false; + test_passed = false; + } + } + } + if test_passed { println!("\n✓ Test PASSED: {}", test_name_to_display(test_name)); } else { @@ -315,6 +481,8 @@ fn main() -> Result<()> { } // Clean up test files + let _ = std::fs::remove_file(&old_db); + let _ = std::fs::remove_file(&unified_db); let _ = std::fs::remove_file(&old_log); let _ = std::fs::remove_file(&unified_log); } @@ -376,7 +544,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { let results = match impl_type { "--old" => { println!("Running with OLD implementation..."); - harness.run(workload)? + harness.run(workload, false)? } "--unified" => { println!("Running with UNIFIED implementation..."); @@ -385,7 +553,7 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { unsafe { std::env::set_var("USE_UNIFIED_LAYER", "1"); } - harness.run(workload)? + harness.run(workload, true)? } _ => { return Err(anyhow::anyhow!( @@ -402,6 +570,41 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { println!("Glog file copied to: {}", target_path); } + if let Some(db_path) = results.sqlite_path { + let target_path = format!("/tmp/{}/test_{}_{}.db", username, test_name, impl_suffix); + + println!( + "Attempting to copy database from {} to {}", + db_path.display(), + target_path + ); + std::fs::copy(&db_path, &target_path).map_err(|e| { + anyhow::anyhow!( + "Failed to copy database from {} to {}: {}", + db_path.display(), + target_path, + e + ) + })?; + + // Also copy WAL files if they exist (SQLite WAL mode) + let wal_path = format!("{}-wal", db_path.display()); + let shm_path = format!("{}-shm", db_path.display()); + let target_wal = format!("{}-wal", target_path); + let target_shm = format!("{}-shm", target_path); + + if std::path::Path::new(&wal_path).exists() { + let _ = std::fs::copy(&wal_path, &target_wal); + } + if std::path::Path::new(&shm_path).exists() { + let _ = std::fs::copy(&shm_path, &target_shm); + } + + println!("Database copied to: {}", 
target_path); + } else { + println!("Warning: No SQLite database path found"); + } + Ok(()) } diff --git a/hyperactor_telemetry/benches/telemetry_benchmark.rs b/hyperactor_telemetry/benches/telemetry_benchmark.rs index b6653f7a4..7a392ca37 100644 --- a/hyperactor_telemetry/benches/telemetry_benchmark.rs +++ b/hyperactor_telemetry/benches/telemetry_benchmark.rs @@ -15,7 +15,8 @@ //! - Multiple iterations //! //! Usage: -//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --old +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --old-no-sqlite +//! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --old-with-sqlite //! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --unified //! buck2 run //monarch/hyperactor_telemetry:telemetry_benchmark -- --compare @@ -129,11 +130,22 @@ fn run_benchmark_stages(iterations: usize) -> Vec<(&'static str, std::time::Dura results } -fn benchmark(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { - println!("{}", "=".repeat(100)); +fn benchmark_no_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { + initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None); + + let results = run_benchmark_stages(iterations); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + results +} +fn benchmark_with_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None); + let _sqlite_tracing = + hyperactor_telemetry::sqlite::SqliteTracing::new().expect("Failed to create SqliteTracing"); + let results = run_benchmark_stages(iterations); std::thread::sleep(std::time::Duration::from_millis(500)); @@ -147,19 +159,27 @@ fn main() { let iterations = 1000; if args.len() < 2 { - println!("Usage: {} [--old | --unified | --compare]", args[0]); - println!(" --old: Benchmark old implementation only"); - println!(" --unified: Benchmark unified implementation only"); - 
println!(" --compare: Run both in separate processes and compare"); + println!("Usage: {} [OPTIONS]", args[0]); + println!(" --old-no-sqlite: Benchmark old implementation without SQLite"); + println!(" --old-with-sqlite: Benchmark old implementation with SQLite"); + println!( + " --unified: Benchmark unified implementation (use ENABLE_SQLITE_TRACING)" + ); + println!(" --compare: Run all four benchmarks and compare"); return; } match args[1].as_str() { - "--old" => { - println!("Benchmarking OLD implementation..."); + "--old-no-sqlite" => { + println!("\n{}", "=".repeat(100)); + println!("Benchmarking OLD implementation (SQLite DISABLED)..."); // Don't set USE_UNIFIED_LAYER - uses old implementation - let _results = benchmark(iterations); + let _results = benchmark_no_sqlite(iterations); + } + "--old-with-sqlite" => { println!("\n{}", "=".repeat(100)); + println!("Benchmarking OLD implementation (SQLite ENABLED)..."); + let _results = benchmark_with_sqlite(iterations); } "--unified" => { println!("Benchmarking UNIFIED implementation..."); @@ -168,8 +188,16 @@ fn main() { unsafe { std::env::set_var("USE_UNIFIED_LAYER", "1"); } - let _results = benchmark(iterations); println!("\n{}", "=".repeat(100)); + println!( + "Benchmarking UNIFIED implementation (SQLite {})...", + if std::env::var("ENABLE_SQLITE_TRACING").unwrap_or_default() == "1" { + "ENABLED" + } else { + "DISABLED" + } + ); + let _results = benchmark_no_sqlite(iterations); } "--compare" => { println!( @@ -177,28 +205,54 @@ fn main() { iterations ); - let old_status = std::process::Command::new(&args[0]) - .arg("--old") + let old_no_sqlite_status = std::process::Command::new(&args[0]) + .arg("--old-no-sqlite") .status() - .expect("Failed to spawn old implementation"); + .expect("Failed to spawn old implementation without SQLite"); - if !old_status.success() { - eprintln!("\n✗ OLD implementation benchmark FAILED"); + if !old_no_sqlite_status.success() { + eprintln!("\n✗ OLD implementation (no SQLite) 
benchmark FAILED"); return; } - let unified_status = std::process::Command::new(&args[0]) + let old_with_sqlite_status = std::process::Command::new(&args[0]) + .arg("--old-with-sqlite") + .env("ENABLE_SQLITE_TRACING", "1") + .status() + .expect("Failed to spawn old implementation with SQLite"); + + if !old_with_sqlite_status.success() { + eprintln!("\n✗ OLD implementation (with SQLite) benchmark FAILED"); + return; + } + + let unified_no_sqlite_status = std::process::Command::new(&args[0]) + .arg("--unified") + .status() + .expect("Failed to spawn unified implementation without SQLite"); + + if !unified_no_sqlite_status.success() { + eprintln!("\n✗ UNIFIED implementation (no SQLite) benchmark FAILED"); + } + + let unified_with_sqlite_status = std::process::Command::new(&args[0]) .arg("--unified") + .env("ENABLE_SQLITE_TRACING", "1") .status() - .expect("Failed to spawn unified implementation"); + .expect("Failed to spawn unified implementation with SQLite"); - if !unified_status.success() { - eprintln!("\n✗ UNIFIED implementation benchmark FAILED"); + if !unified_with_sqlite_status.success() { + eprintln!("\n✗ UNIFIED implementation (with SQLite) benchmark FAILED"); + return; } + + println!("All benchmarks completed successfully!"); } _ => { println!("Unknown option: {}", args[1]); - println!("Use --old, --unified, or --compare"); + println!( + "Use --old-no-sqlite, --old-with-sqlite, --unified-no-sqlite, --unified-with-sqlite, or --compare" + ); } } } diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 3ddd82ebd..4bf43f555 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -609,14 +609,23 @@ pub fn initialize_logging_with_log_prefix( file_log_level, ))); + let mut max_level = None; + + let sqlite_enabled = std::env::var(ENABLE_SQLITE_TRACING).unwrap_or_default() == "1"; + + if sqlite_enabled { + match create_sqlite_sink() { + Ok(sink) => { + max_level = 
Some(tracing::level_filters::LevelFilter::TRACE); + sinks.push(Box::new(sink)); + } + Err(e) => { + tracing::warn!("failed to create SqliteSink: {}", e); + } + } + } + if let Err(err) = Registry::default() - .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { - // TODO: get_reloadable_sqlite_layer currently still returns None, - // and some additional work is required to make it work. - Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) - } else { - None - }) .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { Some(otel::tracing_layer()) } else { @@ -627,7 +636,9 @@ pub fn initialize_logging_with_log_prefix( } else { None }) - .with(trace_dispatcher::TraceEventDispatcher::new(sinks, None)) + .with(trace_dispatcher::TraceEventDispatcher::new( + sinks, max_level, + )) .try_init() { tracing::debug!("logging already initialized for this process: {}", err); @@ -755,6 +766,14 @@ pub fn initialize_logging_with_log_prefix( } } +fn create_sqlite_sink() -> anyhow::Result { + let (db_path, _) = log_file_path(env::Env::current(), Some("traces")) + .expect("failed to determine trace db path"); + let db_file = format!("{}/hyperactor_trace_{}.db", db_path, std::process::id()); + + Ok(sinks::sqlite::SqliteSink::new_with_file(&db_file, 100)?) +} + pub mod env { use rand::RngCore; diff --git a/hyperactor_telemetry/src/sinks/mod.rs b/hyperactor_telemetry/src/sinks/mod.rs index 3e8569d67..8e5c11b46 100644 --- a/hyperactor_telemetry/src/sinks/mod.rs +++ b/hyperactor_telemetry/src/sinks/mod.rs @@ -11,3 +11,4 @@ //! writing events to a specific backend (SQLite, Scuba, glog, etc). pub mod glog; +pub mod sqlite; diff --git a/hyperactor_telemetry/src/sinks/sqlite.rs b/hyperactor_telemetry/src/sinks/sqlite.rs new file mode 100644 index 000000000..fbb183366 --- /dev/null +++ b/hyperactor_telemetry/src/sinks/sqlite.rs @@ -0,0 +1,203 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. 
+ * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! SQLite sink with batched writes and transactions. +//! Runs on background thread to avoid blocking application threads. +//! +//! Reuses table definitions and insertion logic from the old SqliteLayer +//! to ensure 100% identical behavior. + +use std::path::Path; + +use anyhow::Result; +use anyhow::anyhow; +use rusqlite::Connection; +use rusqlite::functions::FunctionFlags; +use serde_json::Value as JValue; +use tracing_core::LevelFilter; +use tracing_subscriber::filter::Targets; + +use crate::sqlite; +use crate::trace_dispatcher::FieldValue; +use crate::trace_dispatcher::TraceEvent; +use crate::trace_dispatcher::TraceEventSink; + +/// SQLite sink that batches events and writes them in transactions. +/// Reuses the exact same table schema and insertion logic from SqliteLayer. +pub struct SqliteSink { + conn: Connection, + batch: Vec, + batch_size: usize, + target_filter: Targets, +} + +impl SqliteSink { + /// Create a new SQLite sink with an in-memory database. + /// Matches the API of SqliteLayer::new() + /// + /// # Arguments + /// * `batch_size` - Number of events to batch before flushing to disk + pub fn new(batch_size: usize) -> Result { + let conn = Connection::open_in_memory()?; + Self::setup_connection(conn, batch_size) + } + + /// Create a new SQLite sink with a file-based database. 
+ /// Matches the API of SqliteLayer::new_with_file() + /// + /// # Arguments + /// * `db_path` - Path to SQLite database file + /// * `batch_size` - Number of events to batch before flushing to disk + pub fn new_with_file(db_path: impl AsRef, batch_size: usize) -> Result { + let conn = Connection::open(db_path)?; + Self::setup_connection(conn, batch_size) + } + + fn setup_connection(conn: Connection, batch_size: usize) -> Result { + for table in sqlite::ALL_TABLES.iter() { + conn.execute(&table.create_table_stmt, [])?; + } + + conn.create_scalar_function( + "assert", + 2, + FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC, + move |ctx| { + let condition: bool = ctx.get(0)?; + let message: String = ctx.get(1)?; + + if !condition { + return Err(rusqlite::Error::UserFunctionError( + anyhow!("assertion failed:{condition} {message}",).into(), + )); + } + + Ok(condition) + }, + )?; + + Ok(Self { + conn, + batch: Vec::with_capacity(batch_size), + batch_size, + target_filter: Targets::new() + .with_target("execution", LevelFilter::OFF) + .with_target("opentelemetry", LevelFilter::OFF) + .with_target("hyperactor_telemetry", LevelFilter::OFF) + .with_default(LevelFilter::TRACE), + }) + } + + fn flush_batch(&mut self) -> Result<()> { + if self.batch.is_empty() { + return Ok(()); + } + + let tx = self.conn.transaction()?; + + for event in &self.batch { + // We only batch Event variants in consume(), so this match is guaranteed to succeed + let TraceEvent::Event { + target, + fields, + timestamp, + module_path, + file, + line, + .. 
+ } = event + else { + unreachable!("Only Event variants should be in batch") + }; + + let timestamp_us = timestamp + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_micros() + .to_string(); + + let mut visitor = sqlite::SqlVisitor::default(); + + visitor + .0 + .insert("time_us".to_string(), JValue::String(timestamp_us)); + + if let Some(mp) = module_path { + visitor + .0 + .insert("module_path".to_string(), JValue::String(mp.to_string())); + } + if let Some(l) = line { + visitor + .0 + .insert("line".to_string(), JValue::String(l.to_string())); + } + if let Some(f) = file { + visitor + .0 + .insert("file".to_string(), JValue::String(f.to_string())); + } + + for (key, value) in fields { + let json_value = match value { + FieldValue::Bool(b) => JValue::Bool(*b), + FieldValue::I64(i) => JValue::Number((*i).into()), + FieldValue::U64(u) => JValue::Number((*u).into()), + FieldValue::F64(f) => serde_json::Number::from_f64(*f) + .map(JValue::Number) + .unwrap_or(JValue::Null), + FieldValue::Str(s) => JValue::String(s.clone()), + FieldValue::Debug(d) => JValue::String(d.clone()), + }; + visitor.0.insert(key.clone(), json_value); + } + + let table = if &**target == sqlite::TableName::ACTOR_LIFECYCLE_STR { + sqlite::TableName::ActorLifecycle.get_table() + } else if &**target == sqlite::TableName::MESSAGES_STR { + sqlite::TableName::Messages.get_table() + } else { + sqlite::TableName::LogEvents.get_table() + }; + + sqlite::insert_event_fields(&tx, table, visitor)?; + } + + tx.commit()?; + self.batch.clear(); + + Ok(()) + } +} + +impl TraceEventSink for SqliteSink { + fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> { + // Only batch Event variants - we ignore spans + if matches!(event, TraceEvent::Event { .. 
}) { + self.batch.push(event.clone()); + + if self.batch.len() >= self.batch_size { + self.flush_batch()?; + } + } + + Ok(()) + } + + fn flush(&mut self) -> Result<(), anyhow::Error> { + self.flush_batch() + } + + fn name(&self) -> &str { + "SqliteSink" + } + + fn target_filter(&self) -> Option<&Targets> { + Some(&self.target_filter) + } +} diff --git a/hyperactor_telemetry/src/sqlite.rs b/hyperactor_telemetry/src/sqlite.rs index c5ddbed8e..95da36eff 100644 --- a/hyperactor_telemetry/src/sqlite.rs +++ b/hyperactor_telemetry/src/sqlite.rs @@ -165,7 +165,7 @@ lazy_static! { .as_slice() ) .into(); - static ref ALL_TABLES: Vec = vec![ + pub static ref ALL_TABLES: Vec
= vec![ ACTOR_LIFECYCLE.clone(), MESSAGES.clone(), LOG_EVENTS.clone() @@ -178,7 +178,7 @@ pub struct SqliteLayer { use tracing::field::Visit; #[derive(Debug, Clone, Default, Serialize)] -struct SqlVisitor(HashMap); +pub struct SqlVisitor(pub HashMap); impl Visit for SqlVisitor { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { @@ -232,6 +232,17 @@ macro_rules! insert_event { }; } +/// Public helper to insert event fields into database using the same logic as the old implementation. +/// This is used by the unified SqliteExporter to ensure identical behavior. +pub fn insert_event_fields(conn: &Connection, table: &Table, fields: SqlVisitor) -> Result<()> { + conn.prepare_cached(&table.insert_stmt)?.execute( + serde_rusqlite::to_params_named_with_fields(fields, table.columns)? + .to_slice() + .as_slice(), + )?; + Ok(()) +} + impl SqliteLayer { pub fn new() -> Result { let conn = Connection::open_in_memory()?; From 56b96749f5f29c02a2c9f1daf10a58bd7f13f270 Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Tue, 25 Nov 2025 09:41:26 -0800 Subject: [PATCH 6/6] Create Scuba Exporter (#1931) Summary: Stack context: ``` Our tracing subscriber has 3 layers: - File logging - Scuba - Sqlite (usually off) Although the actual Scuba logging is done in a background thread and we are using a non-blocking file writer, we still have a good chunk of work that happens for events & spans. The solution to this, is to create a `UnifiedLayer` that just sends everything into a background worker, that then delivers all traces to each `Exporter` to handle. In this diff, we will create an initial `UnifiedLayer` and incrementally move each existing layer into an `Exporter`. 
To test correctness, we will run both the old and unified implementations for initializing telemetry on a variety of workloads, and ensure that both are producing the same results ``` In this diff we will create an `Exporter` meant to replace `otel::tracing_layer()` (which is really just an alias for scuba). We log to two different scuba tables: monarch_tracing and monarch_executions. We will test correctness by injecting a mock scuba client that simply appends all samples it intends to log, and ensure that both the old and the unified implementations produce the same samples Reviewed By: mariusae Differential Revision: D87363772 --- .../benches/correctness_test.rs | 146 ++++++++++++++++- .../benches/telemetry_benchmark.rs | 2 +- hyperactor_telemetry/src/lib.rs | 151 ++++++++++++++++-- hyperactor_telemetry/src/trace_dispatcher.rs | 2 - 4 files changed, 284 insertions(+), 17 deletions(-) diff --git a/hyperactor_telemetry/benches/correctness_test.rs b/hyperactor_telemetry/benches/correctness_test.rs index b41babdbc..f075f4248 100644 --- a/hyperactor_telemetry/benches/correctness_test.rs +++ b/hyperactor_telemetry/benches/correctness_test.rs @@ -12,12 +12,14 @@ //! verifies that the outputs are equivalent across all exporters: //! - Glog: Read log files and compare lines //! - SQLite: Query database and compare rows +//! - Scuba: Mock client and compare logged samples //! //! Usage: //! 
buck2 run //monarch/hyperactor_telemetry:correctness_test #![allow(clippy::disallowed_methods)] // don't want to take a dependency on `hyperactor`` just for `hyperactor::clock::Clock` +use std::collections::HashSet; use std::path::PathBuf; use anyhow::Result; @@ -28,6 +30,7 @@ struct TestResults { sqlite_path: Option, #[allow(dead_code)] _sqlite_tracing: Option, + scuba_tracing_samples: Vec, } struct CorrectnessTestHarness {} @@ -37,7 +40,7 @@ impl CorrectnessTestHarness { where F: Fn(), { - initialize_logging_with_log_prefix( + let test_handle = initialize_logging_with_log_prefix_mock_scuba( DefaultTelemetryClock {}, Some("TEST_LOG_PREFIX".to_string()), ); @@ -77,12 +80,103 @@ impl CorrectnessTestHarness { } } + let scuba_tracing_samples = test_handle.get_tracing_samples(); + Ok(TestResults { sqlite_path, glog_path: Self::find_glog_path(), + scuba_tracing_samples, _sqlite_tracing: sqlite_tracing, }) } + fn compare_scuba_samples( + &self, + old_samples: &[hyperactor_telemetry::TelemetrySample], + unified_samples: &[hyperactor_telemetry::TelemetrySample], + table_name: &str, + ) -> Result<()> { + println!("\n[Comparing {} Scuba Samples]", table_name); + println!(" Old samples: {}", old_samples.len()); + println!(" Unified samples: {}", unified_samples.len()); + + if old_samples.is_empty() && unified_samples.is_empty() { + return Err(anyhow::anyhow!("No samples in either implementation")); + } + + if old_samples.len() != unified_samples.len() { + return Err(anyhow::anyhow!( + "Sample count mismatch: old={} unified={}", + old_samples.len(), + unified_samples.len() + )); + } + + for (i, (old, unified)) in old_samples.iter().zip(unified_samples.iter()).enumerate() { + let old_json_str = serde_json::to_string(old)?; + let old_parsed: serde_json::Value = serde_json::from_str(&old_json_str)?; + + let unified_json_str = serde_json::to_string(unified)?; + let unified_parsed: serde_json::Value = serde_json::from_str(&unified_json_str)?; + + let all_fields: HashSet = 
old_parsed["fields"] + .as_array() + .into_iter() + .flatten() + .chain(unified_parsed["fields"].as_array().into_iter().flatten()) + .filter_map(|field| field.as_array()?.first()?.as_str().map(|s| s.to_string())) + .collect(); + + for field_name in all_fields { + let old_str = old.get_string(&field_name); + let unified_str = unified.get_string(&field_name); + + match field_name.as_str() { + "args" => { + // Allow --old vs --unified difference only + match (old_str, unified_str) { + (Some(old_args), Some(unified_args)) => { + let old_normalized = old_args.replace("--old", "--IMPL"); + let unified_normalized = + unified_args.replace("--unified", "--IMPL"); + + if old_normalized != unified_normalized { + return Err(anyhow::anyhow!( + "Sample #{} field 'args' differs in more than just --old/--unified:\n old: {:?}\n unified: {:?}", + i, + old_args, + unified_args + )); + } + } + (None, None) => {} + _ => { + return Err(anyhow::anyhow!( + "Sample #{} field 'args' present in only one sample:\n old: {:?}\n unified: {:?}", + i, + old_str, + unified_str + )); + } + } + } + _ => { + if old_str != unified_str { + return Err(anyhow::anyhow!( + "Sample #{} field '{}' mismatch:\n old: {:?}\n unified: {:?}", + i, + field_name, + old_str, + unified_str + )); + } + } + } + } + } + + println!(" ✓ All {} samples match!", old_samples.len()); + Ok(()) + } fn find_glog_path() -> Option { let username = whoami::username(); @@ -474,6 +568,48 @@ fn main() -> Result<()> { } } + let old_tracing = PathBuf::from(format!( + "/tmp/{}/test_{}_old_scuba_tracing.json", + username, test_name + )); + let unified_tracing = PathBuf::from(format!( + "/tmp/{}/test_{}_unified_scuba_tracing.json", + username, test_name + )); + + if !old_tracing.exists() || !unified_tracing.exists() { + println!("\n⚠ Scuba tracing sample files not found, skipping comparison"); + if !old_tracing.exists() { + println!(" Missing: {}", old_tracing.display()); + } + if !unified_tracing.exists() { + println!(" Missing: {}", 
unified_tracing.display()); + } + all_passed = false; + test_passed = false; + } else { + let old_samples_json = std::fs::read_to_string(&old_tracing)?; + let unified_samples_json = std::fs::read_to_string(&unified_tracing)?; + + let old_samples: Vec = serde_json::from_str(&old_samples_json)?; + let unified_samples: Vec = + serde_json::from_str(&unified_samples_json)?; + + match harness.compare_scuba_samples(&old_samples, &unified_samples, "Tracing") { + Ok(()) => { + println!("\n✓ Scuba tracing samples match"); + } + Err(e) => { + println!("\n✗ Scuba tracing comparison FAILED: {}", e); + all_passed = false; + test_passed = false; + } + } + + let _ = std::fs::remove_file(&old_tracing); + let _ = std::fs::remove_file(&unified_tracing); + } + if test_passed { println!("\n✓ Test PASSED: {}", test_name_to_display(test_name)); } else { @@ -605,6 +741,14 @@ fn run_single_test(test_name: &str, impl_type: &str) -> Result<()> { println!("Warning: No SQLite database path found"); } + let tracing_path = format!( + "/tmp/{}/test_{}_{}_scuba_tracing.json", + username, test_name, impl_suffix + ); + let tracing_json = serde_json::to_string_pretty(&results.scuba_tracing_samples)?; + std::fs::write(&tracing_path, tracing_json)?; + println!("Scuba tracing samples saved to: {}", tracing_path); + Ok(()) } diff --git a/hyperactor_telemetry/benches/telemetry_benchmark.rs b/hyperactor_telemetry/benches/telemetry_benchmark.rs index 7a392ca37..ec93c9106 100644 --- a/hyperactor_telemetry/benches/telemetry_benchmark.rs +++ b/hyperactor_telemetry/benches/telemetry_benchmark.rs @@ -141,7 +141,7 @@ fn benchmark_no_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Durat } fn benchmark_with_sqlite(iterations: usize) -> Vec<(&'static str, std::time::Duration)> { - initialize_logging_with_log_prefix(DefaultTelemetryClock {}, None); + initialize_logging_with_log_prefix_mock_scuba(DefaultTelemetryClock {}, None); let _sqlite_tracing = 
hyperactor_telemetry::sqlite::SqliteTracing::new().expect("Failed to create SqliteTracing"); diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index 4bf43f555..3a2f84422 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -104,6 +104,70 @@ use tracing_subscriber::registry::LookupSpan; use crate::recorder::Recorder; use crate::sqlite::get_reloadable_sqlite_layer; +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct TelemetrySample { + fields: Vec<(String, String)>, +} + +impl TelemetrySample { + pub fn get_string(&self, key: &str) -> Option<&str> { + for (k, v) in &self.fields { + if k == key { + return Some(v.as_str()); + } + } + None + } +} + +#[cfg(fbcode_build)] +impl From for TelemetrySample { + fn from(sample: crate::meta::sample_buffer::Sample) -> Self { + let mut fields = Vec::new(); + for (key, value) in sample.0 { + if let crate::meta::sample_buffer::SampleValue::String(s) = value { + fields.push((key.to_string(), s.to_string())); + } + } + Self { fields } + } +} + +#[cfg(not(fbcode_build))] +impl TelemetrySample { + pub fn new() -> Self { + Self { fields: Vec::new() } + } +} + +pub trait TelemetryTestHandle { + fn get_tracing_samples(&self) -> Vec; +} + +#[cfg(fbcode_build)] +struct MockScubaHandle { + tracing_client: crate::meta::scuba_utils::MockScubaClient, +} + +#[cfg(fbcode_build)] +impl TelemetryTestHandle for MockScubaHandle { + fn get_tracing_samples(&self) -> Vec { + self.tracing_client + .get_samples() + .into_iter() + .map(TelemetrySample::from) + .collect() + } +} + +struct EmptyTestHandle; + +impl TelemetryTestHandle for EmptyTestHandle { + fn get_tracing_samples(&self) -> Vec { + vec![] + } +} + pub trait TelemetryClock { fn now(&self) -> tokio::time::Instant; fn system_time_now(&self) -> std::time::SystemTime; @@ -584,6 +648,21 @@ pub fn initialize_logging_with_log_prefix( clock: impl TelemetryClock + Send + 'static, prefix_env_var: Option, ) { + let _ = 
initialize_logging_with_log_prefix_impl(clock, prefix_env_var, false); +} + +pub fn initialize_logging_with_log_prefix_mock_scuba( + clock: impl TelemetryClock + Send + 'static, + prefix_env_var: Option, +) -> Box { + initialize_logging_with_log_prefix_impl(clock, prefix_env_var, true) +} + +fn initialize_logging_with_log_prefix_impl( + clock: impl TelemetryClock + Send + 'static, + prefix_env_var: Option, + mock_scuba: bool, +) -> Box { let use_unified = std::env::var(USE_UNIFIED_LAYER).unwrap_or_default() == "1"; swap_telemetry_clock(clock); @@ -601,6 +680,9 @@ pub fn initialize_logging_with_log_prefix( #[cfg(fbcode_build)] { use crate::env::Env; + + let mut mock_scuba_client: Option = None; + if use_unified { let mut sinks: Vec> = Vec::new(); sinks.push(Box::new(sinks::glog::GlogSink::new( @@ -625,12 +707,37 @@ pub fn initialize_logging_with_log_prefix( } } + { + if !is_layer_disabled(DISABLE_OTEL_TRACING) { + use crate::meta; + use crate::meta::get_tracing_targets; + use crate::meta::scuba_utils::LOG_ENTER_EXIT; + + if mock_scuba { + let tracing_client = meta::scuba_utils::MockScubaClient::new(); + + sinks.push(Box::new( + meta::scuba_sink::ScubaSink::with_client( + tracing_client.clone(), + match meta::tracing_resource().get(&LOG_ENTER_EXIT) { + Some(Value::Bool(enabled)) => enabled, + _ => false, + }, + ) + .with_target_filter(get_tracing_targets()), + )); + + mock_scuba_client = Some(tracing_client); + } else { + sinks.push(Box::new( + meta::scuba_sink::ScubaSink::new(meta::tracing_resource()) + .with_target_filter(get_tracing_targets()), + )); + } + } + } + if let Err(err) = Registry::default() - .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { - Some(otel::tracing_layer()) - } else { - None - }) .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { Some(recorder().layer()) } else { @@ -669,24 +776,34 @@ pub fn initialize_logging_with_log_prefix( .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care 
about ); - if let Err(err) = Registry::default() + let registry = Registry::default() .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) { - // TODO: get_reloadable_sqlite_layer currently still returns None, - // and some additional work is required to make it work. Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer")) } else { None }) - .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { - Some(otel::tracing_layer()) - } else { - None - }) .with(file_layer) .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) { Some(recorder().layer()) } else { None + }); + + if mock_scuba { + let tracing_client = crate::meta::scuba_utils::MockScubaClient::new(); + + let scuba_layer = crate::meta::tracing_layer_with_client(tracing_client.clone()); + + if let Err(err) = registry.with(scuba_layer).try_init() { + tracing::debug!("logging already initialized for this process: {}", err); + } + + mock_scuba_client = Some(tracing_client); + } else if let Err(err) = registry + .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) { + Some(otel::tracing_layer()) + } else { + None }) .try_init() { @@ -711,6 +828,12 @@ pub fn initialize_logging_with_log_prefix( if !is_layer_disabled(DISABLE_OTEL_METRICS) { otel::init_metrics(); } + + if let Some(tracing_client) = mock_scuba_client { + Box::new(MockScubaHandle { tracing_client }) + } else { + Box::new(EmptyTestHandle) + } } #[cfg(not(fbcode_build))] { @@ -763,6 +886,8 @@ pub fn initialize_logging_with_log_prefix( tracing::debug!("logging already initialized for this process: {}", err); } } + + Box::new(EmptyTestHandle) } } diff --git a/hyperactor_telemetry/src/trace_dispatcher.rs b/hyperactor_telemetry/src/trace_dispatcher.rs index 5d4726a7c..2c41a1617 100644 --- a/hyperactor_telemetry/src/trace_dispatcher.rs +++ b/hyperactor_telemetry/src/trace_dispatcher.rs @@ -32,7 +32,6 @@ const QUEUE_CAPACITY: usize = 100_000; /// This is captured once on the application thread, then sent to the background /// worker for fan-out to 
multiple exporters. #[derive(Debug, Clone)] -#[allow(dead_code)] pub(crate) enum TraceEvent { /// A new span was created (on_new_span) NewSpan { @@ -71,7 +70,6 @@ pub(crate) enum TraceEvent { /// Simplified field value representation for trace events #[derive(Debug, Clone)] -#[allow(dead_code)] pub(crate) enum FieldValue { Bool(bool), I64(i64),