diff --git a/.github/workflows/sql-benchmarks.yml b/.github/workflows/sql-benchmarks.yml index 68d75665974..0efe66f4f1a 100644 --- a/.github/workflows/sql-benchmarks.yml +++ b/.github/workflows/sql-benchmarks.yml @@ -402,6 +402,18 @@ jobs: ${{ matrix.iterations && format('--iterations {0}', matrix.iterations) || '' }} \ --opt remote-data-dir=${{ matrix.remote_storage }} \ ${{ matrix.scale_factor && format('--opt scale-factor={0}', matrix.scale_factor) || '' }} + + - name: Capture file sizes + if: matrix.remote_storage == null + shell: bash + run: | + uv run --no-project scripts/capture-file-sizes.py \ + vortex-bench/data \ + --benchmark ${{ matrix.subcommand }} \ + --commit ${{ inputs.mode == 'pr' && github.event.pull_request.head.sha || github.sha }} \ + -o sizes.json + cat sizes.json >> results.json + - name: Compare results if: inputs.mode == 'pr' shell: bash @@ -435,56 +447,6 @@ jobs: # unique benchmark configuration must have a unique comment-tag. comment-tag: bench-pr-comment-${{ matrix.id }} - - name: Compare file sizes - if: inputs.mode == 'pr' && matrix.remote_storage == null - shell: bash - run: | - set -Eeu -o pipefail -x - - # Capture HEAD file sizes (vortex formats only) - uv run --no-project scripts/capture-file-sizes.py \ - vortex-bench/data \ - --benchmark ${{ matrix.subcommand }} \ - --commit ${{ github.event.pull_request.head.sha }} \ - -o head-sizes.json - - # Get base commit SHA (same as benchmark comparison) - base_commit_sha=$(\ - curl -L \ - -H "Accept: application/vnd.github+json" \ - -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ - https://api.github.com/repos/vortex-data/vortex/actions/workflows/bench.yml/runs\?branch\=develop\&status\=success\&per_page\=1 \ - | jq -r '.workflow_runs[].head_sha' \ - ) - - # Download file sizes baseline (per-benchmark file) - python3 scripts/s3-download.py s3://vortex-ci-benchmark-results/file-sizes-${{ matrix.id }}.json.gz file-sizes.json.gz --no-sign-request || true - - # Generate comparison 
report - echo '# File Sizes: ${{ matrix.name }}' > sizes-comment.md - echo '' >> sizes-comment.md - - if [ -f file-sizes.json.gz ]; then - gzip -d -c file-sizes.json.gz | grep $base_commit_sha > base-sizes.json || true - if [ -s base-sizes.json ]; then - uv run --no-project scripts/compare-file-sizes.py base-sizes.json head-sizes.json \ - >> sizes-comment.md - else - echo '_No baseline file sizes found for base commit._' >> sizes-comment.md - fi - else - echo '_No baseline file sizes available yet._' >> sizes-comment.md - fi - - cat sizes-comment.md >> $GITHUB_STEP_SUMMARY - - - name: Comment PR with file sizes - if: inputs.mode == 'pr' && matrix.remote_storage == null && github.event.pull_request.head.repo.fork == false - uses: thollander/actions-comment-pull-request@24bffb9b452ba05a4f3f77933840a6a841d1b32b # v3 - with: - file-path: sizes-comment.md - comment-tag: file-sizes-${{ matrix.id }} - - name: Comment PR on failure if: failure() && inputs.mode == 'pr' && github.event.pull_request.head.repo.fork == false uses: thollander/actions-comment-pull-request@24bffb9b452ba05a4f3f77933840a6a841d1b32b # v3 @@ -513,17 +475,6 @@ jobs: --benchmark-id "${{ matrix.id }}" \ --repo-url "${{ github.server_url }}/${{ github.repository }}" - - name: Upload File Sizes - if: inputs.mode == 'develop' && matrix.remote_storage == null - shell: bash - run: | - uv run --no-project scripts/capture-file-sizes.py \ - vortex-bench/data \ - --benchmark ${{ matrix.subcommand }} \ - --commit ${{ github.sha }} \ - -o sizes.json - bash scripts/cat-s3.sh vortex-ci-benchmark-results file-sizes-${{ matrix.id }}.json.gz sizes.json - - name: Alert incident.io if: failure() && inputs.mode == 'develop' uses: ./.github/actions/alert-incident-io diff --git a/scripts/capture-file-sizes.py b/scripts/capture-file-sizes.py index 754df1ee702..d923813db66 100644 --- a/scripts/capture-file-sizes.py +++ b/scripts/capture-file-sizes.py @@ -73,17 +73,28 @@ def main(): records.append( { + "metric": "file_size", + 
"unit": "bytes", + "value": size_bytes, "commit_id": args.commit, - "benchmark": args.benchmark, - "scale_factor": scale_factor, - "format": format_name, - "file": str(relative_path), - "size_bytes": size_bytes, + "file_size": { + "benchmark": args.benchmark, + "scale_factor": scale_factor, + "format": format_name, + "file": str(relative_path), + }, } ) # Sort for deterministic output - records.sort(key=lambda r: (r["benchmark"], r["scale_factor"], r["format"], r["file"])) + records.sort( + key=lambda r: ( + r["file_size"]["benchmark"], + r["file_size"]["scale_factor"], + r["file_size"]["format"], + r["file_size"]["file"], + ) + ) # Write JSONL output with open(args.output, "w") as f: diff --git a/scripts/compare-benchmark-jsons.py b/scripts/compare-benchmark-jsons.py index 44514053fad..59957d83153 100644 --- a/scripts/compare-benchmark-jsons.py +++ b/scripts/compare-benchmark-jsons.py @@ -14,6 +14,7 @@ import re import sys from dataclasses import dataclass +from io import StringIO from typing import Any import numpy as np @@ -38,6 +39,7 @@ # cutoff that is closer to a 99% two-sided interval before calling a change real. 
Z_SCORE_99 = 2.5758293035489004 CONTROL_FORMAT = "parquet" +FILE_SIZE_METRIC = "file_size" @dataclass @@ -63,6 +65,18 @@ def extract_dataset_key(df: pd.DataFrame) -> pd.DataFrame: return df +def split_file_size_rows(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]: + """Split shared-stream file-size rows from benchmark timing rows.""" + + if df.empty: + return df.copy(), df.copy() + + metric = df["metric"] if "metric" in df.columns else pd.Series(pd.NA, index=df.index) + file_size = df["file_size"] if "file_size" in df.columns else pd.Series(pd.NA, index=df.index) + mask = metric.eq(FILE_SIZE_METRIC) | file_size.notna() + return df[mask].copy(), df[~mask].copy() + + def extract_target_fields(name: str) -> pd.Series: """Parse query, engine, and format from the benchmark name.""" @@ -360,6 +374,151 @@ def format_integer_value(value: float) -> str: return str(int(value)) +def format_size(size_bytes: int) -> str: + """Format bytes as a human-readable size.""" + + if size_bytes >= 1024**3: + return f"{size_bytes / (1024**3):.2f} GB" + if size_bytes >= 1024**2: + return f"{size_bytes / (1024**2):.2f} MB" + if size_bytes >= 1024: + return f"{size_bytes / 1024:.2f} KB" + return f"{size_bytes} B" + + +def format_size_change(change_bytes: int) -> str: + """Format a byte change with a sign.""" + + sign = "+" if change_bytes > 0 else "" + return f"{sign}{format_size(abs(change_bytes))}" + + +def format_pct_change(pct: float) -> str: + """Format a percentage change with a sign.""" + + sign = "+" if pct > 0 else "" + return f"{sign}{pct:.1f}%" + + +def extract_file_size_data(df: pd.DataFrame) -> dict[tuple[str, str, str, str], int]: + """Extract file-size rows keyed by benchmark, scale factor, format, and file.""" + + data = {} + if df.empty: + return data + + for _, row in df.iterrows(): + metadata = row.get("file_size") + if not isinstance(metadata, dict): + continue + + key = ( + str(metadata.get("benchmark", "")), + str(metadata.get("scale_factor", "1.0")), + 
str(metadata.get("format", "")), + str(metadata.get("file", "")), + ) + value = row.get("value") + if pd.isna(value): + continue + data[key] = int(value) + + return data + + +def format_file_size_report(base_rows: pd.DataFrame, pr_rows: pd.DataFrame) -> str: + """Render a shared-comment file-size comparison report.""" + + pr_data = extract_file_size_data(pr_rows) + if not pr_data: + return "" + + base_data = extract_file_size_data(base_rows) + if not base_data: + return "_No baseline file sizes found for base commit._" + + comparisons = [] + format_totals: dict[str, dict[str, int]] = {} + + for key in sorted(set(base_data) | set(pr_data)): + _benchmark, scale_factor, file_format, file_name = key + base_size = base_data.get(key, 0) + pr_size = pr_data.get(key, 0) + + totals = format_totals.setdefault(file_format, {"base": 0, "pr": 0}) + totals["base"] += base_size + totals["pr"] += pr_size + + change = pr_size - base_size + if change == 0: + continue + + if base_size > 0: + pct_change = (pr_size / base_size - 1) * 100 + elif pr_size > 0: + pct_change = float("inf") + else: + pct_change = 0.0 + + comparisons.append( + { + "file": file_name, + "scale_factor": scale_factor, + "format": file_format, + "base_size": base_size, + "pr_size": pr_size, + "change": change, + "pct_change": pct_change, + } + ) + + if not comparisons: + return "_No file size changes detected._" + + comparisons.sort(key=lambda comparison: comparison["pct_change"], reverse=True) + + total_base = sum(totals["base"] for totals in format_totals.values()) + total_pr = sum(totals["pr"] for totals in format_totals.values()) + overall_pct_str = "new" if total_base == 0 else format_pct_change((total_pr / total_base - 1) * 100) + increases = sum(1 for comparison in comparisons if comparison["change"] > 0) + decreases = sum(1 for comparison in comparisons if comparison["change"] < 0) + + output = StringIO() + print("
", file=output) + print( + f"File Size Changes ({len(comparisons)} files changed, " + f"{overall_pct_str} overall, {increases}↑ {decreases}↓)", + file=output, + ) + print("", file=output) + print("
", file=output) + print("", file=output) + print("| File | Scale | Format | Base | HEAD | Change | % |", file=output) + print("|------|-------|--------|------|------|--------|---|", file=output) + + for comparison in comparisons: + pct_str = "new" if comparison["pct_change"] == float("inf") else format_pct_change(comparison["pct_change"]) + base_str = format_size(comparison["base_size"]) if comparison["base_size"] > 0 else "-" + print( + f"| {comparison['file']} | {comparison['scale_factor']} | {comparison['format']} | {base_str} | " + f"{format_size(comparison['pr_size'])} | {format_size_change(comparison['change'])} | {pct_str} |", + file=output, + ) + + print("", file=output) + print("**Totals:**", file=output) + for file_format in sorted(format_totals): + totals = format_totals[file_format] + base_total = totals["base"] + pr_total = totals["pr"] + pct_str = "" if base_total == 0 else f" ({format_pct_change((pr_total / base_total - 1) * 100)})" + print(f"- {file_format}: {format_size(base_total)} → {format_size(pr_total)}{pct_str}", file=output) + + print("", file=output) + print("
", file=output) + return output.getvalue().rstrip() + + def format_name_with_highlight( name: str, ratio: float, improvement_threshold: float, regression_threshold: float ) -> str: @@ -445,6 +604,67 @@ def build_verdict(statistical_analysis: dict[str, Any]) -> dict[str, str] | None } +def build_within_engine_statistical_analyses(df: pd.DataFrame, threshold_pct: int) -> dict[str, dict[str, Any]]: + """Build an attribution model per engine, using that engine's own parquet rows as controls.""" + + analyses = {} + matched = df[df["engine"].notna() & (df["engine"] != "unknown")] + for engine, engine_df in matched.groupby("engine", sort=False): + if engine_df["file_format"].eq(CONTROL_FORMAT).sum() == 0: + continue + if (~engine_df["file_format"].eq(CONTROL_FORMAT)).sum() == 0: + continue + analysis = build_statistical_analysis(engine_df.copy(), threshold_pct) + if analysis is not None: + analyses[str(engine)] = analysis + return analyses + + +def format_within_engine_summary(analyses: dict[str, dict[str, Any]]) -> str | None: + """Render a compact summary of per-engine attributed changes.""" + + summaries = [] + for engine in sorted(analyses, key=lambda value: (ENGINE_ORDER.get(value, len(ENGINE_ORDER)), value)): + verdict = build_verdict(analyses[engine]) + if verdict is None: + continue + display_name = { + "datafusion": "DataFusion", + "duckdb": "DuckDB", + }.get(engine, engine) + summaries.append( + f"{display_name} {verdict['status']} ({verdict['impact']}, {verdict['confidence']} confidence)" + ) + + if not summaries: + return None + return " · ".join(summaries) + + +def format_report_help() -> str: + """Render explanatory markdown for the benchmark report headline fields.""" + + return "\n".join( + [ + "
", + "How to read Verdict and Engines", + "", + "
", + "", + "- **Verdict**: Overall PR-level signal after subtracting baseline drift " + "estimated from Parquet control rows. It can be `Likely improvement`, " + "`Likely regression`, or `No clear signal`.", + "- **Engines**: Per-engine attribution. DataFusion is compared against " + "DataFusion/Parquet controls; DuckDB is compared against DuckDB/Parquet " + "controls. This answers whether each engine improved or regressed independently.", + "- **Confidence**: Based on directional consistency, share of rows above " + "the noise floor, and control-run noise.", + "", + "
", + ] + ) + + ENGINE_ORDER = { "vortex": 0, "datafusion": 1, @@ -490,6 +710,9 @@ def main() -> None: base_commit_id = next(iter(base_commit_id)) pr_commit_id = next(iter(pr_commit_id)) + base_file_sizes, base = split_file_size_rows(base) + pr_file_sizes, pr = split_file_size_rows(pr) + if "storage" not in base: base["storage"] = pd.NA if "storage" not in pr: @@ -515,12 +738,16 @@ def main() -> None: statistical_analysis = build_statistical_analysis(df3, threshold_pct) verdict = build_verdict(statistical_analysis) if statistical_analysis is not None else None + engine_analyses = build_within_engine_statistical_analyses(df3, threshold_pct) + engine_summary = format_within_engine_summary(engine_analyses) summary_fields: list[str] = [] if verdict is not None: summary_fields.append(f"**Verdict**: {verdict['status']} ({verdict['confidence']} confidence)") summary_fields.append(f"**Attributed Vortex impact**: {verdict['impact']}") + if engine_summary is not None: + summary_fields.append(f"**Engines**: {engine_summary}") if len(vortex_df) > 0: vortex_performance = format_performance( @@ -549,6 +776,8 @@ def main() -> None: print("
".join(summary_fields)) print("") + print(format_report_help()) + print("") print("---") print("") @@ -609,6 +838,13 @@ def main() -> None: print("") print("") + file_size_report = format_file_size_report(base_file_sizes, pr_file_sizes) + if file_size_report: + print("") + print("---") + print("") + print(file_size_report) + if statistical_analysis is not None and not alpha_rows.empty: print("
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors

"""Tests for the shared benchmark/file-size reporting scripts."""

import importlib.util
import json
import subprocess
import sys
from pathlib import Path

import pandas as pd

REPO_ROOT = Path(__file__).resolve().parents[2]
COMPARE_SCRIPT = REPO_ROOT / "scripts" / "compare-benchmark-jsons.py"
CAPTURE_SCRIPT = REPO_ROOT / "scripts" / "capture-file-sizes.py"


def load_compare_module():
    """Import ``compare-benchmark-jsons.py`` from its path and return the module.

    The script's file name contains hyphens, so it is loaded through
    ``importlib.util`` under the name ``compare_benchmark_jsons``.
    """
    module_name = "compare_benchmark_jsons"
    spec = importlib.util.spec_from_file_location(module_name, COMPARE_SCRIPT)
    assert spec is not None
    assert spec.loader is not None
    module = importlib.util.module_from_spec(spec)
    # Register the module before executing it, as the importlib recipe does,
    # so code that runs during import can find it by name in sys.modules.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind for later tests.
        sys.modules.pop(module_name, None)
        raise
    return module


def timing_row(name: str, base: int, pr: int) -> dict[str, object]:
    """Build one merged base/PR timing row with three identical runtimes per side."""
    return {
        "name": name,
        "value_base": base,
        "value_pr": pr,
        "all_runtimes_base": [base, base, base],
        "all_runtimes_pr": [pr, pr, pr],
    }


def test_within_engine_analysis_uses_each_engines_own_parquet_control() -> None:
    compare = load_compare_module()
    rows = [
        timing_row("tpch_q01/datafusion:parquet", 100, 200),
        timing_row("tpch_q01/datafusion:vortex-file-compressed", 100, 180),
        timing_row("tpch_q01/duckdb:parquet", 100, 100),
        timing_row("tpch_q01/duckdb:vortex-file-compressed", 100, 120),
    ]
    df = pd.DataFrame(rows)
    df[["engine", "file_format", "query"]] = df["name"].apply(compare.extract_target_fields)

    analyses = compare.build_within_engine_statistical_analyses(df, threshold_pct=5)

    assert set(analyses) == {"datafusion", "duckdb"}
    assert compare.build_verdict(analyses["datafusion"])["impact"] == "-10.0%"
    assert compare.build_verdict(analyses["duckdb"])["impact"] == "+20.0%"


def file_size_record(commit: str, size: int) -> dict[str, object]:
    """Build one file-size row in the shared benchmark-results schema."""
    return {
        "metric": "file_size",
        "unit": "bytes",
        "value": size,
        "commit_id": commit,
        "file_size": {
            "benchmark": "tpch",
            "scale_factor": "10",
            "format": "vortex-file-compressed",
            "file": "part-0.vortex",
        },
    }


def test_file_size_report_reads_shared_benchmark_rows() -> None:
    compare = load_compare_module()

    report = compare.format_file_size_report(
        pd.DataFrame([file_size_record("base-sha", 100)]),
        pd.DataFrame([file_size_record("pr-sha", 125)]),
    )

    assert "File Size Changes (1 files changed, +25.0% overall, 1↑ 0↓)" in report
    assert "| part-0.vortex | 10 | vortex-file-compressed | 100 B | 125 B | +25 B | +25.0% |" in report


def test_capture_file_sizes_emits_shared_benchmark_rows(tmp_path: Path) -> None:
    data_dir = tmp_path / "data"
    format_dir = data_dir / "tpch" / "10" / "vortex-file-compressed"
    format_dir.mkdir(parents=True)
    (format_dir / "part-0.vortex").write_bytes(b"x" * 42)
    output_path = tmp_path / "sizes.jsonl"

    # check=False so a failure surfaces through the assertion below, with the
    # script's stderr attached, rather than as a bare CalledProcessError.
    result = subprocess.run(
        [
            sys.executable,
            str(CAPTURE_SCRIPT),
            str(data_dir),
            "--benchmark",
            "tpch",
            "--commit",
            "deadbeef",
            "-o",
            str(output_path),
        ],
        check=False,
        capture_output=True,
        text=True,
    )

    assert result.returncode == 0, result.stderr
    records = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
    assert records == [
        {
            "metric": "file_size",
            "unit": "bytes",
            "value": 42,
            "commit_id": "deadbeef",
            "file_size": {
                "benchmark": "tpch",
                "scale_factor": "10",
                "format": "vortex-file-compressed",
                "file": "part-0.vortex",
            },
        }
    ]