Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,12 @@ jobs:
enable-sccache: "false"
- run: cargo build --package vortex-jni

compat-check:
name: "Compat check"
uses: ./.github/workflows/compat-validation.yml
with:
mode: last

rust-publish-dry-run:
name: "Rust publish dry-run"
timeout-minutes: 120
Expand Down
40 changes: 26 additions & 14 deletions .github/workflows/compat-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,44 @@ name: Compat Validation
on:
schedule:
- cron: "0 6 * * 1" # Monday 6am UTC
workflow_call:
inputs:
mode:
description: "Validation mode"
required: true
default: "all"
type: string
workflow_dispatch:
inputs:
mode:
description: "Validation mode"
required: true
default: "last"
default: "all"
type: choice
options:
- last
- all

env:
FIXTURES_URL: https://vortex-compat-fixtures.s3.amazonaws.com
- last

jobs:
compat-test:
runs-on: ubuntu-latest
runs-on: >-
${{ github.repository == 'vortex-data/vortex'
&& format('runs-on={0}/runner=amd64-medium/image=ubuntu24-full-x64-pre/tag=compat-validation', github.run_id)
|| 'ubuntu-latest' }}
timeout-minutes: 120
steps:
- uses: actions/checkout@v4

- uses: dtolnay/rust-toolchain@stable

- uses: Swatinem/rust-cache@v2

- uses: runs-on/action@v2
if: github.repository == 'vortex-data/vortex'
with:
sccache: s3
- uses: actions/checkout@v6
- uses: ./.github/actions/setup-prebuild
- name: Install uv
uses: spiraldb/actions/.github/actions/setup-uv@0.18.5
with:
sync: false
- name: Run compat tests
run: |
MODE="${{ inputs.mode || 'last' }}"
python3 vortex-test/compat-gen/scripts/compat.py check \
MODE="${{ inputs.mode || 'all' }}"
uv run vortex-test/compat-gen/scripts/compat.py check \
--mode "$MODE"
141 changes: 108 additions & 33 deletions vortex-test/compat-gen/scripts/compat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors
# /// script
# dependencies = ["jsonschema"]
# ///

"""
Vortex backward-compatibility orchestrator.
Expand Down Expand Up @@ -79,6 +82,7 @@

# Check all versions, or specific ones
uv run compat.py check
uv run compat.py check --mode last
uv run compat.py check --versions 0.62.0,0.63.0

# Inspect store contents
Expand Down Expand Up @@ -341,7 +345,7 @@ def cmd_generate(args: argparse.Namespace) -> None:
output = Path(args.output)
version = _version_from_ref(args.git_ref)

_run_rust_generate(output)
_run_rust_generate(output, profile=args.profile)

# Read fixtures.json (with sha256 from Rust) and write a versioned manifest.
fixtures_json = json.loads((output / "fixtures.json").read_text())
Expand Down Expand Up @@ -387,7 +391,7 @@ def _publish_full(
output = Path(tmpdir) / "fixtures"

_info("generating fixtures...")
_run_rust_generate(output)
_run_rust_generate(output, profile=args.profile)

fixtures_json = json.loads((output / "fixtures.json").read_text())

Expand Down Expand Up @@ -472,7 +476,7 @@ def _publish_update(
output = Path(tmpdir) / "fixtures"

_info("generating fixtures...")
_run_rust_generate(output)
_run_rust_generate(output, profile=args.profile)

fixtures_json = json.loads((output / "fixtures.json").read_text())

Expand Down Expand Up @@ -541,10 +545,16 @@ def cmd_check(args: argparse.Namespace) -> None:
"""Download fixtures from store and check with Rust binary."""
store = _parse_store(args.store)

if args.versions and args.mode != "all":
print("error: --versions and --mode are mutually exclusive", file=sys.stderr)
sys.exit(1)

if args.versions:
versions = [v.strip() for v in args.versions.split(",")]
else:
versions = store.list_versions()
if args.mode == "last" and versions:
versions = versions[-1:]

if not versions:
_info("no versions found in store")
Expand Down Expand Up @@ -573,18 +583,15 @@ def cmd_check(args: argparse.Namespace) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
tmppath = Path(tmpdir)

for entry in manifest["fixtures"]:
name = entry["name"]
data = store.read(f"{prefix}/{name}")
if data is None:
_info(f" v{version}: {name} not found at {prefix}/{name}")
all_failures.append((version, name, "fixture file not found in store"))
total_failed += 1
continue
(tmppath / name).write_bytes(data)
_info(f" downloaded {name} ({len(data)} bytes)")

result = _run_rust_check(tmppath, mode="subset")
_info(f" downloading {len(manifest['fixtures'])} fixtures...")
download_failures = _parallel_download(store, manifest["fixtures"], prefix, tmppath)
for name, error in download_failures:
_info(f" v{version}: {name} {error}")
all_failures.append((version, name, error))
total_failed += 1

_info(f" checking v{version}...")
result = _run_rust_check(tmppath, mode="subset", profile=args.profile)

passed = len(result.get("passed", []))
failed_list = result.get("failed", [])
Expand Down Expand Up @@ -742,18 +749,78 @@ def _parallel_upload(store: Store, items: list[tuple[str, Path]], max_workers: i
future.result()


def _run_rust_generate(output: Path) -> None:
def _parallel_download(
    store: Store,
    fixtures: list[dict],
    prefix: str,
    dest: Path,
    max_workers: int = 8,
) -> list[tuple[str, str]]:
    """Download fixture files from the store in parallel.

    Each entry in ``fixtures`` is a manifest dict with a ``"name"`` key; the
    corresponding object is read from ``<prefix>/<name>`` in the store and
    written to ``dest/<name>``.

    Returns a list of (name, error) for any failures.
    """
    failures: list[tuple[str, str]] = []
    # Running byte total, used only for the summary log line below.
    total_bytes = 0

    def _download_one(entry: dict) -> tuple[str, bytes | None]:
        # Worker body: fetch one fixture; a None read means the key is absent.
        name = entry["name"]
        data = store.read(f"{prefix}/{name}")
        return name, data

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Store reads run in worker threads; disk writes and bookkeeping stay
        # on this thread as futures complete, so no locking is required.
        futures = {pool.submit(_download_one, entry): entry["name"] for entry in fixtures}
        for future in as_completed(futures):
            name, data = future.result()
            if data is None:
                # Missing object: record and keep going so every fixture is reported.
                failures.append((name, f"not found at {prefix}/{name}"))
            else:
                (dest / name).write_bytes(data)
                total_bytes += len(data)

    _info(f"  downloaded {len(fixtures) - len(failures)} fixtures ({total_bytes} bytes)")
    return failures


def _build_compat_bin(profile: str = "release") -> str:
    """Build vortex-compat and return the path to the binary.

    If VORTEX_COMPAT_BIN is set, skips the build and returns that path.
    Otherwise runs `cargo build` with visible output, then locates the binary.
    """
    # Escape hatch for callers (e.g. CI) that supply a pre-built binary.
    bin_path = os.environ.get("VORTEX_COMPAT_BIN")
    if bin_path:
        return bin_path

    _info(f"building vortex-compat ({profile})...")
    _run_cmd(["cargo", "build", "-p", CARGO_BIN, "--profile", profile], check=True)

    # Ask cargo where the binary is.
    result = subprocess.run(
        ["cargo", "metadata", "--format-version=1", "--no-deps"],
        capture_output=True,
        text=True,
        check=True,
    )
    target_dir = json.loads(result.stdout)["target_directory"]
    # Cargo puts "dev" profile binaries in "debug/", all others in "<profile>/".
    dir_name = "debug" if profile == "dev" else profile
    # NOTE(review): path is derived, not checked for existence — cargo build
    # above must have produced it; confirm CARGO_BIN matches the crate's bin name.
    bin_path = str(Path(target_dir) / dir_name / CARGO_BIN)
    return bin_path


def _run_rust_generate(output: Path, profile: str = "release") -> None:
"""Run `vortex-compat generate --output <dir>`."""
cmd = _cargo_run_cmd() + ["generate", "--output", str(output)]
_run_cmd(cmd, check=True)
bin_path = _build_compat_bin(profile)
_run_cmd([bin_path, "generate", "--output", str(output)], check=True)


def _run_rust_check(dir: Path, mode: str = "subset") -> dict:
def _run_rust_check(dir: Path, mode: str = "subset", profile: str = "release") -> dict:
"""Run `vortex-compat check --dir <dir> --mode <mode>` and parse JSON stdout."""
cmd = _cargo_run_cmd() + ["check", "--dir", str(dir), "--mode", mode]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.stderr:
print(result.stderr, end="", file=sys.stderr)
bin_path = _build_compat_bin(profile)
cmd = [bin_path, "check", "--dir", str(dir), "--mode", mode]
_info(f" $ {' '.join(cmd)}")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=None, text=True) # noqa: UP022

if result.stdout.strip():
return json.loads(result.stdout)
Expand All @@ -767,17 +834,12 @@ def _run_rust_check(dir: Path, mode: str = "subset") -> dict:
return {"passed": [], "failed": [], "skipped": []}


def _cargo_run_cmd() -> list[str]:
"""Build the command to invoke vortex-compat (pre-built binary or cargo run)."""
bin_path = os.environ.get("VORTEX_COMPAT_BIN")
if bin_path:
return [bin_path]
return ["cargo", "run", "-p", CARGO_BIN, "--release", "--"]


def _run_cmd(cmd: list[str], check: bool = False, cwd: Path | None = None) -> subprocess.CompletedProcess:
_info(f" $ {' '.join(cmd)}")
return subprocess.run(cmd, check=check, cwd=cwd)
result = subprocess.run(cmd, check=False, cwd=cwd)
if check and result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, cmd)
return result


def _find_prev_version(versions: list[str], current: str) -> str | None:
Expand Down Expand Up @@ -816,6 +878,11 @@ def main() -> None:
epilog=EPILOG,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--profile",
default="release",
help="Cargo build profile (default: release). Use 'dev' for faster builds.",
)
sub = parser.add_subparsers(dest="command", metavar="COMMAND")

# -- generate --
Expand Down Expand Up @@ -902,6 +969,7 @@ def main() -> None:
epilog=(
"examples:\n"
" uv run compat.py check\n"
" uv run compat.py check --mode last\n"
" uv run compat.py check --versions 0.62.0,0.63.0\n"
" uv run compat.py check --store /tmp/store"
),
Expand All @@ -910,7 +978,14 @@ def main() -> None:
p.add_argument("--store", default=DEFAULT_STORE, help="Store spec (default: %(default)s)")
p.add_argument(
"--versions",
help="Comma-separated versions to check (default: all)",
help="Comma-separated versions to check (mutually exclusive with --mode)",
)
p.add_argument(
"--mode",
choices=["all", "last"],
default="all",
help="Which versions to check: 'all' (default) or 'last' (most recent only). "
"Mutually exclusive with --versions.",
)

# -- list --
Expand Down
38 changes: 38 additions & 0 deletions vortex-test/compat-gen/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ use vortex::file::OpenOptionsSessionExt;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::session::RuntimeSessionExt;
use vortex::layout::LayoutStrategy;
use vortex::layout::layouts::flat::Flat;
use vortex::layout::layouts::flat::writer::FlatLayoutStrategy;
use vortex_array::ArrayRef;
use vortex_array::ArrayVisitorExt;
use vortex_array::DynArray;
use vortex_array::MaskFuture;
use vortex_array::expr::root;
use vortex_array::expr::stats::Stat;
use vortex_array::stream::ArrayStreamAdapter;
use vortex_array::stream::ArrayStreamExt;
Expand Down Expand Up @@ -108,3 +111,38 @@ pub fn read_file(bytes: ByteBuffer) -> VortexResult<ArrayRef> {
file.scan()?.into_array_stream()?.read_all().await
})
}

/// Open a `.vortex` file and fully decode every array in the layout tree, including
/// auxiliary data like zone maps and dictionaries.
///
/// Walks the entire layout tree and, for each leaf `Flat` layout, builds a reader and
/// evaluates a full projection (the identity expression over all rows with an
/// all-true mask), which forces the layout's data to be read and deserialized. This
/// exercises every segment in the file — not just the data path that a plain
/// `scan()` touches. If any segment is corrupt or any array fails to decode, this
/// will error.
pub fn read_layout_tree(bytes: ByteBuffer) -> VortexResult<()> {
    runtime()?.block_on(async {
        let session = VortexSession::default().with_tokio();
        let file = session.open_options().open_buffer(bytes)?;
        let root_layout = file.footer().layout().clone();
        let segment_source = file.segment_source();

        for layout_result in root_layout.depth_first_traversal() {
            let layout = layout_result?;
            // Only `Flat` layouts are read directly; everything else is skipped
            // (the depth-first traversal still yields their descendants).
            if layout.as_opt::<Flat>().is_none() {
                continue;
            }
            let row_count = layout.row_count();
            // Nothing to decode in an empty layout.
            if row_count == 0 {
                continue;
            }
            let reader = layout.new_reader("".into(), segment_source.clone(), &session)?;
            // Mask length is a usize; surface a clear error if the u64 row count
            // does not fit.
            let len =
                usize::try_from(row_count).map_err(|e| vortex_err!("row count overflow: {e}"))?;
            // Evaluate the full row range through the identity expression with an
            // all-true mask, forcing the underlying segment to be fetched and decoded.
            reader
                .projection_evaluation(&(0..row_count), &root(), MaskFuture::new_true(len))?
                .await?;
        }

        Ok(())
    })
}
Loading
Loading