Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 24 additions & 302 deletions vortex-compressor/src/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,26 +314,23 @@ impl CascadingCompressor {
// Run the winning scheme's `compress`. On failure, emit an ERROR event carrying the
// scheme name and cascade history before propagating.
let error_ctx = trace::enabled_error_context(&compress_ctx);
let compressed = match winner.compress(self, &data, compress_ctx, exec_ctx) {
Ok(compressed) => compressed,
Err(err) => {
let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered();
let compressed = winner
.compress(self, &data, compress_ctx, exec_ctx)
.inspect_err(|err| {
// NB: this is the only way we can tell which scheme panicked / bailed on their
// data, especially for third-party schemes where the error site may not carry any
// compressor context.
trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), &err);
return Err(err);
}
};
trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err);
})?;

let after_nbytes = compressed.nbytes();
let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64);

// TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
let accepted = after_nbytes < before_nbytes || compressed.is::<AnyScalarFn>();

trace::scheme_compress_result(
winner.id(),
before_nbytes,
trace::record_winner_compress_result(
after_nbytes,
winner_estimate.trace_ratio(),
actual_ratio,
Expand Down Expand Up @@ -373,28 +370,32 @@ impl CascadingCompressor {
let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();

// Pass 1: evaluate every immediate verdict. Stash deferred work for pass 2.
for &scheme in schemes {
match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
}
CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
let score = EstimateScore::FiniteCompression(ratio);
{
let _verdict_pass = trace::verdict_pass_span().entered();
for &scheme in schemes {
match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
}
CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
let score = EstimateScore::FiniteCompression(ratio);

if is_better_score(score, &best) {
best = Some((scheme, score));
if is_better_score(score, &best) {
best = Some((scheme, score));
}
}
CompressionEstimate::Deferred(deferred_estimate) => {
deferred.push((scheme, deferred_estimate));
}
}
CompressionEstimate::Deferred(deferred_estimate) => {
deferred.push((scheme, deferred_estimate));
}
}
}

// Pass 2: run deferred work. Callbacks receive the current best as a threshold so they can
// short-circuit with `Skip` when they cannot beat it.
for (scheme, deferred_estimate) in deferred {
let _span = trace::scheme_eval_span(scheme.id()).entered();
let threshold: Option<EstimateScore> = best.map(|(_, score)| score);
match deferred_estimate {
DeferredEstimate::Sample => {
Expand Down Expand Up @@ -560,18 +561,9 @@ impl CascadingCompressor {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::LazyLock;

use parking_lot::Mutex;
use tracing::Event;
use tracing::Subscriber;
use tracing::field::Field;
use tracing::field::Visit;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::prelude::*;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::VortexSessionExecute;
Expand All @@ -595,7 +587,6 @@ mod tests {
use crate::estimate::EstimateVerdict;
use crate::estimate::WinnerEstimate;
use crate::scheme::SchemeExt;
use crate::trace::TARGET_TRACE;

static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
Expand All @@ -613,98 +604,6 @@ mod tests {
matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int())
}

/// Builds the four-element, non-nullable `i32` array (`1, 2, 3, 4`) used as input by the
/// tracing tests in this module.
fn test_integer_array() -> ArrayRef {
    let values = buffer![1i32, 2, 3, 4];
    PrimitiveArray::new(values, Validity::NonNullable).into_array()
}

/// One tracing event captured by `RecordingLayer`, flattened to strings so tests can inspect it.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecordedEvent {
/// Target string copied from the event's metadata.
target: String,
/// Field values rendered as text, keyed by field name.
fields: BTreeMap<String, String>,
}

/// `Visit` implementation that accumulates an event's fields as name-to-text pairs.
#[derive(Default)]
struct EventVisitor {
/// Recorded values keyed by field name; recording the same name again overwrites the entry.
fields: BTreeMap<String, String>,
}

impl EventVisitor {
    /// Stores `rendered` under the name of `field`.
    fn store(&mut self, field: &Field, rendered: String) {
        self.fields.insert(field.name().to_owned(), rendered);
    }
}

/// Captures each visited field as text, keyed by the field's name.
///
/// Strings are stored verbatim, integers and booleans via `to_string`, and anything that
/// reaches `record_debug` via its `{:?}` rendering.
impl Visit for EventVisitor {
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        self.store(field, format!("{value:?}"));
    }

    fn record_i64(&mut self, field: &Field, value: i64) {
        self.store(field, value.to_string());
    }

    fn record_u64(&mut self, field: &Field, value: u64) {
        self.store(field, value.to_string());
    }

    fn record_bool(&mut self, field: &Field, value: bool) {
        self.store(field, value.to_string());
    }

    fn record_str(&mut self, field: &Field, value: &str) {
        // Stored as-is so string fields are not wrapped in the quotes `{:?}` would add.
        self.store(field, value.to_owned());
    }
}

/// `tracing_subscriber` layer that appends every event it observes to a shared buffer.
struct RecordingLayer {
/// Buffer shared with whoever constructed the layer; captured events are pushed onto it.
events: Arc<Mutex<Vec<RecordedEvent>>>,
}

impl RecordingLayer {
/// Creates a layer that pushes captured events into `events`.
fn new(events: Arc<Mutex<Vec<RecordedEvent>>>) -> Self {
Self { events }
}
}

/// Converts every event delivered to the layer into a `RecordedEvent` and appends it to the
/// shared buffer.
impl<S: Subscriber> Layer<S> for RecordingLayer {
    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
        // Collect the event's fields as strings first.
        let mut collector = EventVisitor::default();
        event.record(&mut collector);

        let captured = RecordedEvent {
            target: event.metadata().target().to_owned(),
            fields: collector.fields,
        };
        self.events.lock().push(captured);
    }
}

/// Runs `f` under `tracing::subscriber::with_default` with a `RecordingLayer` installed, and
/// returns `f`'s result together with a copy of every event the layer captured.
fn record_events<T>(f: impl FnOnce() -> T) -> (T, Vec<RecordedEvent>) {
    let sink = Arc::new(Mutex::new(Vec::new()));

    let layer = RecordingLayer::new(Arc::clone(&sink));
    let subscriber = tracing_subscriber::registry().with(layer);
    let output = tracing::subscriber::with_default(subscriber, f);

    // Snapshot the buffer so the caller gets an owned `Vec` rather than the shared handle.
    let captured = sink.lock().clone();
    (output, captured)
}

/// Returns the first recorded event whose target equals `target` and whose `message` field
/// equals `message`.
///
/// # Panics
///
/// Panics with `expected event not found` when no recorded event matches.
fn find_event<'a>(
    events: &'a [RecordedEvent],
    target: &str,
    message: &str,
) -> &'a RecordedEvent {
    for candidate in events {
        if candidate.target != target {
            continue;
        }
        let has_message = candidate
            .fields
            .get("message")
            .is_some_and(|value| value == message);
        if has_message {
            return candidate;
        }
    }
    panic!("expected event not found")
}

#[derive(Debug)]
struct DirectRatioScheme;

Expand Down Expand Up @@ -935,102 +834,6 @@ mod tests {
}
}

/// Test scheme that reports `AlwaysUse` and then delegates compression of its input to a
/// nested `compress_child` call.
#[derive(Debug)]
struct NestedFailureParentScheme;

impl Scheme for NestedFailureParentScheme {
fn scheme_name(&self) -> &'static str {
"test.nested_failure_parent"
}

fn matches(&self, canonical: &Canonical) -> bool {
matches_integer_primitive(canonical)
}

// Returns `AlwaysUse` unconditionally; the data and both contexts are ignored.
fn expected_compression_ratio(
&self,
_data: &ArrayAndStats,
_compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> CompressionEstimate {
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
}

// Passes the unchanged input array to `compress_child` along with this scheme's id and the
// literal `1`. NOTE(review): `1` is presumably the child index — the failure test expects the
// cascade path `test.nested_failure_parent[1]`; confirm against `compress_child`.
fn compress(
&self,
compressor: &CascadingCompressor,
data: &ArrayAndStats,
compress_ctx: CompressorContext,
exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
compressor.compress_child(data.array(), &compress_ctx, self.id(), 1, exec_ctx)
}
}

/// Test scheme that reports `AlwaysUse` and whose `compress` always fails with the message
/// `nested failure`.
#[derive(Debug)]
struct NestedFailureLeafScheme;

impl Scheme for NestedFailureLeafScheme {
fn scheme_name(&self) -> &'static str {
"test.nested_failure_leaf"
}

fn matches(&self, canonical: &Canonical) -> bool {
matches_integer_primitive(canonical)
}

// Returns `AlwaysUse` unconditionally; the data and both contexts are ignored.
fn expected_compression_ratio(
&self,
_data: &ArrayAndStats,
_compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> CompressionEstimate {
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
}

// Never produces an array: bails immediately so tests can observe the failure event.
fn compress(
&self,
_compressor: &CascadingCompressor,
_data: &ArrayAndStats,
_compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
vortex_error::vortex_bail!("nested failure")
}
}

/// Test scheme that asks for a deferred, sample-based estimate and whose `compress` always
/// fails with the message `sample failure`.
#[derive(Debug)]
struct SamplingFailureScheme;

impl Scheme for SamplingFailureScheme {
fn scheme_name(&self) -> &'static str {
"test.sampling_failure"
}

fn matches(&self, canonical: &Canonical) -> bool {
matches_integer_primitive(canonical)
}

// Returns `Deferred(Sample)` unconditionally; the data and both contexts are ignored.
fn expected_compression_ratio(
&self,
_data: &ArrayAndStats,
_compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> CompressionEstimate {
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

// Never produces an array: bails immediately. NOTE(review): the sampling test expects this to
// surface as a `sample.compress_failed` event, i.e. the failure is hit while sampling — confirm.
fn compress(
&self,
_compressor: &CascadingCompressor,
_data: &ArrayAndStats,
_compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
vortex_error::vortex_bail!("sample failure")
}
}

#[test]
fn test_self_exclusion() {
let c = compressor();
Expand Down Expand Up @@ -1494,85 +1297,4 @@ mod tests {
assert!(matches!(score, EstimateScore::FiniteCompression(ratio) if ratio.is_finite()));
Ok(())
}

/// A failure raised by a nested scheme must be reported with the failing scheme's name plus
/// the cascade path and depth recorded on the `scheme.compress_failed` event.
#[test]
fn compress_failure_event_includes_cascade_path_and_depth() {
    let array = test_integer_array();
    let compressor =
        CascadingCompressor::new(vec![&NestedFailureParentScheme, &NestedFailureLeafScheme]);

    let (result, events) = record_events(|| {
        let mut exec_ctx = SESSION.create_execution_ctx();
        compressor.compress(&array, &mut exec_ctx)
    });
    assert!(result.is_err());

    let failure = find_event(&events, TARGET_TRACE, "scheme.compress_failed");
    let field = |name: &str| failure.fields.get(name).map(String::as_str);

    assert_eq!(field("scheme"), Some("test.nested_failure_leaf"));
    assert_eq!(field("cascade_path"), Some("test.nested_failure_parent[1]"));
    assert_eq!(field("cascade_depth"), Some("1"));
}

/// A failure raised by a sampling scheme at the top level must be reported on the
/// `sample.compress_failed` event with the root cascade path and depth zero.
#[test]
fn sample_failure_event_includes_cascade_path_and_depth() {
    let array = test_integer_array();
    let compressor = CascadingCompressor::new(vec![&SamplingFailureScheme]);

    let (result, events) = record_events(|| {
        let mut exec_ctx = SESSION.create_execution_ctx();
        compressor.compress(&array, &mut exec_ctx)
    });
    assert!(result.is_err());

    let failure = find_event(&events, TARGET_TRACE, "sample.compress_failed");
    let field = |name: &str| failure.fields.get(name).map(String::as_str);

    assert_eq!(field("scheme"), Some("test.sampling_failure"));
    assert_eq!(field("cascade_path"), Some("root"));
    assert_eq!(field("cascade_depth"), Some("0"));
}

/// When sampling yields zero bytes, the `sample.result` event must carry `sampled_after = 0`
/// without a `sampled_ratio` field, and no `scheme.compress_result` event may be emitted.
#[test]
fn zero_byte_sample_result_omits_ratio_fields_and_selects_no_scheme() {
    let array = test_integer_array();
    let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme]);

    let (result, events) = record_events(|| {
        let mut exec_ctx = SESSION.create_execution_ctx();
        compressor.compress(&array, &mut exec_ctx)
    });
    assert!(result.is_ok());

    let sample = find_event(&events, TARGET_TRACE, "sample.result");
    assert_eq!(
        sample.fields.get("sampled_after").map(String::as_str),
        Some("0")
    );
    assert!(!sample.fields.contains_key("sampled_ratio"));

    // No winner was compressed, so no result event may appear on the trace target.
    let emitted_compress_result = events
        .iter()
        .filter(|event| event.target == TARGET_TRACE)
        .any(|event| {
            event
                .fields
                .get("message")
                .is_some_and(|value| value == "scheme.compress_result")
        });
    assert!(!emitted_compress_result);
}
}
Loading
Loading