35 changes: 34 additions & 1 deletion vortex-bench/src/public_bi.rs
@@ -454,7 +454,12 @@ impl Dataset for PBIBenchmark {
}

fn v3_dataset_dims(&self) -> (&str, Option<&str>) {
("public-bi", Some(&self.name))
// Match the v2 → v3 migrate classifier, which emits PBI compression
// records as `dataset = <lowercased pbi name>, dataset_variant = NULL`.
// The case-folding is applied by `compression_time_record` /
// `compression_size_record`; this method just surfaces the raw PBI
// name as the dataset.
(&self.name, None)
}

async fn to_vortex_array(&self, _ctx: &mut ExecutionCtx) -> anyhow::Result<ArrayRef> {
@@ -573,3 +578,31 @@ impl Benchmark for PublicBiBenchmark {
glob::Pattern::new(&pattern_str).ok()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn pbi_v3_dataset_dims_uses_pbi_name_as_dataset_with_no_variant() {
// The v2 → v3 migrate classifier emits PBI compression records as
// `dataset = <lowercased pbi name>, dataset_variant = NULL` (it never
// carried a `public-bi` parent in v2 chart names). The live emitter
// must mirror that shape so live ingests merge with migrated history
// into a single per-PBI-dataset chart group instead of forking off a
// sibling group keyed on `public-bi/<name>`. Lowercasing happens in
// `compression_time_record`/`compression_size_record`, so this trait
// method just needs to surface the raw PBI name as the dataset.
let bench = PBIBenchmark {
name: "Arade".to_string(),
base_path: PathBuf::new(),
};
assert_eq!(bench.v3_dataset_dims(), ("Arade", None));

let bench = PBIBenchmark {
name: "CMSprovider".to_string(),
base_path: PathBuf::new(),
};
assert_eq!(bench.v3_dataset_dims(), ("CMSprovider", None));
}
}
@@ -5,8 +5,7 @@ expression: render(&record)?
{
"kind": "compression_time",
"commit_sha": "<commit-sha>",
"dataset": "public-bi",
"dataset_variant": "cms-provider",
"dataset": "cmsprovider",
"format": "vortex-file-compressed",
"op": "encode",
"value_ns": 5000000,
129 changes: 117 additions & 12 deletions vortex-bench/src/v3.rs
@@ -195,19 +195,43 @@ pub struct VectorSearchRunRecord {
pub env_triple: Option<String>,
}

/// Canonicalize a TPC scale factor string for v3 dim emission.
///
/// Bench-orchestrator passes raw strings like `"1.0"` and `"100.0"`, but the
/// v2 → v3 migrate path canonicalizes integer-valued scale factors to `"1"` and
/// `"100"` (because v2 chart names carried integer-looking values). Live
/// records must use the same canonical form so they merge with migrated
/// history into a single chart group instead of forking off a sibling group
/// keyed on `SF=1.0` vs `SF=1`.
///
/// Falls back to the original input on parse failure or non-finite values, so a
/// scale factor we cannot interpret as a number passes through unchanged
/// rather than being silently rewritten.
fn canonical_tpc_scale_factor(scale_factor: &str) -> String {
let trimmed = scale_factor.trim();
match trimmed.parse::<f64>() {
Ok(value) if value.is_finite() => format!("{value}"),
_ => scale_factor.to_string(),
}
}

/// Map a [`BenchmarkDataset`] to the `(dataset, dataset_variant, scale_factor)`
/// triple emitted in `query_measurement` records.
///
/// Mirrors the `Per-suite dim values` table in
/// `benchmarks-website/planning/benchmark-mapping.md`.
pub fn benchmark_dataset_dims(d: &BenchmarkDataset) -> (String, Option<String>, Option<String>) {
match d {
BenchmarkDataset::TpcH { scale_factor } => {
("tpch".to_string(), None, Some(scale_factor.clone()))
}
BenchmarkDataset::TpcDS { scale_factor } => {
("tpcds".to_string(), None, Some(scale_factor.clone()))
}
BenchmarkDataset::TpcH { scale_factor } => (
"tpch".to_string(),
None,
Some(canonical_tpc_scale_factor(scale_factor)),
),
BenchmarkDataset::TpcDS { scale_factor } => (
"tpcds".to_string(),
None,
Some(canonical_tpc_scale_factor(scale_factor)),
),
// ClickBench: the migrate path leaves `dataset_variant` NULL because
// v2 record names did not encode flavor, so the live emitter does the
// same to keep historical and live records in one `clickbench` group.
@@ -270,6 +294,11 @@ pub fn query_measurement_record(
///
/// Caller passes `dataset` (the compress-bench dataset name) and the
/// `op`. `dataset_variant` is reserved and unused at alpha.
///
/// `dataset` is lowercased here to match the v2 → v3 migrate classifier,
/// which stores `dataset = series.to_lowercase()`. Callers like
/// `Dataset::v3_dataset_dims` may therefore return mixed-case names without
/// having to duplicate the case-folding rule.
pub fn compression_time_record(
timing: &CompressionTimingMeasurement,
dataset: &str,
@@ -279,7 +308,7 @@ pub fn compression_time_record(
) -> V3Record {
V3Record::CompressionTime(CompressionTimeRecord {
commit_sha: GIT_COMMIT_ID.clone(),
dataset: dataset.to_string(),
dataset: dataset.to_lowercase(),
dataset_variant: dataset_variant.map(str::to_string),
format: timing.format.name().to_string(),
op: compress_op_label(op).to_string(),
@@ -290,6 +319,9 @@
}

/// Build a `compression_size` record.
///
/// `dataset` is lowercased here for the same reason as
/// [`compression_time_record`].
pub fn compression_size_record(
dataset: &str,
dataset_variant: Option<&str>,
@@ -298,7 +330,7 @@
) -> V3Record {
V3Record::CompressionSize(CompressionSizeRecord {
commit_sha: GIT_COMMIT_ID.clone(),
dataset: dataset.to_string(),
dataset: dataset.to_lowercase(),
dataset_variant: dataset_variant.map(str::to_string),
format: format.name().to_string(),
value_bytes,
@@ -505,16 +537,21 @@ mod tests {
}

#[test]
fn snapshot_compression_time_public_bi_variant() -> anyhow::Result<()> {
fn snapshot_compression_time_public_bi() -> anyhow::Result<()> {
// PBI compression records flatten the `public-bi` parent into the
// dataset axis: a `PBIBenchmark` named `CMSprovider` emits
// `dataset = "cmsprovider", dataset_variant = NULL`. Mixed-case input
// here exercises both `PBIBenchmark::v3_dataset_dims` (no variant) and
// `compression_time_record` (lowercase-folded dataset).
let timing = CompressionTimingMeasurement {
name: "compress time/cms-provider".to_string(),
name: "compress time/CMSprovider".to_string(),
format: Format::OnDiskVortex,
time: Duration::from_nanos(5_000_000),
};
let record = compression_time_record(
&timing,
"public-bi",
Some("cms-provider"),
"CMSprovider",
None,
CompressOp::Compress,
vec![5_500_000, 5_000_000, 5_200_000],
);
@@ -598,6 +635,74 @@ }
}
}

#[test]
fn compression_records_lowercase_dataset_for_v2_history_match() {
// The v2 → v3 migrate classifier stores `dataset = series.to_lowercase()`
// for compress-bench records (see `benchmarks-website/migrate/src/classifier.rs`).
// Datasets whose `Dataset::name()` returns mixed case
// (`TPC-H l_comment chunked`, every PBI name like `Arade`/`CMSprovider`)
// would otherwise emit live records that do not merge with their
// migrated history. Lowercasing inside the v3 helpers keeps the trait
// API simple for non-v3 callers while still matching migrate's shape.
let timing = CompressionTimingMeasurement {
name: "compress time/TPC-H l_comment chunked".to_string(),
format: Format::OnDiskVortex,
time: Duration::from_nanos(1_000_000),
};
let record = compression_time_record(
&timing,
"TPC-H l_comment chunked",
None,
CompressOp::Compress,
vec![1_000_000],
);
let V3Record::CompressionTime(time) = &record else {
panic!("expected CompressionTime variant, got {record:?}");
};
assert_eq!(time.dataset, "tpc-h l_comment chunked");

let record = compression_size_record("CMSprovider", None, Format::OnDiskVortex, 42);
let V3Record::CompressionSize(size) = &record else {
panic!("expected CompressionSize variant, got {record:?}");
};
assert_eq!(size.dataset, "cmsprovider");
}

#[test]
fn tpc_scale_factors_are_canonicalized_for_query_dims() {
// Bench-orchestrator passes raw TPC scale factors like `"1.0"` and `"100.0"`,
// but the v2 → v3 migrate path canonicalizes integer-valued scale factors
// to `"1"` and `"100"` (because v2 chart names carried integer-looking
// values). The live emitter must do the same so live ingests merge with
// migrated history into a single chart group instead of forking off a
// sibling group keyed on `SF=1.0` vs `SF=1`.
let cases = [
("1.0", "1"),
("100.0", "100"),
("1", "1"),
("100", "100"),
("0.01", "0.01"),
];
for (input, expected) in cases {
let (_, _, sf) = benchmark_dataset_dims(&BenchmarkDataset::TpcH {
scale_factor: input.to_string(),
});
assert_eq!(
sf.as_deref(),
Some(expected),
"TpcH scale factor {input:?} should canonicalize to {expected:?}",
);
let (_, _, sf) = benchmark_dataset_dims(&BenchmarkDataset::TpcDS {
scale_factor: input.to_string(),
});
assert_eq!(
sf.as_deref(),
Some(expected),
"TpcDS scale factor {input:?} should canonicalize to {expected:?}",
);
}
}

#[test]
fn jsonl_round_trips_one_record_per_line() -> anyhow::Result<()> {
let record = compression_size_record("taxi", None, Format::Parquet, 100);