From d4c70e05f7944b5ce564036833f6be7e1bcbc73a Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Thu, 23 Apr 2026 09:39:09 -0400 Subject: [PATCH] add spans for perfetto Signed-off-by: Connor Tsui --- vortex-compressor/src/compressor.rs | 326 ++------------------------- vortex-compressor/src/estimate.rs | 6 +- vortex-compressor/src/lib.rs | 18 +- vortex-compressor/src/stats/cache.rs | 6 +- vortex-compressor/src/trace.rs | 206 +++++++++-------- 5 files changed, 149 insertions(+), 413 deletions(-) diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index aa18b61637b..b99d31c6d73 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -314,16 +314,15 @@ impl CascadingCompressor { // Run the winning scheme's `compress`. On failure, emit an ERROR event carrying the // scheme name and cascade history before propagating. let error_ctx = trace::enabled_error_context(&compress_ctx); - let compressed = match winner.compress(self, &data, compress_ctx, exec_ctx) { - Ok(compressed) => compressed, - Err(err) => { + let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered(); + let compressed = winner + .compress(self, &data, compress_ctx, exec_ctx) + .inspect_err(|err| { // NB: this is the only way we can tell which scheme panicked / bailed on their // data, especially for third-party schemes where the error site may not carry any // compressor context. - trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), &err); - return Err(err); - } - }; + trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err); + })?; let after_nbytes = compressed.nbytes(); let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64); @@ -331,9 +330,7 @@ impl CascadingCompressor { // TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!! 
let accepted = after_nbytes < before_nbytes || compressed.is::(); - trace::scheme_compress_result( - winner.id(), - before_nbytes, + trace::record_winner_compress_result( after_nbytes, winner_estimate.trace_ratio(), actual_ratio, @@ -373,21 +370,24 @@ impl CascadingCompressor { let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new(); // Pass 1: evaluate every immediate verdict. Stash deferred work for pass 2. - for &scheme in schemes { - match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) { - CompressionEstimate::Verdict(EstimateVerdict::Skip) => {} - CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => { - return Ok(Some((scheme, WinnerEstimate::AlwaysUse))); - } - CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => { - let score = EstimateScore::FiniteCompression(ratio); + { + let _verdict_pass = trace::verdict_pass_span().entered(); + for &scheme in schemes { + match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) { + CompressionEstimate::Verdict(EstimateVerdict::Skip) => {} + CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => { + return Ok(Some((scheme, WinnerEstimate::AlwaysUse))); + } + CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => { + let score = EstimateScore::FiniteCompression(ratio); - if is_better_score(score, &best) { - best = Some((scheme, score)); + if is_better_score(score, &best) { + best = Some((scheme, score)); + } + } + CompressionEstimate::Deferred(deferred_estimate) => { + deferred.push((scheme, deferred_estimate)); } - } - CompressionEstimate::Deferred(deferred_estimate) => { - deferred.push((scheme, deferred_estimate)); } } } @@ -395,6 +395,7 @@ impl CascadingCompressor { // Pass 2: run deferred work. Callbacks receive the current best as a threshold so they can // short-circuit with `Skip` when they cannot beat it. 
for (scheme, deferred_estimate) in deferred { + let _span = trace::scheme_eval_span(scheme.id()).entered(); let threshold: Option = best.map(|(_, score)| score); match deferred_estimate { DeferredEstimate::Sample => { @@ -560,18 +561,9 @@ impl CascadingCompressor { #[cfg(test)] mod tests { - use std::collections::BTreeMap; - use std::sync::Arc; use std::sync::LazyLock; use parking_lot::Mutex; - use tracing::Event; - use tracing::Subscriber; - use tracing::field::Field; - use tracing::field::Visit; - use tracing_subscriber::Layer; - use tracing_subscriber::layer::Context; - use tracing_subscriber::prelude::*; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::VortexSessionExecute; @@ -595,7 +587,6 @@ mod tests { use crate::estimate::EstimateVerdict; use crate::estimate::WinnerEstimate; use crate::scheme::SchemeExt; - use crate::trace::TARGET_TRACE; static SESSION: LazyLock = LazyLock::new(|| VortexSession::empty().with::()); @@ -613,98 +604,6 @@ mod tests { matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int()) } - fn test_integer_array() -> ArrayRef { - PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array() - } - - #[derive(Debug, Clone, PartialEq, Eq)] - struct RecordedEvent { - target: String, - fields: BTreeMap, - } - - #[derive(Default)] - struct EventVisitor { - fields: BTreeMap, - } - - impl Visit for EventVisitor { - fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { - self.fields - .insert(field.name().to_owned(), format!("{value:?}")); - } - - fn record_i64(&mut self, field: &Field, value: i64) { - self.fields - .insert(field.name().to_owned(), value.to_string()); - } - - fn record_u64(&mut self, field: &Field, value: u64) { - self.fields - .insert(field.name().to_owned(), value.to_string()); - } - - fn record_bool(&mut self, field: &Field, value: bool) { - self.fields - .insert(field.name().to_owned(), value.to_string()); - } - - fn record_str(&mut self, 
field: &Field, value: &str) { - self.fields - .insert(field.name().to_owned(), value.to_owned()); - } - } - - struct RecordingLayer { - events: Arc>>, - } - - impl RecordingLayer { - fn new(events: Arc>>) -> Self { - Self { events } - } - } - - impl Layer for RecordingLayer - where - S: Subscriber, - { - fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { - let mut visitor = EventVisitor::default(); - event.record(&mut visitor); - self.events.lock().push(RecordedEvent { - target: event.metadata().target().to_owned(), - fields: visitor.fields, - }); - } - } - - fn record_events(f: impl FnOnce() -> T) -> (T, Vec) { - let events = Arc::new(Mutex::new(Vec::new())); - let subscriber = - tracing_subscriber::registry().with(RecordingLayer::new(Arc::clone(&events))); - let result = tracing::subscriber::with_default(subscriber, f); - let recorded = events.lock().clone(); - (result, recorded) - } - - fn find_event<'a>( - events: &'a [RecordedEvent], - target: &str, - message: &str, - ) -> &'a RecordedEvent { - events - .iter() - .find(|event| { - event.target == target - && event - .fields - .get("message") - .is_some_and(|value| value == message) - }) - .expect("expected event not found") - } - #[derive(Debug)] struct DirectRatioScheme; @@ -935,102 +834,6 @@ mod tests { } } - #[derive(Debug)] - struct NestedFailureParentScheme; - - impl Scheme for NestedFailureParentScheme { - fn scheme_name(&self) -> &'static str { - "test.nested_failure_parent" - } - - fn matches(&self, canonical: &Canonical) -> bool { - matches_integer_primitive(canonical) - } - - fn expected_compression_ratio( - &self, - _data: &ArrayAndStats, - _compress_ctx: CompressorContext, - _exec_ctx: &mut ExecutionCtx, - ) -> CompressionEstimate { - CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) - } - - fn compress( - &self, - compressor: &CascadingCompressor, - data: &ArrayAndStats, - compress_ctx: CompressorContext, - exec_ctx: &mut ExecutionCtx, - ) -> VortexResult { - 
compressor.compress_child(data.array(), &compress_ctx, self.id(), 1, exec_ctx) - } - } - - #[derive(Debug)] - struct NestedFailureLeafScheme; - - impl Scheme for NestedFailureLeafScheme { - fn scheme_name(&self) -> &'static str { - "test.nested_failure_leaf" - } - - fn matches(&self, canonical: &Canonical) -> bool { - matches_integer_primitive(canonical) - } - - fn expected_compression_ratio( - &self, - _data: &ArrayAndStats, - _compress_ctx: CompressorContext, - _exec_ctx: &mut ExecutionCtx, - ) -> CompressionEstimate { - CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) - } - - fn compress( - &self, - _compressor: &CascadingCompressor, - _data: &ArrayAndStats, - _compress_ctx: CompressorContext, - _exec_ctx: &mut ExecutionCtx, - ) -> VortexResult { - vortex_error::vortex_bail!("nested failure") - } - } - - #[derive(Debug)] - struct SamplingFailureScheme; - - impl Scheme for SamplingFailureScheme { - fn scheme_name(&self) -> &'static str { - "test.sampling_failure" - } - - fn matches(&self, canonical: &Canonical) -> bool { - matches_integer_primitive(canonical) - } - - fn expected_compression_ratio( - &self, - _data: &ArrayAndStats, - _compress_ctx: CompressorContext, - _exec_ctx: &mut ExecutionCtx, - ) -> CompressionEstimate { - CompressionEstimate::Deferred(DeferredEstimate::Sample) - } - - fn compress( - &self, - _compressor: &CascadingCompressor, - _data: &ArrayAndStats, - _compress_ctx: CompressorContext, - _exec_ctx: &mut ExecutionCtx, - ) -> VortexResult { - vortex_error::vortex_bail!("sample failure") - } - } - #[test] fn test_self_exclusion() { let c = compressor(); @@ -1494,85 +1297,4 @@ mod tests { assert!(matches!(score, EstimateScore::FiniteCompression(ratio) if ratio.is_finite())); Ok(()) } - - #[test] - fn compress_failure_event_includes_cascade_path_and_depth() { - let compressor = - CascadingCompressor::new(vec![&NestedFailureParentScheme, &NestedFailureLeafScheme]); - let array = test_integer_array(); - - let (result, events) = 
record_events(|| { - let mut exec_ctx = SESSION.create_execution_ctx(); - compressor.compress(&array, &mut exec_ctx) - }); - - assert!(result.is_err()); - let event = find_event(&events, TARGET_TRACE, "scheme.compress_failed"); - assert_eq!( - event.fields.get("scheme").map(String::as_str), - Some("test.nested_failure_leaf") - ); - assert_eq!( - event.fields.get("cascade_path").map(String::as_str), - Some("test.nested_failure_parent[1]") - ); - assert_eq!( - event.fields.get("cascade_depth").map(String::as_str), - Some("1") - ); - } - - #[test] - fn sample_failure_event_includes_cascade_path_and_depth() { - let compressor = CascadingCompressor::new(vec![&SamplingFailureScheme]); - let array = test_integer_array(); - - let (result, events) = record_events(|| { - let mut exec_ctx = SESSION.create_execution_ctx(); - compressor.compress(&array, &mut exec_ctx) - }); - - assert!(result.is_err()); - let event = find_event(&events, TARGET_TRACE, "sample.compress_failed"); - assert_eq!( - event.fields.get("scheme").map(String::as_str), - Some("test.sampling_failure") - ); - assert_eq!( - event.fields.get("cascade_path").map(String::as_str), - Some("root") - ); - assert_eq!( - event.fields.get("cascade_depth").map(String::as_str), - Some("0") - ); - } - - #[test] - fn zero_byte_sample_result_omits_ratio_fields_and_selects_no_scheme() { - let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme]); - let array = test_integer_array(); - - let (result, events) = record_events(|| { - let mut exec_ctx = SESSION.create_execution_ctx(); - compressor.compress(&array, &mut exec_ctx) - }); - - assert!(result.is_ok()); - - let sample_event = find_event(&events, TARGET_TRACE, "sample.result"); - assert_eq!( - sample_event.fields.get("sampled_after").map(String::as_str), - Some("0") - ); - assert!(!sample_event.fields.contains_key("sampled_ratio")); - - assert!(!events.iter().any(|event| { - event.target == TARGET_TRACE - && event - .fields - .get("message") - 
.is_some_and(|value| value == "scheme.compress_result") - })); - } } diff --git a/vortex-compressor/src/estimate.rs b/vortex-compressor/src/estimate.rs index 9fbf434352a..065937ffac9 100644 --- a/vortex-compressor/src/estimate.rs +++ b/vortex-compressor/src/estimate.rs @@ -228,9 +228,9 @@ pub(super) fn estimate_compression_ratio_with_sampling( let score = EstimateScore::from_sample_sizes(before, after); - // Single DEBUG event per sampled scheme. Downstream tooling can join this with the eventual - // `scheme.compress_result` on the same scheme to compute sample-vs-full divergence. - trace::sample_result(scheme.id(), before, after, score.finite_ratio()); + if matches!(score, EstimateScore::ZeroBytes) { + trace::zero_byte_sample_result(scheme.id(), before); + } Ok(score) } diff --git a/vortex-compressor/src/lib.rs b/vortex-compressor/src/lib.rs index 9d6c7d7321f..1ecc9e4d5b4 100644 --- a/vortex-compressor/src/lib.rs +++ b/vortex-compressor/src/lib.rs @@ -18,17 +18,17 @@ //! //! # Observability //! -//! The compressor emits a small set of `tracing` events on a single target so you can see what -//! it's doing without attaching a profiler. +//! The compressor emits a small set of `tracing` spans and events on a single target so you can +//! see what it's doing without attaching a profiler. //! -//! For example, set `RUST_LOG=vortex_compressor::encode=debug` to see one line per leaf compression -//! decision. The `vortex_compressor::encode` target carries the main decision events -//! (`scheme.compress_result`, `sample.result`, and both `*.compress_failed`) plus the coarse -//! top-level `compress` span and `cascade_exhausted` event. +//! For example, set `RUST_LOG=vortex_compressor::encode=debug` to see compression decision spans +//! and exceptional events. The `vortex_compressor::encode` target carries the top-level `compress` +//! span, per-scheme evaluation and winning-compression spans, the `cascade_exhausted` event, +//! 
`sample.result` events for zero-byte sample outputs, and both `*.compress_failed` events. //! -//! The primary event is `scheme.compress_result`, which carries `scheme`, `before_nbytes`, -//! `after_nbytes`, `estimated_ratio` (absent when the scheme returned `AlwaysUse` or sampled to 0 -//! bytes), `actual_ratio` (absent when the compressed output is 0 bytes), and `accepted`. +//! The winning-compression span carries `scheme_chosen`, `input_nbytes`, `compressed_nbytes`, +//! `estimated_ratio` (absent when the scheme returned `AlwaysUse` or sampled to 0 bytes), +//! `achieved_ratio` (absent when the compressed output is 0 bytes), and `accepted`. //! //! Failure events additionally carry `cascade_path` and `cascade_depth`, so nested compression //! errors can be tied back to the ancestor branch that triggered them. diff --git a/vortex-compressor/src/stats/cache.rs b/vortex-compressor/src/stats/cache.rs index 91f0bf711fb..6f7020191a1 100644 --- a/vortex-compressor/src/stats/cache.rs +++ b/vortex-compressor/src/stats/cache.rs @@ -21,6 +21,7 @@ use super::FloatStats; use super::GenerateStatsOptions; use super::IntegerStats; use super::StringStats; +use crate::trace; /// A single cache entry: a concrete [`TypeId`] paired with a type-erased value. type StatsEntry = (TypeId, Arc); @@ -58,7 +59,10 @@ impl StatsCache { .ok() .vortex_expect("we just checked the TypeID") } else { - let new_arc: Arc = Arc::new(f()); + let new_arc: Arc = { + let _span = trace::generate_stats_span(std::any::type_name::()).entered(); + Arc::new(f()) + }; guard.push((type_id, Arc::clone(&new_arc) as Arc)); new_arc } diff --git a/vortex-compressor/src/trace.rs b/vortex-compressor/src/trace.rs index d499f18359f..84027272b31 100644 --- a/vortex-compressor/src/trace.rs +++ b/vortex-compressor/src/trace.rs @@ -12,6 +12,9 @@ use crate::scheme::SchemeId; pub(super) const TARGET_TRACE: &str = "vortex_compressor::encode"; /// Builds the top-level compression span. 
+/// +/// `input_nbytes` is known up front; `compressed_nbytes` / `compression_ratio` are filled in by +/// [`record_compress_outcome`] once the cascade returns. #[inline] pub(super) fn compress_span( len: usize, @@ -21,20 +24,117 @@ pub(super) fn compress_span( tracing::debug_span!( target: TARGET_TRACE, "compress", - len, + array_len = len, dtype = %dtype, - before_nbytes, - after_nbytes = tracing::field::Empty, - ratio = tracing::field::Empty, + input_nbytes = before_nbytes, + compressed_nbytes = tracing::field::Empty, + compression_ratio = tracing::field::Empty, ) } +/// Builds a span covering on-demand materialization of a cached stats type. +/// +/// Child of whatever span is active when a stats accessor first fires. Typically that's +/// [`verdict_pass_span`]; entering this span disambiguates stats cost from the rest of Pass 1. +/// `kind` is usually `std::any::type_name::()` so the args identify which group was generated +/// (e.g. `IntegerStats`, `FloatStats`). +#[inline] +pub(super) fn generate_stats_span(kind: &'static str) -> tracing::Span { + tracing::debug_span!( + target: TARGET_TRACE, + "generate_stats", + stats_kind = kind, + ) +} + +/// Builds a span covering Pass 1 of scheme selection (the cheap-verdict pass). +/// +/// Stats batches merged across eligible schemes are materialized lazily by the first +/// `expected_compression_ratio` call that touches them. Grouping those calls under one span makes +/// the stats cost (and unexpectedly slow verdicts) visible independently of per-candidate sampling. +#[inline] +pub(super) fn verdict_pass_span() -> tracing::Span { + tracing::debug_span!( + target: TARGET_TRACE, + "verdict_pass", + ) +} + +/// Builds a span covering one deferred per-scheme evaluation (sample or callback). +/// +/// `scheme_candidate` is the scheme being evaluated, not necessarily chosen. 
+#[inline] +pub(super) fn scheme_eval_span(scheme: SchemeId) -> tracing::Span { + tracing::debug_span!( + target: TARGET_TRACE, + "scheme_eval", + scheme_candidate = %scheme, + ) +} + +/// Emits the sampling result event for zero-byte sample outputs. +#[inline] +pub(super) fn zero_byte_sample_result(scheme: SchemeId, sampled_before: u64) { + tracing::debug!( + target: TARGET_TRACE, + scheme = %scheme, + sampled_before, + sampled_after = 0_u64, + "sample.result", + ); +} + +/// Builds a span covering the winning scheme's full-array compression. +/// +/// `scheme_chosen` and `input_nbytes` are known up front. `compressed_nbytes`, +/// `estimated_ratio`, `achieved_ratio`, and `accepted` are filled in by +/// [`record_winner_compress_result`] once the encode completes. +#[inline] +pub(super) fn winner_compress_span(scheme: SchemeId, before_nbytes: u64) -> tracing::Span { + tracing::debug_span!( + target: TARGET_TRACE, + "winner_compress", + scheme_chosen = %scheme, + input_nbytes = before_nbytes, + compressed_nbytes = tracing::field::Empty, + estimated_ratio = tracing::field::Empty, + achieved_ratio = tracing::field::Empty, + accepted = tracing::field::Empty, + ) +} + +/// Records the outcome of a winning-scheme compression on the current `winner_compress` span. +#[inline] +pub(super) fn record_winner_compress_result( + compressed_nbytes: u64, + estimated_ratio: Option<f64>, + achieved_ratio: Option<f64>, + accepted: bool, +) { + let span = tracing::Span::current(); + span.record("compressed_nbytes", compressed_nbytes); + if let Some(r) = estimated_ratio { + span.record("estimated_ratio", r); + } + if let Some(r) = achieved_ratio { + span.record("achieved_ratio", r); + } + span.record("accepted", accepted); +} + /// Records the final output size and, when finite, the top-level compression ratio. 
#[inline] -pub(super) fn record_compress_outcome(span: &tracing::Span, before_nbytes: u64, after_nbytes: u64) { - span.record("after_nbytes", after_nbytes); - if after_nbytes != 0 { - span.record("ratio", before_nbytes as f64 / after_nbytes as f64); +pub(super) fn record_compress_outcome( + span: &tracing::Span, + input_nbytes: u64, + compressed_nbytes: u64, +) { + span.record("compressed_nbytes", compressed_nbytes); + if compressed_nbytes != 0 { + span.record( + "compression_ratio", + input_nbytes as f64 / compressed_nbytes as f64, + ); } } @@ -76,68 +176,6 @@ pub(super) fn scheme_compress_failed( } } -/// Emits the leaf compression result event. -#[inline] -#[allow( - clippy::cognitive_complexity, - reason = "tracing sometimes triggers this" -)] -pub(super) fn scheme_compress_result( - scheme: SchemeId, - before_nbytes: u64, - after_nbytes: u64, - estimated_ratio: Option<f64>, - actual_ratio: Option<f64>, - accepted: bool, -) { - match (estimated_ratio, actual_ratio) { - (Some(estimated_ratio), Some(actual_ratio)) => { - tracing::debug!( - target: TARGET_TRACE, - scheme = %scheme, - before_nbytes, - after_nbytes, - estimated_ratio, - actual_ratio, - accepted, - "scheme.compress_result", - ); - } - (Some(estimated_ratio), None) => { - tracing::debug!( - target: TARGET_TRACE, - scheme = %scheme, - before_nbytes, - after_nbytes, - estimated_ratio, - accepted, - "scheme.compress_result", - ); - } - (None, Some(actual_ratio)) => { - tracing::debug!( - target: TARGET_TRACE, - scheme = %scheme, - before_nbytes, - after_nbytes, - actual_ratio, - accepted, - "scheme.compress_result", - ); - } - (None, None) => { - tracing::debug!( - target: TARGET_TRACE, - scheme = %scheme, - before_nbytes, - after_nbytes, - accepted, - "scheme.compress_result", - ); - } - } -} - /// Emits a sampling-failure event. #[inline] pub(super) fn sample_compress_failed( @@ -156,31 +194,3 @@ pub(super) fn sample_compress_failed( ); } } - -/// Emits the sampling result event. 
-#[inline] -pub(super) fn sample_result( - scheme: SchemeId, - sampled_before: u64, - sampled_after: u64, - sampled_ratio: Option<f64>, -) { - if let Some(sampled_ratio) = sampled_ratio { - tracing::debug!( - target: TARGET_TRACE, - scheme = %scheme, - sampled_before, - sampled_after, - sampled_ratio, - "sample.result", - ); - } else { - tracing::debug!( - target: TARGET_TRACE, - scheme = %scheme, - sampled_before, - sampled_after, - "sample.result", - ); - } -}