From 35d59ae2c2f0396c2f1d43af8ad30eca35519dc5 Mon Sep 17 00:00:00 2001
From: Claude
Date: Mon, 4 May 2026 15:59:18 +0000
Subject: [PATCH 1/3] feat(bench): emit v3 JSONL records and dual-write to
 bench server

Brings the v3 emitter and CI dual-write plumbing from ct/benchmarks-v3 onto
develop without the v3 server/website code. CI continues to write v2 results
to S3 unchanged; v3 ingest is gated on vars.V3_INGEST_URL and
`continue-on-error: true`, so when the variable is unset (or the server is
unreachable) the workflow no-ops.

vortex-bench:
- New `vortex-bench/src/v3.rs` with one record per `kind`
  (`query_measurement`, `compression_time`, `compression_size`,
  `random_access_time`, `vector_search_run`) plus a serde-tagged `V3Record`
  enum, JSONL writer, and snapshot tests.
- `Dataset::v3_dataset_dims()` (default `(name(), None)`) lets Public-BI map
  to `(public-bi, <name>)`.
- `compress`/`runner` capture per-iteration timings and provide
  `SqlBenchmarkRunner::v3_records()`.

Benchmark binaries (`compress-bench`, `datafusion-bench`, `duckdb-bench`,
`lance-bench`, `random-access-bench`, `vector-search-bench`) gain
`--gh-json-v3 <path>` for JSONL emission alongside the existing `gh-json`
flow. bench-orchestrator passes `--gh-json-v3` through `vx-bench run`.

`scripts/post-ingest.py` reads the JSONL, fills the `commit` envelope from
`git show`, wraps the records in `{run_meta, commit, records}`, and POSTs to
`/api/ingest`. Stdlib only.

Workflows:
- `.github/workflows/bench.yml` and `sql-benchmarks.yml` add
  `--gh-json-v3 results.v3.jsonl` and a follow-up "Ingest results to v3
  server" step.
- New `.github/workflows/v3-commit-metadata.yml` POSTs an empty envelope on
  every push to `develop` so the v3 `commits` dim stays populated.

Files intentionally NOT brought over: anything under `benchmarks-website/`,
the workspace member additions for the v3 server, and workflows depending on
the v3 server crate. The v3 website ships in a follow-up PR off
`ct/benchmarks-v3` once dual-write is healthy in production.
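For illustration only (the values below are invented; the authoritative
per-`kind` field sets are pinned by the insta snapshots under
`vortex-bench/src/snapshots/`), one emitted JSONL record and the envelope
`scripts/post-ingest.py` wraps it in look roughly like:

  {"kind": "compression_size", "commit_sha": "<sha>", "dataset": "taxi",
   "format": "lance", "value_bytes": 12345678}

  {"run_meta": {"benchmark_id": "compress-bench", "schema_version": 1,
                "started_at": "2026-05-04T15:59:18Z"},
   "commit": {"sha": "<sha>", "timestamp": "...", "message": "...",
              "author_name": "...", "author_email": "...",
              "committer_name": "...", "committer_email": "...",
              "tree_sha": "...", "url": "..."},
   "records": [{"kind": "compression_size", "...": "..."}]}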
Signed-off-by: Claude
---
 .github/workflows/bench.yml                   |  15 +-
 .github/workflows/sql-benchmarks.yml          |  15 +
 .github/workflows/v3-commit-metadata.yml      |  35 ++
 Cargo.lock                                    |   1 +
 bench-orchestrator/bench_orchestrator/cli.py  |   5 +
 .../bench_orchestrator/runner/executor.py     |   5 +
 bench-orchestrator/tests/test_executor.py     |  25 +
 benchmarks/compress-bench/src/main.rs         |  54 +-
 benchmarks/datafusion-bench/src/main.rs       |  10 +
 benchmarks/duckdb-bench/src/main.rs           |  10 +
 benchmarks/lance-bench/src/main.rs            |  10 +
 benchmarks/random-access-bench/src/main.rs    |  14 +
 benchmarks/vector-search-bench/src/main.rs    |  36 ++
 scripts/post-ingest.py                        | 195 ++++++
 vortex-bench/Cargo.toml                       |   3 +
 vortex-bench/REUSE.toml                       |   6 +
 vortex-bench/src/compress/mod.rs              |  10 +
 vortex-bench/src/datasets/mod.rs              |  11 +
 vortex-bench/src/lib.rs                       |   1 +
 vortex-bench/src/public_bi.rs                 |   4 +
 vortex-bench/src/runner.rs                    |  18 +
 ..._v3__tests__snapshot_compression_size.snap |  11 +
 ...sts__snapshot_compression_time_encode.snap |  18 +
 ...ot_compression_time_public_bi_variant.snap |  19 +
 ...ement_clickbench_no_memory@clickbench.snap |  19 +
 ...y_measurement_with_memory@with_memory.snap |  25 +
 ...3__tests__snapshot_random_access_time.snap |  17 +
 ...v3__tests__snapshot_vector_search_run.snap |  23 +
 vortex-bench/src/v3.rs                        | 574 ++++++++++++++++++
 29 files changed, 1184 insertions(+), 5 deletions(-)
 create mode 100644 .github/workflows/v3-commit-metadata.yml
 create mode 100755 scripts/post-ingest.py
 create mode 100644 vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_size.snap
 create mode 100644 vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_time_encode.snap
 create mode 100644 vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_time_public_bi_variant.snap
 create mode 100644 vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_query_measurement_clickbench_no_memory@clickbench.snap
 create mode 100644 vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_query_measurement_with_memory@with_memory.snap
 create mode 100644 vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_random_access_time.snap
 create mode 100644 vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_vector_search_run.snap
 create mode 100644 vortex-bench/src/v3.rs

diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml
index a437523b9be..37371dadd05 100644
--- a/.github/workflows/bench.yml
+++ b/.github/workflows/bench.yml
@@ -92,7 +92,7 @@ jobs:
           VORTEX_EXPERIMENTAL_PATCHED_ARRAY: "1"
           FLAT_LAYOUT_INLINE_ARRAY_NODE: "1"
         run: |
-          bash scripts/bench-taskset.sh target/release_debug/${{ matrix.benchmark.id }} --formats ${{ matrix.benchmark.formats }} -d gh-json -o results.json
+          bash scripts/bench-taskset.sh target/release_debug/${{ matrix.benchmark.id }} --formats ${{ matrix.benchmark.formats }} -d gh-json -o results.json --gh-json-v3 results.v3.jsonl

       - name: Setup AWS CLI
         uses: aws-actions/configure-aws-credentials@ec61189d14ec14c8efccab744f656cffd0e33f37 # v6
@@ -105,6 +105,19 @@
         run: |
           bash scripts/cat-s3.sh vortex-ci-benchmark-results data.json.gz results.json

+      - name: Ingest results to v3 server
+        if: vars.V3_INGEST_URL != ''
+        continue-on-error: true
+        shell: bash
+        env:
+          INGEST_BEARER_TOKEN: ${{ secrets.INGEST_BEARER_TOKEN }}
+        run: |
+          python3 scripts/post-ingest.py results.v3.jsonl \
+            --server "${{ vars.V3_INGEST_URL }}" \
+            --commit-sha "${{ github.sha }}" \
+            --benchmark-id "${{ matrix.benchmark.id }}" \
+            --repo-url "${{ github.server_url }}/${{ github.repository }}"
+
       - name: Alert incident.io
         if: failure()
         uses: ./.github/actions/alert-incident-io
diff --git a/.github/workflows/sql-benchmarks.yml b/.github/workflows/sql-benchmarks.yml
index 8be259fa562..fe77675fcbd 100644
--- a/.github/workflows/sql-benchmarks.yml
+++ b/.github/workflows/sql-benchmarks.yml
@@ -376,6 +376,7 @@
           bash scripts/bench-taskset.sh uv run --project bench-orchestrator vx-bench run "${{ matrix.subcommand }}" \
             --targets-json '${{ steps.targets.outputs.targets_json }}' \
             --output results.json \
+            --gh-json-v3 results.v3.jsonl \
             --no-build \
             --runner "ec2_${{ inputs.machine_type }}" \
             ${{ matrix.iterations && format('--iterations {0}', matrix.iterations) || '' }} \
@@ -395,6 +396,7 @@
           bash scripts/bench-taskset.sh uv run --project bench-orchestrator vx-bench run "${{ matrix.subcommand }}" \
             --targets-json '${{ steps.targets.outputs.targets_json }}' \
             --output results.json \
+            --gh-json-v3 results.v3.jsonl \
             --no-build \
             --runner "ec2_${{ inputs.machine_type }}" \
             ${{ matrix.iterations && format('--iterations {0}', matrix.iterations) || '' }} \
@@ -499,6 +501,19 @@
         run: |
           bash scripts/cat-s3.sh vortex-ci-benchmark-results data.json.gz results.json

+      - name: Ingest results to v3 server
+        if: inputs.mode == 'develop' && vars.V3_INGEST_URL != ''
+        continue-on-error: true
+        shell: bash
+        env:
+          INGEST_BEARER_TOKEN: ${{ secrets.INGEST_BEARER_TOKEN }}
+        run: |
+          python3 scripts/post-ingest.py results.v3.jsonl \
+            --server "${{ vars.V3_INGEST_URL }}" \
+            --commit-sha "${{ github.sha }}" \
+            --benchmark-id "${{ matrix.id }}" \
+            --repo-url "${{ github.server_url }}/${{ github.repository }}"
+
       - name: Upload File Sizes
         if: inputs.mode == 'develop' && matrix.remote_storage == null
         shell: bash
diff --git a/.github/workflows/v3-commit-metadata.yml b/.github/workflows/v3-commit-metadata.yml
new file mode 100644
index 00000000000..2ddee9dc84d
--- /dev/null
+++ b/.github/workflows/v3-commit-metadata.yml
@@ -0,0 +1,35 @@
+# Posts a v3 ingest envelope with no records on every push to develop, so the
+# `commits` dim stays populated even when no benchmark ran.
+
+name: v3 commit metadata
+
+on:
+  push:
+    branches: [develop]
+  workflow_dispatch: { }
+
+permissions:
+  contents: read
+
+jobs:
+  commit-metadata:
+    runs-on: ubuntu-latest
+    timeout-minutes: 10
+    steps:
+      - uses: actions/checkout@v6
+        with:
+          fetch-depth: 2
+
+      - name: Ingest commit metadata to v3 server
+        if: vars.V3_INGEST_URL != ''
+        continue-on-error: true
+        shell: bash
+        env:
+          INGEST_BEARER_TOKEN: ${{ secrets.INGEST_BEARER_TOKEN }}
+        run: |
+          echo -n > empty.jsonl
+          python3 scripts/post-ingest.py empty.jsonl \
+            --server "${{ vars.V3_INGEST_URL }}" \
+            --commit-sha "${{ github.sha }}" \
+            --benchmark-id "commit-metadata" \
+            --repo-url "${{ github.server_url }}/${{ github.repository }}"
diff --git a/Cargo.lock b/Cargo.lock
index 0f7a33f3d14..c624e3ae1e9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10156,6 +10156,7 @@ dependencies = [
  "glob",
  "humansize",
  "indicatif",
+ "insta",
  "itertools 0.14.0",
  "mimalloc",
  "noodles-bgzf",
diff --git a/bench-orchestrator/bench_orchestrator/cli.py b/bench-orchestrator/bench_orchestrator/cli.py
index d497d85ed13..6c200015182 100644
--- a/bench-orchestrator/bench_orchestrator/cli.py
+++ b/bench-orchestrator/bench_orchestrator/cli.py
@@ -210,6 +210,10 @@ def run(
         Path | None,
         typer.Option("--output", help="Optional path for compatibility JSONL output"),
     ] = None,
+    gh_json_v3: Annotated[
+        Path | None,
+        typer.Option("--gh-json-v3", help="Optional path for v3 JSONL records emitted by the benchmark binary"),
+    ] = None,
     options: Annotated[list[str] | None, typer.Option("--opt", help="Engine or benchmark specific options")] = None,
 ) -> None:
     """Run benchmarks with specified configuration."""
@@ -294,6 +298,7 @@ def run(
                 sample_rate=sample_rate,
                 tracing=tracing,
                 runner=runner,
+                gh_json_v3=gh_json_v3,
                 on_result=lambda line, store_writer=ctx.write_raw_json, compatibility=compatibility_file: (
                     write_result_line(
                         line,
diff --git a/bench-orchestrator/bench_orchestrator/runner/executor.py b/bench-orchestrator/bench_orchestrator/runner/executor.py
index b895afdc2e1..32ed9c91132 100644
--- a/bench-orchestrator/bench_orchestrator/runner/executor.py
+++ b/bench-orchestrator/bench_orchestrator/runner/executor.py
@@ -40,6 +40,7 @@ def build_command(
         sample_rate: int | None = None,
         tracing: bool = False,
         runner: str | None = None,
+        gh_json_v3: Path | None = None,
     ) -> list[str]:
         """Build the command used to execute a benchmark binary."""
         cmd = [
@@ -67,6 +68,8 @@
             cmd.append("--tracing")
         if runner:
             cmd.extend(["--runner", runner])
+        if gh_json_v3 is not None:
+            cmd.extend(["--gh-json-v3", str(gh_json_v3)])
         if options:
             for key, value in options.items():
                 cmd.extend(["--opt", f"{key}={value}"])
@@ -98,6 +101,7 @@ def run(
         sample_rate: int | None = None,
         tracing: bool = False,
         runner: str | None = None,
+        gh_json_v3: Path | None = None,
         on_result: Callable[[str], None] | None = None,
     ) -> list[str]:
         """
@@ -128,6 +132,7 @@
             sample_rate=sample_rate,
             tracing=tracing,
             runner=runner,
+            gh_json_v3=gh_json_v3,
         )

         if self.verbose:
diff --git a/bench-orchestrator/tests/test_executor.py b/bench-orchestrator/tests/test_executor.py
index ade3dde1a67..dd3253a22ff 100644
--- a/bench-orchestrator/tests/test_executor.py
+++ b/bench-orchestrator/tests/test_executor.py
@@ -48,6 +48,31 @@ def test_build_command_omits_formats_for_lance_backend() -> None:
     assert "1,3" in cmd


+def test_build_command_includes_gh_json_v3_when_set() -> None:
+    executor = BenchmarkExecutor(Path("/tmp/duckdb-bench"), Engine.DUCKDB)
+
+    cmd = executor.build_command(
+        benchmark=Benchmark.TPCH,
+        formats=[Format.PARQUET],
+        gh_json_v3=Path("results.v3.jsonl"),
+    )
+
+    assert "--gh-json-v3" in cmd
+    flag_idx = cmd.index("--gh-json-v3")
+    assert cmd[flag_idx + 1] == "results.v3.jsonl"
+
+
+def test_build_command_omits_gh_json_v3_when_unset() -> None:
+    executor = BenchmarkExecutor(Path("/tmp/duckdb-bench"), Engine.DUCKDB)
+
+    cmd = executor.build_command(
+        benchmark=Benchmark.TPCH,
+        formats=[Format.PARQUET],
+    )
+
+    assert "--gh-json-v3" not in cmd
+
+
 def test_run_streams_logs_without_counting_them(tmp_path: Path) -> None:
     script = tmp_path / "fake-bench.py"
     script.write_text(
diff --git a/benchmarks/compress-bench/src/main.rs b/benchmarks/compress-bench/src/main.rs
index 4800d3d6772..ce48001abb5 100644
--- a/benchmarks/compress-bench/src/main.rs
+++ b/benchmarks/compress-bench/src/main.rs
@@ -41,6 +41,7 @@ use vortex_bench::public_bi::PBIDataset::Euro2016;
 use vortex_bench::public_bi::PBIDataset::Food;
 use vortex_bench::public_bi::PBIDataset::HashTags;
 use vortex_bench::setup_logging_and_tracing_with_format;
+use vortex_bench::v3;

 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
@@ -68,6 +69,10 @@ struct Args {
     display_format: DisplayFormat,
     #[arg(short, long)]
     output_path: Option<PathBuf>,
+    /// Additionally write v3 JSONL records to this path. See
+    /// `benchmarks-website/planning/02-contracts.md`.
+    #[arg(long)]
+    gh_json_v3: Option<PathBuf>,
     #[arg(long)]
     tracing: bool,
     /// Format for the primary stderr log sink. `text` is the default human-readable format;
@@ -89,6 +94,7 @@ async fn main() -> anyhow::Result<()> {
         args.ops,
         args.display_format,
         args.output_path,
+        args.gh_json_v3,
     )
     .await
 }
@@ -114,6 +120,7 @@ async fn run_compress(
     ops: Vec<CompressOp>,
     display_format: DisplayFormat,
     output_path: Option<PathBuf>,
+    gh_json_v3: Option<PathBuf>,
 ) -> anyhow::Result<()> {
     let targets = formats
         .iter()
@@ -163,17 +170,24 @@ async fn run_compress(
     let progress = ProgressBar::new((datasets.len() * formats.len() * ops.len()) as u64);

     let mut measurements = vec![];
+    let mut v3_records: Vec<v3::V3Record> = Vec::new();
     for dataset_handle in datasets.into_iter() {
-        let m = run_benchmark_for_dataset(&progress, &formats, &ops, iterations, dataset_handle)
-            .await?;
+        let (m, mut records) =
+            run_benchmark_for_dataset(&progress, &formats, &ops, iterations, dataset_handle)
+                .await?;
         measurements.push(m);
+        v3_records.append(&mut records);
     }
     let measurements = CompressMeasurements::from_iter(measurements);

     progress.finish();

+    if let Some(path) = gh_json_v3 {
+        v3::write_jsonl_to_path(&path, &v3_records)?;
+    }
+
     let mut writer = create_output_writer(&display_format, output_path, BENCHMARK_ID)?;

     match display_format {
@@ -202,8 +216,9 @@ async fn run_benchmark_for_dataset(
     ops: &[CompressOp],
     iterations: usize,
     dataset_handle: &dyn Dataset,
-) -> anyhow::Result<CompressMeasurements> {
+) -> anyhow::Result<(CompressMeasurements, Vec<v3::V3Record>)> {
     let bench_name = dataset_handle.name();
+    let (v3_dataset, v3_variant) = dataset_handle.v3_dataset_dims();
     tracing::info!("Running {bench_name} benchmark");

     // Get the parquet file path for this dataset
@@ -213,6 +228,7 @@
     let mut timings = Vec::new();
     let mut measurements_map: HashMap<(Format, CompressOp), Duration> = HashMap::new();
     let mut compressed_sizes: HashMap<Format, u64> = HashMap::new();
+    let mut v3_records: Vec<v3::V3Record> = Vec::new();

     for format in formats {
         let compressor = get_compressor(*format);
@@ -228,6 +244,24 @@
                     )
                     .await?;
                     compressed_sizes.insert(*format, result.compressed_size);
+                    let all_runs_ns: Vec<u64> = result
+                        .all_runs
+                        .iter()
+                        .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
+                        .collect();
+                    v3_records.push(v3::compression_time_record(
+                        &result.timing,
+                        v3_dataset,
+                        v3_variant,
+                        CompressOp::Compress,
+                        all_runs_ns,
+                    ));
+                    v3_records.push(v3::compression_size_record(
+                        v3_dataset,
+                        v3_variant,
+                        *format,
+                        result.compressed_size,
+                    ));
                     ratios.extend(result.ratios);
                     timings.push(result.timing);
                     result.time
@@ -240,6 +274,18 @@
                     bench_name,
                     )
                     .await?;
+                    let all_runs_ns: Vec<u64> = result
+                        .all_runs
+                        .iter()
+                        .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
+                        .collect();
+                    v3_records.push(v3::compression_time_record(
+                        &result.timing,
+                        v3_dataset,
+                        v3_variant,
+                        CompressOp::Decompress,
+                        all_runs_ns,
+                    ));
                     timings.push(result.timing);
                     result.time
                 }
@@ -258,5 +304,5 @@
         &mut ratios,
     );

-    Ok(CompressMeasurements { timings, ratios })
+    Ok((CompressMeasurements { timings, ratios }, v3_records))
 }
diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs
index 745d8371303..3f264e758f7 100644
--- a/benchmarks/datafusion-bench/src/main.rs
+++ b/benchmarks/datafusion-bench/src/main.rs
@@ -44,6 +44,7 @@ use vortex_bench::runner::BenchmarkQueryResult;
 use vortex_bench::runner::SqlBenchmarkRunner;
 use vortex_bench::runner::filter_queries;
 use vortex_bench::setup_logging_and_tracing;
+use vortex_bench::v3;
 use vortex_datafusion::metrics::VortexMetricsFinder;

 /// Common arguments shared across benchmarks
@@ -82,6 +83,11 @@ struct Args {
     #[arg(short)]
     output_path: Option<PathBuf>,

+    /// Additionally write v3 JSONL records to this path. See
+    /// `benchmarks-website/planning/02-contracts.md`.
+    #[arg(long)]
+    gh_json_v3: Option<PathBuf>,
+
     #[arg(long, default_value_t = false)]
     show_metrics: bool,

@@ -226,6 +232,10 @@ async fn main() -> anyhow::Result<()> {
         print_metrics(plans.as_ref());
     }

+    if let Some(path) = args.gh_json_v3.as_ref() {
+        v3::write_jsonl_to_path(path, &runner.v3_records())?;
+    }
+
     let benchmark_id = format!("datafusion-{}", benchmark.dataset_name());
     let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?;
     runner.export_to(&args.display_format, writer)?;
diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs
index d8a3306b224..b2a4ff1bd15 100644
--- a/benchmarks/duckdb-bench/src/main.rs
+++ b/benchmarks/duckdb-bench/src/main.rs
@@ -24,6 +24,7 @@ use vortex_bench::runner::BenchmarkMode;
 use vortex_bench::runner::SqlBenchmarkRunner;
 use vortex_bench::runner::filter_queries;
 use vortex_bench::setup_logging_and_tracing;
+use vortex_bench::v3;

 /// Common arguments shared across benchmarks
 #[derive(Parser)]
@@ -58,6 +59,11 @@ struct Args {
     #[arg(short)]
     output_path: Option<PathBuf>,

+    /// Additionally write v3 JSONL records to this path. See
+    /// `benchmarks-website/planning/02-contracts.md`.
+    #[arg(long)]
+    gh_json_v3: Option<PathBuf>,
+
     #[arg(long, default_value_t = false)]
     track_memory: bool,

@@ -190,6 +196,10 @@ fn main() -> anyhow::Result<()> {
     )?;

     if !args.explain {
+        if let Some(path) = args.gh_json_v3.as_ref() {
+            v3::write_jsonl_to_path(path, &runner.v3_records())?;
+        }
+
         let benchmark_id = format!("duckdb-{}", benchmark.dataset_name());
         let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?;
         runner.export_to(&args.display_format, writer)?;
diff --git a/benchmarks/lance-bench/src/main.rs b/benchmarks/lance-bench/src/main.rs
index 6cce97d2548..ca65f0c74c7 100644
--- a/benchmarks/lance-bench/src/main.rs
+++ b/benchmarks/lance-bench/src/main.rs
@@ -28,6 +28,7 @@ use vortex_bench::runner::BenchmarkQueryResult;
 use vortex_bench::runner::SqlBenchmarkRunner;
 use vortex_bench::runner::filter_queries;
 use vortex_bench::setup_logging_and_tracing;
+use vortex_bench::v3;

 /// Lance benchmark tool - runs SQL queries against Lance format data using DataFusion
 #[derive(Parser)]
@@ -59,6 +60,11 @@ struct Args {
     #[arg(short)]
     output_path: Option<PathBuf>,

+    /// Additionally write v3 JSONL records to this path. See
+    /// `benchmarks-website/planning/02-contracts.md`.
+    #[arg(long)]
+    gh_json_v3: Option<PathBuf>,
+
     #[arg(long, default_value_t = false)]
     hide_progress_bar: bool,

@@ -124,6 +130,10 @@ async fn main() -> anyhow::Result<()> {
     )
     .await?;

+    if let Some(path) = args.gh_json_v3.as_ref() {
+        v3::write_jsonl_to_path(path, &runner.v3_records())?;
+    }
+
     let benchmark_id = format!("lance-{}", benchmark.dataset_name());
     let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?;
     runner.export_to(&args.display_format, writer)?;
diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs
index 852332528a8..b3c85211f6f 100644
--- a/benchmarks/random-access-bench/src/main.rs
+++ b/benchmarks/random-access-bench/src/main.rs
@@ -32,6 +32,7 @@ use vortex_bench::random_access::RandomAccessor;
 use vortex_bench::random_access::VortexRandomAccessor;
 use vortex_bench::setup_logging_and_tracing;
 use vortex_bench::utils::constants::STORAGE_NVME;
+use vortex_bench::v3;

 // ---------------------------------------------------------------------------
 // Access patterns
@@ -173,6 +174,10 @@ struct Args {
     display_format: DisplayFormat,
     #[arg(short)]
     output_path: Option<PathBuf>,
+    /// Additionally write v3 JSONL records to this path. See
+    /// `benchmarks-website/planning/02-contracts.md`.
+    #[arg(long)]
+    gh_json_v3: Option<PathBuf>,
     /// Which datasets to benchmark random access on.
     #[arg(
         long,
@@ -205,6 +210,7 @@ async fn main() -> Result<()> {
         args.open_mode,
         args.display_format,
         args.output_path,
+        args.gh_json_v3,
     )
     .await
 }
@@ -340,6 +346,7 @@ async fn run_random_access(
     open_mode: OpenMode,
     display_format: DisplayFormat,
     output_path: Option<PathBuf>,
+    gh_json_v3: Option<PathBuf>,
 ) -> Result<()> {
     let reopen_variants: &[bool] = match open_mode {
         OpenMode::Cached => &[false],
@@ -358,6 +365,7 @@

     let mut targets = Vec::new();
     let mut measurements = Vec::new();
+    let mut v3_records: Vec<v3::V3Record> = Vec::new();

     for dataset in datasets {
         for format in &formats {
@@ -380,6 +388,7 @@
                 )
                 .await?;

+                v3_records.push(v3::random_access_record(&measurement, dataset.name()));
                 targets.push(measurement.target);
                 measurements.push(measurement);
                 progress.inc(1);
@@ -406,6 +415,7 @@
                     )
                     .await?;

+                    v3_records.push(v3::random_access_record(&measurement, dataset.name()));
                     targets.push(measurement.target);
                     measurements.push(measurement);
                     progress.inc(1);
@@ -416,6 +426,10 @@

     progress.finish();

+    if let Some(path) = gh_json_v3 {
+        v3::write_jsonl_to_path(&path, &v3_records)?;
+    }
+
     let mut writer = create_output_writer(&display_format, output_path, BENCHMARK_ID)?;

     match display_format {
diff --git a/benchmarks/vector-search-bench/src/main.rs b/benchmarks/vector-search-bench/src/main.rs
index 626e3bfce50..440de142bef 100644
--- a/benchmarks/vector-search-bench/src/main.rs
+++ b/benchmarks/vector-search-bench/src/main.rs
@@ -28,6 +28,7 @@ use vector_search_bench::scan::ScanConfig;
 use vector_search_bench::scan::ScanTiming;
 use vector_search_bench::scan::run_search_scan;
 use vortex_bench::setup_logging_and_tracing;
+use vortex_bench::v3;
 use vortex_bench::vector_dataset;
 use vortex_bench::vector_dataset::TrainLayout;
 use vortex_bench::vector_dataset::VectorDataset;
@@ -71,6 +72,11 @@ struct Args {
     #[arg(long)]
     output_path: Option<PathBuf>,

+    /// Additionally write v3 JSONL records to this path. See
+    /// `benchmarks-website/planning/02-contracts.md`.
+    #[arg(long)]
+    gh_json_v3: Option<PathBuf>,
+
     /// Emit verbose tracing.
     #[arg(short, long)]
     verbose: bool,
@@ -143,6 +149,36 @@ async fn main() -> Result<()> {
         vortex_results: &pairs,
     };

+    // Emit v3 JSONL if requested. The records carry the per-scan dimensions that
+    // ScanTiming itself does not (dataset, layout, threshold).
+    if let Some(path) = args.gh_json_v3.as_ref() {
+        let records: Vec<v3::V3Record> = scan_timings
+            .iter()
+            .map(|scan| {
+                let all_runs_ns: Vec<u64> = scan
+                    .all_runs
+                    .iter()
+                    .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
+                    .collect();
+                let median_ns = u64::try_from(scan.median.as_nanos()).unwrap_or(u64::MAX);
+                v3::vector_search_record(
+                    v3::VectorSearchDims {
+                        dataset: dataset.name(),
+                        layout: layout.label(),
+                        flavor: scan.flavor.label(),
+                        threshold: f64::from(args.threshold),
+                    },
+                    median_ns,
+                    all_runs_ns,
+                    scan.matches,
+                    scan.rows_scanned,
+                    scan.bytes_scanned,
+                )
+            })
+            .collect();
+        v3::write_jsonl_to_path(path, &records)?;
+    }
+
     // Print the results.
     if let Some(path) = args.output_path {
         let mut file =
diff --git a/scripts/post-ingest.py b/scripts/post-ingest.py
new file mode 100755
index 00000000000..2e7cb8edf7f
--- /dev/null
+++ b/scripts/post-ingest.py
@@ -0,0 +1,195 @@
+#!/usr/bin/env python3
+# /// script
+# requires-python = ">=3.11"
+# dependencies = []
+# ///
+
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+"""Wrap a `--gh-json-v3` JSONL file in an envelope and POST to /api/ingest.
+
+Reads bare v3 records from a JSONL file produced by `vortex-bench --gh-json-v3`,
+fills the `commit` envelope by shelling out to `git show`, and POSTs the
+envelope to `/api/ingest` with a bearer token.
+
+Standard library only -- urllib, json, subprocess. No retries, no spool, no
+outbox. See `benchmarks-website/planning/02-contracts.md` and
+`benchmarks-website/planning/components/emitter.md`.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import urllib.error
+import urllib.request
+from datetime import UTC, datetime
+from pathlib import Path
+
+SCHEMA_VERSION = 1
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="POST a v3 JSONL records file to /api/ingest.",
+    )
+    parser.add_argument(
+        "jsonl_path",
+        type=Path,
+        help="Path to the JSONL file written by vortex-bench --gh-json-v3.",
+    )
+    parser.add_argument(
+        "--server",
+        required=True,
+        help="Server base URL, e.g. http://localhost:8080.",
+    )
+    parser.add_argument(
+        "--commit-sha",
+        required=True,
+        help="40-hex commit SHA. Usually ${{ github.sha }} in CI.",
+    )
+    parser.add_argument(
+        "--benchmark-id",
+        required=True,
+        help="Run identifier echoed back in run_meta.benchmark_id.",
+    )
+    parser.add_argument(
+        "--token-env",
+        default="INGEST_BEARER_TOKEN",
+        help="Env var holding the bearer token (default: INGEST_BEARER_TOKEN).",
+    )
+    parser.add_argument(
+        "--repo-url",
+        default="https://github.com/vortex-data/vortex",
+        help="Base repo URL used to build the commits.url field.",
+    )
+    parser.add_argument(
+        "--git-dir",
+        type=Path,
+        default=None,
+        help="Run `git show` in this directory (default: current directory).",
+    )
+    parser.add_argument(
+        "--timeout",
+        type=float,
+        default=30.0,
+        help="HTTP timeout in seconds (default: 30).",
+    )
+    return parser.parse_args()
+
+
+def read_records(path: Path) -> list[dict]:
+    records: list[dict] = []
+    with path.open("r", encoding="utf-8") as fp:
+        for line_no, line in enumerate(fp, start=1):
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                records.append(json.loads(line))
+            except json.JSONDecodeError as exc:
+                raise SystemExit(f"{path}:{line_no}: invalid JSON: {exc}") from exc
+    return records
+
+
+def git_show_field(sha: str, fmt: str, cwd: Path | None) -> str:
+    """Run `git show -s --format=<fmt> <sha>` and return its stdout (stripped)."""
+    result = subprocess.run(
+        ["git", "show", "-s", f"--format={fmt}", sha],
+        cwd=cwd,
+        check=True,
+        capture_output=True,
+        text=True,
+    )
+    return result.stdout.strip()
+
+
+def build_commit(sha: str, repo_url: str, git_dir: Path | None) -> dict:
+    sha = sha.strip().lower()
+    if len(sha) != 40 or any(c not in "0123456789abcdef" for c in sha):
+        raise SystemExit(f"commit SHA must be 40-hex lowercase, got: {sha!r}")
+
+    timestamp = git_show_field(sha, "%cI", git_dir)
+    message = git_show_field(sha, "%s", git_dir)
+    author_name = git_show_field(sha, "%an", git_dir)
+    author_email = git_show_field(sha, "%ae", git_dir)
+    committer_name = git_show_field(sha, "%cn", git_dir)
+    committer_email = git_show_field(sha, "%ce", git_dir)
+    tree_sha = git_show_field(sha, "%T", git_dir)
+
+    return {
+        "sha": sha,
+        "timestamp": timestamp,
+        "message": message,
+        "author_name": author_name,
+        "author_email": author_email,
+        "committer_name": committer_name,
+        "committer_email": committer_email,
+        "tree_sha": tree_sha,
+        "url": f"{repo_url.rstrip('/')}/commit/{sha}",
+    }
+
+
+def post(server: str, envelope: dict, token: str, timeout: float) -> tuple[int, bytes]:
+    body = json.dumps(envelope).encode("utf-8")
+    url = f"{server.rstrip('/')}/api/ingest"
+    request = urllib.request.Request(
+        url,
+        data=body,
+        method="POST",
+        headers={
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {token}",
+        },
+    )
+    try:
+        with urllib.request.urlopen(request, timeout=timeout) as response:
+            return response.status, response.read()
+    except urllib.error.HTTPError as exc:
+        return exc.code, exc.read()
+
+
+def main() -> int:
+    args = parse_args()
+
+    token = os.environ.get(args.token_env)
+    if not token:
+        print(
+            f"error: env var {args.token_env} is not set",
+            file=sys.stderr,
+        )
+        return 2
+
+    records = read_records(args.jsonl_path)
+    commit = build_commit(args.commit_sha, args.repo_url, args.git_dir)
+
+    envelope = {
+        "run_meta": {
+            "benchmark_id": args.benchmark_id,
+            "schema_version": SCHEMA_VERSION,
+            "started_at": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"),
+        },
+        "commit": commit,
+        "records": records,
+    }
+
+    status, body = post(args.server, envelope, token, args.timeout)
+    body_text = body.decode("utf-8", errors="replace")
+
+    if status >= 400:
+        print(
+            f"error: POST {args.server}/api/ingest -> {status}\n{body_text}",
+            file=sys.stderr,
+        )
+        return 1
+
+    print(body_text)
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/vortex-bench/Cargo.toml b/vortex-bench/Cargo.toml
index 2e1e1a9a423..fc1766c599f 100644
--- a/vortex-bench/Cargo.toml
+++ b/vortex-bench/Cargo.toml
@@ -67,5 +67,8 @@ tracing-subscriber = { workspace = true, features = [
 url = { workspace = true }
 uuid = { workspace = true, features = ["v4"] }

+[dev-dependencies]
+insta = { workspace = true }
+
 [features]
 unstable_encodings = ["vortex/unstable_encodings"]
diff --git a/vortex-bench/REUSE.toml b/vortex-bench/REUSE.toml
index 12dcf5d707f..5fb9bd33466 100644
--- a/vortex-bench/REUSE.toml
+++ b/vortex-bench/REUSE.toml
@@ -9,3 +9,9 @@
 path = "**/*.csv"
 SPDX-FileCopyrightText = "Copyright the Vortex contributors"
 SPDX-License-Identifier = "Apache-2.0"
+
+# `insta` snapshot files do not allow leading comment lines.
+[[annotations]]
+path = "src/snapshots/**.snap"
+SPDX-FileCopyrightText = "Copyright the Vortex contributors"
+SPDX-License-Identifier = "Apache-2.0"
diff --git a/vortex-bench/src/compress/mod.rs b/vortex-bench/src/compress/mod.rs
index 41e1e53a65c..87336ab0768 100644
--- a/vortex-bench/src/compress/mod.rs
+++ b/vortex-bench/src/compress/mod.rs
@@ -64,6 +64,8 @@ pub struct CompressResult {
     pub time: Duration,
     pub compressed_size: u64,
     pub timing: CompressionTimingMeasurement,
+    /// Per-iteration encode wall times. Captured for v3 emission.
+    pub all_runs: Vec<Duration>,
     pub ratios: Vec<CustomUnitMeasurement>,
 }

@@ -71,6 +73,8 @@ pub struct DecompressResult {
     pub time: Duration,
     pub timing: CompressionTimingMeasurement,
+    /// Per-iteration decode wall times. Captured for v3 emission.
+    pub all_runs: Vec<Duration>,
 }

 /// Trait for format-specific compression/decompression operations.
@@ -111,12 +115,14 @@ pub async fn benchmark_compress(
     let format = compressor.format();
     let mut fastest = Duration::MAX;
     let mut compressed_size = 0u64;
+    let mut all_runs = Vec::with_capacity(iterations);

     for _ in 0..iterations {
         let (size, elapsed) = compressor.compress(parquet_path).await?;
         compressed_size = size;
         fastest = fastest.min(elapsed);
+        all_runs.push(elapsed);
     }

     let ratios = vec![CustomUnitMeasurement {
@@ -136,6 +142,7 @@
         time: fastest,
         compressed_size,
         timing,
+        all_runs,
         ratios,
     })
 }
@@ -151,11 +158,13 @@
 ) -> Result<DecompressResult> {
     let format = compressor.format();
     let mut fastest = Duration::MAX;
+    let mut all_runs = Vec::with_capacity(iterations);

     for _ in 0..iterations {
         let elapsed = compressor.decompress(parquet_path).await?;
         fastest = fastest.min(elapsed);
+        all_runs.push(elapsed);
     }

     let timing = CompressionTimingMeasurement {
@@ -167,6 +176,7 @@
     Ok(DecompressResult {
         time: fastest,
         timing,
+        all_runs,
     })
 }
diff --git a/vortex-bench/src/datasets/mod.rs b/vortex-bench/src/datasets/mod.rs
index 7136d5451e1..7a499ff6998 100644
--- a/vortex-bench/src/datasets/mod.rs
+++ b/vortex-bench/src/datasets/mod.rs
@@ -37,6 +37,17 @@ pub(crate) fn normalize_benchmark_runner_id(benchmark_runner: &str) -> String {
 pub trait Dataset {
     fn name(&self) -> &str;

+    /// Map this dataset to the v3 `(dataset, dataset_variant)` pair.
+    ///
+    /// Default: `(name(), None)`. Override for suites that have a parent
+    /// namespace and a sub-dataset (e.g. Public-BI emits
+    /// `dataset = "public-bi"`, `dataset_variant = "<name>"`).
+    /// The convention matches the SQL query path; see the per-suite dim
+    /// values table in `benchmarks-website/planning/benchmark-mapping.md`.
+    fn v3_dataset_dims(&self) -> (&str, Option<&str>) {
+        (self.name(), None)
+    }
+
     async fn to_vortex_array(&self, ctx: &mut ExecutionCtx) -> Result;

     /// Get the path to the parquet file for this dataset.
diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs
index db9bd69c6ce..77d9da1e235 100644
--- a/vortex-bench/src/lib.rs
+++ b/vortex-bench/src/lib.rs
@@ -53,6 +53,7 @@ pub mod statpopgen;
 pub mod tpcds;
 pub mod tpch;
 pub mod utils;
+pub mod v3;
 pub mod vector_dataset;

 pub use benchmark::Benchmark;
diff --git a/vortex-bench/src/public_bi.rs b/vortex-bench/src/public_bi.rs
index 7406347d370..2058a6335b5 100644
--- a/vortex-bench/src/public_bi.rs
+++ b/vortex-bench/src/public_bi.rs
@@ -453,6 +453,10 @@ impl Dataset for PBIBenchmark {
         &self.name
     }

+    fn v3_dataset_dims(&self) -> (&str, Option<&str>) {
+        ("public-bi", Some(&self.name))
+    }
+
     async fn to_vortex_array(&self, _ctx: &mut ExecutionCtx) -> anyhow::Result {
         let dataset = self.dataset()?;
         dataset.write_as_vortex().await?;
diff --git a/vortex-bench/src/runner.rs b/vortex-bench/src/runner.rs
index 3885bace2af..b3ea8b5c1b3 100644
--- a/vortex-bench/src/runner.rs
+++ b/vortex-bench/src/runner.rs
@@ -260,6 +260,24 @@ impl SqlBenchmarkRunner {
         }
     }

+    /// Build v3 `query_measurement` records from the runner's collected results.
+    ///
+    /// Each [`QueryMeasurement`] is paired with its matching [`MemoryMeasurement`]
+    /// (matched on `(query_idx, target)`); pairs collapse into one record. If
+    /// `--track-memory` was off, no memory pair exists and the memory fields are
+    /// omitted from the record.
+    pub fn v3_records(&self) -> Vec<crate::v3::V3Record> {
+        let mut records = Vec::with_capacity(self.query_measurements.len());
+        for qm in &self.query_measurements {
+            let memory = self
+                .memory_measurements
+                .iter()
+                .find(|m| m.query_idx == qm.query_idx && m.target == qm.target);
+            records.push(crate::v3::query_measurement_record(qm, memory));
+        }
+        records
+    }
+
     /// Run (or explain) all queries for all formats synchronously.
     ///
     /// In `Run` mode, executes each query `iterations` times, collecting timing.
diff --git a/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_size.snap b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_size.snap
new file mode 100644
index 00000000000..3a4ed1aa8a5
--- /dev/null
+++ b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_size.snap
@@ -0,0 +1,11 @@
+---
+source: vortex-bench/src/v3.rs
+expression: render(&record)
+---
+{
+  "kind": "compression_size",
+  "commit_sha": "",
+  "dataset": "taxi",
+  "format": "lance",
+  "value_bytes": 12345678
+}
diff --git a/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_time_encode.snap b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_time_encode.snap
new file mode 100644
index 00000000000..f06edba7b47
--- /dev/null
+++ b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_time_encode.snap
@@ -0,0 +1,18 @@
+---
+source: vortex-bench/src/v3.rs
+expression: render(&record)
+---
+{
+  "kind": "compression_time",
+  "commit_sha": "",
+  "dataset": "taxi",
+  "format": "vortex-file-compressed",
+  "op": "encode",
+  "value_ns": 5000000,
+  "all_runtimes_ns": [
+    5500000,
+    5000000,
+    5200000
+  ],
+  "env_triple": ""
+}
diff --git a/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_time_public_bi_variant.snap b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_time_public_bi_variant.snap
new file mode 100644
index 00000000000..3778451ab93
--- /dev/null
+++ b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_compression_time_public_bi_variant.snap
@@ -0,0 +1,19 @@
+---
+source: vortex-bench/src/v3.rs
+expression: render(&record)?
+---
+{
+  "kind": "compression_time",
+  "commit_sha": "",
+  "dataset": "public-bi",
+  "dataset_variant": "cms-provider",
+  "format": "vortex-file-compressed",
+  "op": "encode",
+  "value_ns": 5000000,
+  "all_runtimes_ns": [
+    5500000,
+    5000000,
+    5200000
+  ],
+  "env_triple": ""
+}
diff --git a/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_query_measurement_clickbench_no_memory@clickbench.snap b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_query_measurement_clickbench_no_memory@clickbench.snap
new file mode 100644
index 00000000000..f3273b62a1b
--- /dev/null
+++ b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_query_measurement_clickbench_no_memory@clickbench.snap
@@ -0,0 +1,19 @@
+---
+source: vortex-bench/src/v3.rs
+expression: render(&record)
+---
+{
+  "kind": "query_measurement",
+  "commit_sha": "",
+  "dataset": "clickbench",
+  "dataset_variant": "partitioned",
+  "query_idx": 1,
+  "storage": "s3",
+  "engine": "duckdb",
+  "format": "parquet",
+  "value_ns": 2000000,
+  "all_runtimes_ns": [
+    2000000
+  ],
+  "env_triple": ""
+}
diff --git a/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_query_measurement_with_memory@with_memory.snap b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_query_measurement_with_memory@with_memory.snap
new file mode 100644
index 00000000000..6a71d950940
--- /dev/null
+++ b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_query_measurement_with_memory@with_memory.snap
@@ -0,0 +1,25 @@
+---
+source: vortex-bench/src/v3.rs
+expression: render(&record)
+---
+{
+  "kind": "query_measurement",
+  "commit_sha": "",
+  "dataset": "tpch",
+  "scale_factor": "10",
+  "query_idx": 3,
+  "storage": "nvme",
+  "engine": "datafusion",
+  "format": "vortex-file-compressed",
+  "value_ns": 1000000,
+  "all_runtimes_ns": [
+    1000000,
+    1200000,
+    900000
+  ],
+  "peak_physical": 8192,
+  "peak_virtual": 16384,
+  "physical_delta": 1024,
+  "virtual_delta": 4096,
+  "env_triple": ""
+}
diff --git a/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_random_access_time.snap b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_random_access_time.snap
new file mode 100644
index 00000000000..6fce08f5ee2
--- /dev/null
+++ b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_random_access_time.snap
@@ -0,0 +1,17 @@
+---
+source: vortex-bench/src/v3.rs
+expression: render(&record)
+---
+{
+  "kind": "random_access_time",
+  "commit_sha": "",
+  "dataset": "taxi",
+  "format": "parquet",
+  "value_ns": 850000,
+  "all_runtimes_ns": [
+    800000,
+    900000,
+    850000
+  ],
+  "env_triple": ""
+}
diff --git a/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_vector_search_run.snap b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_vector_search_run.snap
new file mode 100644
index 00000000000..ae632d1bbd4
--- /dev/null
+++ b/vortex-bench/src/snapshots/vortex_bench__v3__tests__snapshot_vector_search_run.snap
@@ -0,0 +1,23 @@
+---
+source: vortex-bench/src/v3.rs
+expression: render(&record)
+---
+{
+  "kind": "vector_search_run",
+  "commit_sha": "",
+  "dataset": "cohere-large-10m",
+  "layout": "partitioned",
+  "flavor": "vortex-turboquant",
+  "threshold": 0.85,
+  "value_ns": 42000000,
+  "all_runtimes_ns": [
+    45000000,
+    42000000,
+    41000000
+  ],
+  "matches": 123,
+  "rows_scanned": 10000000,
+  "bytes_scanned": 512000000,
+  "iterations": 3,
+  "env_triple": ""
+}
diff --git a/vortex-bench/src/v3.rs b/vortex-bench/src/v3.rs
new file mode 100644
index 00000000000..58249a3277f
--- /dev/null
+++ b/vortex-bench/src/v3.rs
@@ -0,0 +1,574 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! v3 wire-format records emitted by `--gh-json-v3`.
+//!
+//! See `benchmarks-website/planning/02-contracts.md` for the discriminated record
+//! format and `benchmarks-website/planning/01-schema.md` for the destination
+//! tables. The records emitted here are bare: the post-ingest envelope
+//! (`run_meta` + `commit`) is added by `scripts/post-ingest.py` before POSTing
+//! to `/api/ingest`.
+//!
+//! This module is purely additive to the existing `gh-json` emission path.
+
+use std::io::Write;
+use std::sync::LazyLock;
+
+use serde::Serialize;
+use target_lexicon::Triple;
+
+use crate::BenchmarkDataset;
+use crate::Engine;
+use crate::Format;
+use crate::clickbench::Flavor;
+use crate::compress::CompressOp;
+use crate::measurements::CompressionTimingMeasurement;
+use crate::measurements::MemoryMeasurement;
+use crate::measurements::QueryMeasurement;
+use crate::measurements::TimingMeasurement;
+use crate::utils::GIT_COMMIT_ID;
+
+/// `(architecture, operating_system, environment)` triple for the host running the benchmark.
+///
+/// Cached for the lifetime of the process; the host triple does not change.
+pub static ENV_TRIPLE: LazyLock<String> = LazyLock::new(|| {
+    let host = Triple::host();
+    format!(
+        "{}-{}-{}",
+        host.architecture, host.operating_system, host.environment
+    )
+});
+
+/// Wire-format kind discriminator. One value per fact table.
+///
+/// Each variant flattens its inner record next to a `"kind"` field, matching the
+/// shape consumed by `/api/ingest`.
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "kind", rename_all = "snake_case")]
+pub enum V3Record {
+    /// SQL query suite measurement (TPC-H/TPC-DS/ClickBench/...).
+    QueryMeasurement(QueryMeasurementRecord),
+    /// `compress-bench` encode/decode timing.
+    CompressionTime(CompressionTimeRecord),
+    /// `compress-bench` on-disk size.
+    CompressionSize(CompressionSizeRecord),
+    /// `random-access-bench` take timing.
+    RandomAccessTime(RandomAccessTimeRecord),
+    /// `vector-search-bench` cosine-similarity scan run.
+    VectorSearchRun(VectorSearchRunRecord),
+}
+
+/// A single SQL-query measurement, fused from a [`QueryMeasurement`] and an
+/// optional paired [`MemoryMeasurement`].
+///
+/// Memory fields are populated together (all four or none), matching the
+/// `--track-memory` instrumentation.
+#[derive(Debug, Clone, Serialize)]
+pub struct QueryMeasurementRecord {
+    /// 40-hex lowercase commit SHA.
+    pub commit_sha: String,
+    /// Top-level dataset name (`tpch`, `tpcds`, `clickbench`, ...).
+    pub dataset: String,
+    /// ClickBench flavor (`partitioned`/`single`) or Public-BI sub-dataset name.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub dataset_variant: Option<String>,
+    /// TPC scale factor or `n_rows` for StatPopGen / PolarSignals.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub scale_factor: Option<String>,
+    /// 1-based query index within the suite.
+    pub query_idx: u32,
+    /// Storage backend the run targeted (`nvme` or `s3`).
+    pub storage: String,
+    /// Query engine (`datafusion`, `duckdb`, `vortex`, `arrow`).
+    pub engine: String,
+    /// On-disk format (`parquet`, `vortex-file-compressed`, `lance`, ...).
+    pub format: String,
+    /// Median per-iteration wall time in nanoseconds.
+    pub value_ns: u64,
+    /// Per-iteration wall times in nanoseconds.
+    pub all_runtimes_ns: Vec<u64>,
+    /// Peak resident-set bytes during the query, when memory tracking was on.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub peak_physical: Option<u64>,
+    /// Peak virtual-memory bytes during the query, when memory tracking was on.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub peak_virtual: Option<u64>,
+    /// Resident-set delta across the query, when memory tracking was on.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub physical_delta: Option<i64>,
+    /// Virtual-memory delta across the query, when memory tracking was on.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub virtual_delta: Option<i64>,
+    /// Host environment triple (e.g. `x86_64-linux-gnu`).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub env_triple: Option<String>,
+}
+
+/// A single encode-or-decode timing from `compress-bench`.
+#[derive(Debug, Clone, Serialize)]
+pub struct CompressionTimeRecord {
+    /// 40-hex lowercase commit SHA.
+    pub commit_sha: String,
+    /// Compression dataset name.
+    pub dataset: String,
+    /// Optional dataset variant (reserved; unused at alpha).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub dataset_variant: Option<String>,
+    /// On-disk format the timing applies to.
+    pub format: String,
+    /// `encode` or `decode`.
+    pub op: String,
+    /// Best-of-N wall time in nanoseconds.
+    pub value_ns: u64,
+    /// Per-iteration wall times in nanoseconds.
+    pub all_runtimes_ns: Vec<u64>,
+    /// Host environment triple.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub env_triple: Option<String>,
+}
+
+/// On-disk size of a compressed file from `compress-bench`.
+#[derive(Debug, Clone, Serialize)]
+pub struct CompressionSizeRecord {
+    /// 40-hex lowercase commit SHA.
+    pub commit_sha: String,
+    /// Compression dataset name.
+    pub dataset: String,
+    /// Optional dataset variant (reserved; unused at alpha).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub dataset_variant: Option<String>,
+    /// On-disk format the size applies to.
+    pub format: String,
+    /// Size in bytes.
+    pub value_bytes: u64,
+}
+
+/// A single take-time timing from `random-access-bench`.
+#[derive(Debug, Clone, Serialize)]
+pub struct RandomAccessTimeRecord {
+    /// 40-hex lowercase commit SHA.
+    pub commit_sha: String,
+    /// Random-access dataset name (different namespace from SQL suites).
+    pub dataset: String,
+    /// On-disk format the timing applies to.
+    pub format: String,
+    /// Median per-iteration wall time in nanoseconds.
+    pub value_ns: u64,
+    /// Per-iteration wall times in nanoseconds.
+    pub all_runtimes_ns: Vec<u64>,
+    /// Host environment triple.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub env_triple: Option<String>,
+}
+
+/// A single cosine-similarity scan from `vector-search-bench`.
+///
+/// Carries timing **and** the side counters in one row, mirroring the
+/// `vector_search_runs` fact table.
+#[derive(Debug, Clone, Serialize)]
+pub struct VectorSearchRunRecord {
+    /// 40-hex lowercase commit SHA.
+    pub commit_sha: String,
+    /// Vector dataset name (e.g. `cohere-large-10m`).
+    pub dataset: String,
+    /// Train-split layout label (e.g. `partitioned`).
+    pub layout: String,
+    /// Compression flavor label (e.g. `vortex-turboquant`).
+    pub flavor: String,
+    /// Cosine threshold passed to the scan filter.
+    pub threshold: f64,
+    /// Median per-iteration wall time in nanoseconds.
+    pub value_ns: u64,
+    /// Per-iteration wall times in nanoseconds.
+    pub all_runtimes_ns: Vec<u64>,
+    /// Number of rows that survived the cosine filter.
+    pub matches: u64,
+    /// Total rows scanned.
+    pub rows_scanned: u64,
+    /// Total on-disk bytes scanned.
+    pub bytes_scanned: u64,
+    /// Number of timed iterations.
+    pub iterations: u32,
+    /// Host environment triple.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub env_triple: Option<String>,
+}
+
+/// Map a [`BenchmarkDataset`] to the `(dataset, dataset_variant, scale_factor)`
+/// triple emitted in `query_measurement` records.
+///
+/// Mirrors the `Per-suite dim values` table in
+/// `benchmarks-website/planning/benchmark-mapping.md`.
+pub fn benchmark_dataset_dims(d: &BenchmarkDataset) -> (String, Option<String>, Option<String>) {
+    match d {
+        BenchmarkDataset::TpcH { scale_factor } => {
+            ("tpch".to_string(), None, Some(scale_factor.clone()))
+        }
+        BenchmarkDataset::TpcDS { scale_factor } => {
+            ("tpcds".to_string(), None, Some(scale_factor.clone()))
+        }
+        BenchmarkDataset::ClickBench { flavor } => {
+            let variant = match flavor {
+                Flavor::Partitioned => "partitioned",
+                Flavor::Single => "single",
+            };
+            ("clickbench".to_string(), Some(variant.to_string()), None)
+        }
+        BenchmarkDataset::PublicBi { name } => ("public-bi".to_string(), Some(name.clone()), None),
+        BenchmarkDataset::StatPopGen { n_rows } => {
+            ("statpopgen".to_string(), None, Some(n_rows.to_string()))
+        }
+        BenchmarkDataset::PolarSignals { n_rows } => {
+            ("polarsignals".to_string(), None, Some(n_rows.to_string()))
+        }
+        BenchmarkDataset::Fineweb => ("fineweb".to_string(), None, None),
+        BenchmarkDataset::GhArchive => ("gharchive".to_string(), None, None),
+    }
+}
+
+/// Build a `query_measurement` record by collapsing a [`QueryMeasurement`] and
+/// an optional paired [`MemoryMeasurement`] into one wire row.
+///
+/// The pair is matched by the caller; this function does not search.
+pub fn query_measurement_record(
+    qm: &QueryMeasurement,
+    memory: Option<&MemoryMeasurement>,
+) -> V3Record {
+    let (dataset, dataset_variant, scale_factor) = benchmark_dataset_dims(&qm.benchmark_dataset);
+    let value_ns = duration_as_ns(qm.median_run());
+    let all_runtimes_ns = qm.runs.iter().copied().map(duration_as_ns).collect();
+    let (peak_physical, peak_virtual, physical_delta, virtual_delta) = match memory {
+        Some(m) => (
+            Some(m.peak_physical_memory),
+            Some(m.peak_virtual_memory),
+            Some(m.physical_memory_delta),
+            Some(m.virtual_memory_delta),
+        ),
+        None => (None, None, None, None),
+    };
+    V3Record::QueryMeasurement(QueryMeasurementRecord {
+        commit_sha: GIT_COMMIT_ID.clone(),
+        dataset,
+        dataset_variant,
+        scale_factor,
+        query_idx: u32::try_from(qm.query_idx).unwrap_or(u32::MAX),
+        storage: qm.storage.clone(),
+        engine: engine_label(qm.target.engine).to_string(),
+        format: qm.target.format.name().to_string(),
+        value_ns,
+        all_runtimes_ns,
+        peak_physical,
+        peak_virtual,
+        physical_delta,
+        virtual_delta,
+        env_triple: Some(ENV_TRIPLE.clone()),
+    })
+}
+
+/// Build a `compression_time` record from a [`CompressionTimingMeasurement`].
+///
+/// Caller passes `dataset` (the compress-bench dataset name) and the
+/// `op`. `dataset_variant` is reserved and unused at alpha.
+pub fn compression_time_record(
+    timing: &CompressionTimingMeasurement,
+    dataset: &str,
+    dataset_variant: Option<&str>,
+    op: CompressOp,
+    all_runtimes_ns: Vec<u64>,
+) -> V3Record {
+    V3Record::CompressionTime(CompressionTimeRecord {
+        commit_sha: GIT_COMMIT_ID.clone(),
+        dataset: dataset.to_string(),
+        dataset_variant: dataset_variant.map(str::to_string),
+        format: timing.format.name().to_string(),
+        op: compress_op_label(op).to_string(),
+        value_ns: duration_as_ns(timing.time),
+        all_runtimes_ns,
+        env_triple: Some(ENV_TRIPLE.clone()),
+    })
+}
+
+/// Build a `compression_size` record.
+pub fn compression_size_record(
+    dataset: &str,
+    dataset_variant: Option<&str>,
+    format: Format,
+    value_bytes: u64,
+) -> V3Record {
+    V3Record::CompressionSize(CompressionSizeRecord {
+        commit_sha: GIT_COMMIT_ID.clone(),
+        dataset: dataset.to_string(),
+        dataset_variant: dataset_variant.map(str::to_string),
+        format: format.name().to_string(),
+        value_bytes,
+    })
+}
+
+/// Build a `random_access_time` record from a [`TimingMeasurement`].
+pub fn random_access_record(timing: &TimingMeasurement, dataset: &str) -> V3Record {
+    let value_ns = duration_as_ns(timing.median_time());
+    let all_runtimes_ns = timing.runs.iter().copied().map(duration_as_ns).collect();
+    V3Record::RandomAccessTime(RandomAccessTimeRecord {
+        commit_sha: GIT_COMMIT_ID.clone(),
+        dataset: dataset.to_string(),
+        format: timing.target.format.name().to_string(),
+        value_ns,
+        all_runtimes_ns,
+        env_triple: Some(ENV_TRIPLE.clone()),
+    })
+}
+
+/// Inputs for [`vector_search_record`]. The caller supplies the per-scan
+/// dimensions that don't live on `ScanTiming`.
+pub struct VectorSearchDims<'a> {
+    /// Vector dataset name (e.g. `cohere-large-10m`).
+    pub dataset: &'a str,
+    /// Train-split layout label (e.g. `partitioned`).
+    pub layout: &'a str,
+    /// Compression flavor label (e.g. `vortex-turboquant`).
+    pub flavor: &'a str,
+    /// Cosine threshold the scan was run with.
+    pub threshold: f64,
+}
+
+/// Build a `vector_search_run` record. `iterations` is `all_runs.len()`; we keep
+/// it explicit since the contract has it as a real column.
+pub fn vector_search_record(
+    dims: VectorSearchDims<'_>,
+    median_ns: u64,
+    all_runs_ns: Vec<u64>,
+    matches: u64,
+    rows_scanned: u64,
+    bytes_scanned: u64,
+) -> V3Record {
+    let iterations = u32::try_from(all_runs_ns.len()).unwrap_or(u32::MAX);
+    V3Record::VectorSearchRun(VectorSearchRunRecord {
+        commit_sha: GIT_COMMIT_ID.clone(),
+        dataset: dims.dataset.to_string(),
+        layout: dims.layout.to_string(),
+        flavor: dims.flavor.to_string(),
+        threshold: dims.threshold,
+        value_ns: median_ns,
+        all_runtimes_ns: all_runs_ns,
+        matches,
+        rows_scanned,
+        bytes_scanned,
+        iterations,
+        env_triple: Some(ENV_TRIPLE.clone()),
+    })
+}
+
+/// Write `records` as JSONL (one JSON object per line) to `writer`.
+///
+/// JSONL is the on-disk format consumed by `scripts/post-ingest.py`.
+pub fn write_jsonl<W: Write>(writer: &mut W, records: &[V3Record]) -> std::io::Result<()> {
+    for record in records {
+        let line = serde_json::to_string(record).map_err(std::io::Error::other)?;
+        writer.write_all(line.as_bytes())?;
+        writer.write_all(b"\n")?;
+    }
+    Ok(())
+}
+
+/// Write `records` as JSONL to `path`, creating parent directories as needed.
+pub fn write_jsonl_to_path(path: &std::path::Path, records: &[V3Record]) -> std::io::Result<()> {
+    if let Some(parent) = path.parent() {
+        std::fs::create_dir_all(parent)?;
+    }
+    let mut file = std::fs::File::create(path)?;
+    write_jsonl(&mut file, records)
+}
+
+fn duration_as_ns(d: std::time::Duration) -> u64 {
+    u64::try_from(d.as_nanos()).unwrap_or(u64::MAX)
+}
+
+fn engine_label(engine: Engine) -> &'static str {
+    match engine {
+        Engine::Vortex => "vortex",
+        Engine::Arrow => "arrow",
+        Engine::DataFusion => "datafusion",
+        Engine::DuckDB => "duckdb",
+    }
+}
+
+fn compress_op_label(op: CompressOp) -> &'static str {
+    match op {
+        CompressOp::Compress => "encode",
+        CompressOp::Decompress => "decode",
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use insta::assert_snapshot;
+    use insta::with_settings;
+
+    use super::*;
+    use crate::Target;
+    use crate::clickbench::Flavor;
+    use crate::memory::MemoryMeasurementResult;
+
+    fn redact_env(json: &str) -> String {
+        json.replace(ENV_TRIPLE.as_str(), "")
+            .replace(GIT_COMMIT_ID.as_str(), "")
+    }
+
+    fn render(record: &V3Record) -> anyhow::Result<String> {
+        let json = serde_json::to_string_pretty(record)?;
+        Ok(redact_env(&json))
+    }
+
+    #[test]
+    fn snapshot_query_measurement_with_memory() -> anyhow::Result<()> {
+        let qm = QueryMeasurement {
+            query_idx: 3,
+            target: Target::new(Engine::DataFusion, Format::OnDiskVortex),
+            benchmark_dataset: BenchmarkDataset::TpcH {
+                scale_factor: "10".to_string(),
+            },
+            benchmark_runner: "test-runner".to_string(),
+            storage: "nvme".to_string(),
+            runs: vec![
+                Duration::from_nanos(1_000_000),
+                Duration::from_nanos(1_200_000),
+                Duration::from_nanos(900_000),
+            ],
+        };
+        let mm = MemoryMeasurement::new(
+            qm.query_idx,
+            qm.target,
+            qm.benchmark_dataset.clone(),
+            qm.benchmark_runner.clone(),
+            qm.storage.clone(),
+            MemoryMeasurementResult {
+                physical_memory_delta: 1024,
+                virtual_memory_delta: 4096,
+                peak_physical_memory: 8192,
+                peak_virtual_memory: 16384,
+            },
+        );
+        let record = query_measurement_record(&qm, Some(&mm));
+        let rendered = render(&record)?;
+        with_settings!({snapshot_suffix => "with_memory"}, {
+            assert_snapshot!(rendered);
+        });
+        Ok(())
+    }
+
+    #[test]
+    fn snapshot_query_measurement_clickbench_no_memory() -> anyhow::Result<()> {
+        let qm = QueryMeasurement {
+            query_idx: 1,
+            target: Target::new(Engine::DuckDB, Format::Parquet),
+            benchmark_dataset: BenchmarkDataset::ClickBench {
+                flavor: Flavor::Partitioned,
+            },
+            benchmark_runner: "ci-runner".to_string(),
+            storage: "s3".to_string(),
+            runs: vec![Duration::from_nanos(2_000_000)],
+        };
+        let record = query_measurement_record(&qm, None);
+        let rendered = render(&record)?;
+        with_settings!({snapshot_suffix => "clickbench"}, {
+            assert_snapshot!(rendered);
+        });
+        Ok(())
+    }
+
+    #[test]
+    fn snapshot_compression_time_encode() -> anyhow::Result<()> {
+        let timing = CompressionTimingMeasurement {
+            name: "compress time/taxi".to_string(),
+            format: Format::OnDiskVortex,
+            time: Duration::from_nanos(5_000_000),
+        };
+        let record = compression_time_record(
+            &timing,
+            "taxi",
+            None,
+            CompressOp::Compress,
+            vec![5_500_000, 5_000_000, 5_200_000],
+        );
+        assert_snapshot!(render(&record)?);
+        Ok(())
+    }
+
+    #[test]
+    fn snapshot_compression_size() -> anyhow::Result<()> {
+        let record = compression_size_record("taxi", None, Format::Lance, 12_345_678);
+        assert_snapshot!(render(&record)?);
+        Ok(())
+    }
+
+    #[test]
+    fn snapshot_compression_time_public_bi_variant() -> anyhow::Result<()> {
+        let timing = CompressionTimingMeasurement {
+            name: "compress time/cms-provider".to_string(),
+            format: Format::OnDiskVortex,
+            time: Duration::from_nanos(5_000_000),
+        };
+        let record = compression_time_record(
+            &timing,
+            "public-bi",
+            Some("cms-provider"),
+            CompressOp::Compress,
+            vec![5_500_000, 5_000_000, 5_200_000],
+        );
+        assert_snapshot!(render(&record)?);
+        Ok(())
+    }
+
+    #[test]
+    fn snapshot_random_access_time() -> anyhow::Result<()> {
+        let timing = TimingMeasurement {
+            name: "random-access/taxi/uniform/parquet-tokio-local-disk".to_string(),
+            target: Target::new(Engine::Arrow, Format::Parquet),
+            storage: "nvme".to_string(),
+            runs: vec![
+                Duration::from_nanos(800_000),
+                Duration::from_nanos(900_000),
+                Duration::from_nanos(850_000),
+            ],
+        };
+        let record = random_access_record(&timing, "taxi");
+        assert_snapshot!(render(&record)?);
+        Ok(())
+    }
+
+    #[test]
+    fn snapshot_vector_search_run() -> anyhow::Result<()> {
+        let dims = VectorSearchDims {
+            dataset: "cohere-large-10m",
+            layout: "partitioned",
+            flavor: "vortex-turboquant",
+            threshold: 0.85,
+        };
+        let record = vector_search_record(
+            dims,
+            42_000_000,
+            vec![45_000_000, 42_000_000, 41_000_000],
+            123,
+            10_000_000,
+            512_000_000,
+        );
+        assert_snapshot!(render(&record)?);
+        Ok(())
+    }
+
+    #[test]
+    fn jsonl_round_trips_one_record_per_line() -> anyhow::Result<()> {
+        let record = compression_size_record("taxi", None, Format::Parquet, 100);
+        let mut buf: Vec<u8> = Vec::new();
+        write_jsonl(&mut buf, &[record.clone(), record])?;
+        let s = String::from_utf8(buf)?;
+        assert_eq!(s.lines().count(), 2);
+        for line in s.lines() {
+            let v: serde_json::Value = serde_json::from_str(line)?;
+            assert_eq!(v["kind"], "compression_size");
+        }
+        Ok(())
+    }
+}

From ffd090bad5149ea5bceb2bdb7fac51a281e6a754 Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Mon, 4 May 2026 15:48:53 -0400
Subject: [PATCH 2/3] fix issues

Signed-off-by: Connor Tsui
---
 bench-orchestrator/bench_orchestrator/cli.py | 50 ++++++++++-
 bench-orchestrator/tests/test_cli.py         | 44 +++++++++
 benchmarks/random-access-bench/src/main.rs   | 93 +++++++++++++++++++-
 vortex-bench/src/v3.rs                       | 54 ++++++++++--
 4 files changed, 228 insertions(+), 13 deletions(-)

diff --git a/bench-orchestrator/bench_orchestrator/cli.py b/bench-orchestrator/bench_orchestrator/cli.py
index 6c200015182..a9e37e70309 100644
--- a/bench-orchestrator/bench_orchestrator/cli.py
+++ b/bench-orchestrator/bench_orchestrator/cli.py
@@ -7,6 +7,7 @@
 from contextlib import contextmanager
 from datetime import datetime, timedelta
 from pathlib import Path
+from tempfile import TemporaryDirectory
 from typing import Annotated

 import pandas as pd
@@ -115,6 +116,38 @@ def open_results_output(path: Path | None):
         yield handle


+@contextmanager
+def temporary_v3_output_dir(enabled: bool):
+    """Create a temporary directory for per-backend v3 JSONL files."""
+    if not enabled:
+        yield None
+        return
+
+    with TemporaryDirectory(prefix="vx-bench-v3-") as temp_dir:
+        yield Path(temp_dir)
+
+
+def backend_v3_output_path(temp_dir: Path | None, index: int, backend: Engine) -> Path | None:
+    """Return the v3 JSONL path a backend should write, if v3 output is enabled."""
+    if temp_dir is None:
+        return None
+    return temp_dir / f"{index:02d}-{backend.value}.jsonl"
+
+
+def write_combined_v3_output(output_path: Path, input_paths: list[Path]) -> None:
+    """Concatenate successful per-backend v3 JSONL files into the requested output."""
+    if output_path.parent != Path():
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+

Signed-off-by: Connor Tsui
---
 bench-orchestrator/bench_orchestrator/cli.py | 50 ++++++++++-
 bench-orchestrator/tests/test_cli.py         | 44 +++++++++
 benchmarks/random-access-bench/src/main.rs   | 93 +++++++++++++++++++-
 vortex-bench/src/v3.rs                       | 54 ++++++++++--
 4 files changed, 228 insertions(+), 13 deletions(-)

diff --git a/bench-orchestrator/bench_orchestrator/cli.py b/bench-orchestrator/bench_orchestrator/cli.py
index 6c200015182..a9e37e70309 100644
--- a/bench-orchestrator/bench_orchestrator/cli.py
+++ b/bench-orchestrator/bench_orchestrator/cli.py
@@ -7,6 +7,7 @@
 from contextlib import contextmanager
 from datetime import datetime, timedelta
 from pathlib import Path
+from tempfile import TemporaryDirectory
 from typing import Annotated
 
 import pandas as pd
@@ -115,6 +116,38 @@ def open_results_output(path: Path | None):
         yield handle
 
 
+@contextmanager
+def temporary_v3_output_dir(enabled: bool):
+    """Create a temporary directory for per-backend v3 JSONL files."""
+    if not enabled:
+        yield None
+        return
+
+    with TemporaryDirectory(prefix="vx-bench-v3-") as temp_dir:
+        yield Path(temp_dir)
+
+
+def backend_v3_output_path(temp_dir: Path | None, index: int, backend: Engine) -> Path | None:
+    """Return the v3 JSONL path a backend should write, if v3 output is enabled."""
+    if temp_dir is None:
+        return None
+    return temp_dir / f"{index:02d}-{backend.value}.jsonl"
+
+
+def write_combined_v3_output(output_path: Path, input_paths: list[Path]) -> None:
+    """Concatenate successful per-backend v3 JSONL files into the requested output."""
+    if output_path.parent != Path():
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    with output_path.open("w", encoding="utf-8") as output:
+        for input_path in input_paths:
+            if not input_path.exists():
+                raise RuntimeError(f"v3 output was not written by benchmark backend: {input_path}")
+            with input_path.open("r", encoding="utf-8") as input_file:
+                for line in input_file:
+                    output.write(line)
+
+
 def write_result_line(line: str, store_writer, compatibility_file) -> None:
     """Write a raw result line to the run store and optional compatibility output."""
     store_writer(line)
@@ -280,10 +313,16 @@ def run(
     soft_failures: list[str] = []
 
     try:
-        with store.create_run(config, build_config) as ctx, open_results_output(output) as compatibility_file:
-            for backend, backend_targets in backend_groups.items():
+        with (
+            store.create_run(config, build_config) as ctx,
+            open_results_output(output) as compatibility_file,
+            temporary_v3_output_dir(gh_json_v3 is not None) as v3_temp_dir,
+        ):
+            v3_output_parts: list[Path] = []
+            for backend_idx, (backend, backend_targets) in enumerate(backend_groups.items()):
                 executor = BenchmarkExecutor(binary_paths[backend], backend, verbose=verbose)
                 backend_formats = [target.format for target in backend_targets]
+                backend_gh_json_v3 = backend_v3_output_path(v3_temp_dir, backend_idx, backend)
 
                 try:
                     results = executor.run(
@@ -298,7 +337,7 @@ def run(
                         sample_rate=sample_rate,
                         tracing=tracing,
                         runner=runner,
-                        gh_json_v3=gh_json_v3,
+                        gh_json_v3=backend_gh_json_v3,
                        on_result=lambda line, store_writer=ctx.write_raw_json, compatibility=compatibility_file: (
                             write_result_line(
                                 line,
@@ -307,6 +346,8 @@ def run(
                             )
                         ),
                     )
+                    if backend_gh_json_v3 is not None:
+                        v3_output_parts.append(backend_gh_json_v3)
                     console.print(f"[green]{backend.value}: {len(results)} results[/green]")
                 except RuntimeError as exc:
                     ctx.metadata.partial = True
@@ -315,6 +356,9 @@ def run(
                     console.print(f"[red]{backend.value} failed: {exc}[/red]")
                     soft_failures.append(str(exc))
 
+            if gh_json_v3 is not None:
+                write_combined_v3_output(gh_json_v3, v3_output_parts)
+
             ctx.metadata.binaries = {backend.value: str(path) for backend, path in binary_paths.items()}
     except RuntimeError as exc:
         console.print(f"[red]{exc}[/red]")
diff --git a/bench-orchestrator/tests/test_cli.py b/bench-orchestrator/tests/test_cli.py
index ffb2e2b3ad9..4d8a2529bd6 100644
--- a/bench-orchestrator/tests/test_cli.py
+++ b/bench-orchestrator/tests/test_cli.py
@@ -105,3 +105,47 @@ def fake_run(self, **kwargs):
     metadata = json.loads((run_dirs[0] / "metadata.json").read_text(encoding="utf-8"))
     assert metadata["targets"] == [{"engine": "datafusion", "format": "parquet"}]
     assert metadata["binaries"] == {"datafusion": str(binary_path)}
+
+
+def test_run_combines_gh_json_v3_output_per_backend(tmp_path, monkeypatch) -> None:
+    run_store = ResultStore(base_dir=tmp_path / "runs")
+    output_path = tmp_path / "artifacts" / "results.v3.jsonl"
+    binary_paths = {
+        cli_module.Engine.DATAFUSION: tmp_path / "datafusion-bench",
+        cli_module.Engine.DUCKDB: tmp_path / "duckdb-bench",
+    }
+    for binary_path in binary_paths.values():
+        binary_path.write_text("", encoding="utf-8")
+
+    monkeypatch.setattr(cli_module, "ResultStore", lambda: run_store)
+    monkeypatch.setattr(cli_module.BenchmarkBuilder, "get_binary_path", lambda self, backend: binary_paths[backend])
+
+    seen_backend_paths = []
+
+    def fake_run(self, **kwargs):
+        backend_output = kwargs["gh_json_v3"]
+        assert backend_output is not None
+        assert backend_output != output_path
+        backend_output.write_text(f"{self.backend.value}-v3\n", encoding="utf-8")
+        seen_backend_paths.append(backend_output)
+        return []
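+
+    # fake_run stands in for BenchmarkExecutor.run below: it checks that the
+    # CLI handed this backend its own temporary v3 path (never the combined
+    # output), then writes a sentinel line for write_combined_v3_output.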
+    monkeypatch.setattr(BenchmarkExecutor, "run", fake_run)
+
+    result = runner.invoke(
+        cli_module.app,
+        [
+            "run",
+            "tpch",
+            "--targets-json",
+            '[{"engine":"datafusion","format":"parquet"},{"engine":"duckdb","format":"parquet"}]',
+            "--no-build",
+            "--gh-json-v3",
+            str(output_path),
+        ],
+    )
+
+    assert result.exit_code == 0
+    assert output_path.read_text(encoding="utf-8") == "datafusion-v3\nduckdb-v3\n"
+    assert len(seen_backend_paths) == 2
+    assert seen_backend_paths[0] != seen_backend_paths[1]
diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs
index b3c85211f6f..6543e221e33 100644
--- a/benchmarks/random-access-bench/src/main.rs
+++ b/benchmarks/random-access-bench/src/main.rs
@@ -280,6 +280,28 @@ fn measurement_name(dataset: &str, pattern: Option<AccessPattern>, format: Format
     }
 }
 
+fn v3_random_access_dataset_name(dataset: &str, pattern: Option<AccessPattern>) -> String {
+    match pattern {
+        Some(pattern) => format!("{dataset}/{}", pattern.name()),
+        None => dataset.to_string(),
+    }
+}
+
+fn push_v3_random_access_record(
+    records: &mut Vec<v3::V3Record>,
+    measurement: &TimingMeasurement,
+    dataset: &str,
+    pattern: Option<AccessPattern>,
+    reopen: bool,
+) {
+    if reopen {
+        return;
+    }
+
+    let dataset = v3_random_access_dataset_name(dataset, pattern);
+    records.push(v3::random_access_record(measurement, &dataset));
+}
+
 /// Map format to the appropriate engine for random access benchmarks.
 fn format_to_engine(format: Format) -> Engine {
     match format {
@@ -388,7 +410,13 @@ async fn run_random_access(
             )
             .await?;
 
-            v3_records.push(v3::random_access_record(&measurement, dataset.name()));
+            push_v3_random_access_record(
+                &mut v3_records,
+                &measurement,
+                dataset.name(),
+                None,
+                reopen,
+            );
             targets.push(measurement.target);
             measurements.push(measurement);
             progress.inc(1);
@@ -415,7 +443,13 @@ async fn run_random_access(
                 )
                 .await?;
 
-                v3_records.push(v3::random_access_record(&measurement, dataset.name()));
+                push_v3_random_access_record(
+                    &mut v3_records,
+                    &measurement,
+                    dataset.name(),
+                    Some(*pattern),
+                    reopen,
+                );
                 targets.push(measurement.target);
                 measurements.push(measurement);
                 progress.inc(1);
@@ -443,3 +477,58 @@ async fn run_random_access(
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn v3_random_access_dataset_names_match_schema_dims() {
+        assert_eq!(v3_random_access_dataset_name("taxi", None), "taxi");
+        assert_eq!(
+            v3_random_access_dataset_name("taxi", Some(AccessPattern::Correlated)),
+            "taxi/correlated"
+        );
+        assert_eq!(
+            v3_random_access_dataset_name("feature-vectors", Some(AccessPattern::Uniform)),
+            "feature-vectors/uniform"
+        );
+    }
+
+    #[test]
+    fn v3_random_access_records_skip_reopen_variants() {
+        let measurement = TimingMeasurement {
+            name: "random-access/taxi/uniform/parquet-tokio-local-disk".to_string(),
+            target: Target::new(Engine::Arrow, Format::Parquet),
+            storage: STORAGE_NVME.to_string(),
+            runs: vec![Duration::from_nanos(10)],
+        };
+        let mut records = Vec::new();
+
+        push_v3_random_access_record(&mut records, &measurement, "taxi", None, false);
+        push_v3_random_access_record(
+            &mut records,
+            &measurement,
+            "taxi",
+            Some(AccessPattern::Uniform),
+            false,
+        );
+        push_v3_random_access_record(
+            &mut records,
+            &measurement,
+            "taxi",
+            Some(AccessPattern::Correlated),
+            true,
+        );
+
+        assert_eq!(records.len(), 2);
+        match &records[0] {
+            v3::V3Record::RandomAccessTime(record) => assert_eq!(record.dataset, "taxi"),
+            other => panic!("expected random-access record, got {other:?}"),
+        }
+        match &records[1] {
+            v3::V3Record::RandomAccessTime(record) => assert_eq!(record.dataset, "taxi/uniform"),
+            other => panic!("expected random-access record, got {other:?}"),
+        }
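+        // The surviving records serialize with the pattern folded into the
+        // dataset dim (sketch; other fields elided):
+        //   {"kind":"random_access_time","dataset":"taxi",...}
+        //   {"kind":"random_access_time","dataset":"taxi/uniform",...}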
+    }
+}
diff --git a/vortex-bench/src/v3.rs b/vortex-bench/src/v3.rs
index 58249a3277f..aaa925cf23d 100644
--- a/vortex-bench/src/v3.rs
+++ b/vortex-bench/src/v3.rs
@@ -201,12 +201,16 @@ pub struct VectorSearchRunRecord {
 /// `benchmarks-website/planning/benchmark-mapping.md`.
 pub fn benchmark_dataset_dims(d: &BenchmarkDataset) -> (String, Option<String>, Option<String>) {
     match d {
-        BenchmarkDataset::TpcH { scale_factor } => {
-            ("tpch".to_string(), None, Some(scale_factor.clone()))
-        }
-        BenchmarkDataset::TpcDS { scale_factor } => {
-            ("tpcds".to_string(), None, Some(scale_factor.clone()))
-        }
+        BenchmarkDataset::TpcH { scale_factor } => (
+            "tpch".to_string(),
+            None,
+            Some(canonical_tpc_scale_factor(scale_factor)),
+        ),
+        BenchmarkDataset::TpcDS { scale_factor } => (
+            "tpcds".to_string(),
+            None,
+            Some(canonical_tpc_scale_factor(scale_factor)),
+        ),
         BenchmarkDataset::ClickBench { flavor } => {
             let variant = match flavor {
                 Flavor::Partitioned => "partitioned",
@@ -237,6 +241,7 @@ pub fn query_measurement_record(
     let (dataset, dataset_variant, scale_factor) = benchmark_dataset_dims(&qm.benchmark_dataset);
     let value_ns = duration_as_ns(qm.median_run());
     let all_runtimes_ns = qm.runs.iter().copied().map(duration_as_ns).collect();
+    let query_idx = v3_query_idx(qm);
     let (peak_physical, peak_virtual, physical_delta, virtual_delta) = match memory {
         Some(m) => (
             Some(m.peak_physical_memory),
@@ -251,7 +256,7 @@ pub fn query_measurement_record(
         dataset,
         dataset_variant,
         scale_factor,
-        query_idx: u32::try_from(qm.query_idx).unwrap_or(u32::MAX),
+        query_idx,
         storage: qm.storage.clone(),
         engine: engine_label(qm.target.engine).to_string(),
         format: qm.target.format.name().to_string(),
@@ -383,6 +388,23 @@ fn duration_as_ns(d: std::time::Duration) -> u64 {
     u64::try_from(d.as_nanos()).unwrap_or(u64::MAX)
 }
 
+fn canonical_tpc_scale_factor(scale_factor: &str) -> String {
+    let trimmed = scale_factor.trim();
+    match trimmed.parse::<f64>() {
+        Ok(value) if value.is_finite() => format!("{value}"),
+        _ => scale_factor.to_string(),
+    }
+}
+
+fn v3_query_idx(qm: &QueryMeasurement) -> u32 {
+    let query_idx = if matches!(&qm.benchmark_dataset, BenchmarkDataset::ClickBench { .. }) {
+        qm.query_idx.saturating_add(1)
+    } else {
+        qm.query_idx
+    };
+    u32::try_from(query_idx).unwrap_or(u32::MAX)
+}
+
 fn engine_label(engine: Engine) -> &'static str {
     match engine {
         Engine::Vortex => "vortex",
@@ -461,7 +483,7 @@ mod tests {
     #[test]
     fn snapshot_query_measurement_clickbench_no_memory() -> anyhow::Result<()> {
         let qm = QueryMeasurement {
-            query_idx: 1,
+            query_idx: 0,
             target: Target::new(Engine::DuckDB, Format::Parquet),
             benchmark_dataset: BenchmarkDataset::ClickBench {
                 flavor: Flavor::Partitioned,
@@ -478,6 +500,22 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn tpc_scale_factors_are_canonicalized_for_query_dims() {
+        assert_eq!(
+            benchmark_dataset_dims(&BenchmarkDataset::TpcH {
+                scale_factor: "1.0".to_string()
+            }),
+            ("tpch".to_string(), None, Some("1".to_string()))
+        );
+        assert_eq!(
+            benchmark_dataset_dims(&BenchmarkDataset::TpcDS {
+                scale_factor: "10.0".to_string()
+            }),
+            ("tpcds".to_string(), None, Some("10".to_string()))
+        );
+    }
+
     #[test]
     fn snapshot_compression_time_encode() -> anyhow::Result<()> {
         let timing = CompressionTimingMeasurement {

From 5bd762b07bcbe71302cb4d480af56ba24eaab049 Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Mon, 4 May 2026 16:02:43 -0400
Subject: [PATCH 3/3] fix(bench): emit 1-based v3 query_idx for all zero-based
 query sources

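The v3 query_idx dim is 1-based. The previous commit shifted ClickBench
only; StatPopGen, PolarSignals, Fineweb, and GhArchive also number
their queries from zero in the runner, so shift those sources too. For
example, a StatPopGen measurement recorded with a runner-side query_idx
of 0 now serializes as "query_idx": 1, while the one-based sources
(TPC-H, TPC-DS, Public-BI) are unchanged.
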
Signed-off-by: Connor Tsui
---
 vortex-bench/src/v3.rs | 71 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 70 insertions(+), 1 deletion(-)

diff --git a/vortex-bench/src/v3.rs b/vortex-bench/src/v3.rs
index aaa925cf23d..fdc71547d10 100644
--- a/vortex-bench/src/v3.rs
+++ b/vortex-bench/src/v3.rs
@@ -397,7 +397,7 @@ fn canonical_tpc_scale_factor(scale_factor: &str) -> String {
 }
 
 fn v3_query_idx(qm: &QueryMeasurement) -> u32 {
-    let query_idx = if matches!(&qm.benchmark_dataset, BenchmarkDataset::ClickBench { .. }) {
+    let query_idx = if query_source_is_zero_based(&qm.benchmark_dataset) {
         qm.query_idx.saturating_add(1)
     } else {
         qm.query_idx
@@ -405,6 +405,17 @@ fn v3_query_idx(qm: &QueryMeasurement) -> u32 {
     u32::try_from(query_idx).unwrap_or(u32::MAX)
 }
 
+fn query_source_is_zero_based(dataset: &BenchmarkDataset) -> bool {
+    matches!(
+        dataset,
+        BenchmarkDataset::ClickBench { .. }
+            | BenchmarkDataset::StatPopGen { .. }
+            | BenchmarkDataset::PolarSignals { .. }
+            | BenchmarkDataset::Fineweb
+            | BenchmarkDataset::GhArchive
+    )
+}
+
 fn engine_label(engine: Engine) -> &'static str {
     match engine {
         Engine::Vortex => "vortex",
@@ -516,6 +527,64 @@ mod tests {
         );
     }
 
+    #[test]
+    fn zero_based_query_sources_emit_one_based_query_idx() {
+        let datasets = [
+            BenchmarkDataset::ClickBench {
+                flavor: Flavor::Partitioned,
+            },
+            BenchmarkDataset::StatPopGen { n_rows: 100_000 },
+            BenchmarkDataset::PolarSignals { n_rows: 1_000_000 },
+            BenchmarkDataset::Fineweb,
+            BenchmarkDataset::GhArchive,
+        ];
+
+        for benchmark_dataset in datasets {
+            let qm = QueryMeasurement {
+                query_idx: 0,
+                target: Target::new(Engine::DataFusion, Format::Parquet),
+                benchmark_dataset,
+                benchmark_runner: "ci-runner".to_string(),
+                storage: "nvme".to_string(),
+                runs: vec![Duration::from_nanos(1)],
+            };
+            let V3Record::QueryMeasurement(record) = query_measurement_record(&qm, None) else {
+                panic!("expected query measurement record");
+            };
+            assert_eq!(record.query_idx, 1);
+        }
+    }
+
+    #[test]
+    fn one_based_query_sources_keep_query_idx() {
+        let datasets = [
+            BenchmarkDataset::TpcH {
+                scale_factor: "1".to_string(),
+            },
+            BenchmarkDataset::TpcDS {
+                scale_factor: "1".to_string(),
+            },
+            BenchmarkDataset::PublicBi {
+                name: "cms-provider".to_string(),
+            },
+        ];
+
+        for benchmark_dataset in datasets {
+            let qm = QueryMeasurement {
+                query_idx: 1,
+                target: Target::new(Engine::DataFusion, Format::Parquet),
+                benchmark_dataset,
+                benchmark_runner: "ci-runner".to_string(),
+                storage: "nvme".to_string(),
+                runs: vec![Duration::from_nanos(1)],
+            };
+            let V3Record::QueryMeasurement(record) = query_measurement_record(&qm, None) else {
+                panic!("expected query measurement record");
+            };
+            assert_eq!(record.query_idx, 1);
+        }
+    }
+
     #[test]
     fn snapshot_compression_time_encode() -> anyhow::Result<()> {
         let timing = CompressionTimingMeasurement {