From 5d70dd0e23e049d0b6492df22c123cac7e08d707 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Wed, 15 Apr 2026 10:49:44 -0400 Subject: [PATCH] vector dataset catalog and downloader Signed-off-by: Connor Tsui --- vortex-bench/src/lib.rs | 1 + vortex-bench/src/vector_dataset/catalog.rs | 405 ++++++++++++++++++++ vortex-bench/src/vector_dataset/download.rs | 225 +++++++++++ vortex-bench/src/vector_dataset/layout.rs | 185 +++++++++ vortex-bench/src/vector_dataset/mod.rs | 38 ++ vortex-bench/src/vector_dataset/paths.rs | 116 ++++++ 6 files changed, 970 insertions(+) create mode 100644 vortex-bench/src/vector_dataset/catalog.rs create mode 100644 vortex-bench/src/vector_dataset/download.rs create mode 100644 vortex-bench/src/vector_dataset/layout.rs create mode 100644 vortex-bench/src/vector_dataset/mod.rs create mode 100644 vortex-bench/src/vector_dataset/paths.rs diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index af0d3fdef30..db9bd69c6ce 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -53,6 +53,7 @@ pub mod statpopgen; pub mod tpcds; pub mod tpch; pub mod utils; +pub mod vector_dataset; pub use benchmark::Benchmark; pub use benchmark::TableSpec; diff --git a/vortex-bench/src/vector_dataset/catalog.rs b/vortex-bench/src/vector_dataset/catalog.rs new file mode 100644 index 00000000000..5549add0c14 --- /dev/null +++ b/vortex-bench/src/vector_dataset/catalog.rs @@ -0,0 +1,405 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! The static catalog of hosted vector benchmark corpora. +//! +//! Every entry in [`ALL_VECTOR_DATASETS`] declares its dimensionality, row count, element type, +//! distance metric, the set of train-split layouts hosted on the upstream bucket, and whether +//! ground-truth `neighbors.parquet` and `scalar_labels` columns are available. +//! Most entries mirror VectorDBBench's dataset model; a few are extra prefixes that are present +//! 
on the same public bucket and use the same file layout.
//!
//! Higher-level code (downloaders, ingest pipelines, recall measurement) should be parameterized
//! over these descriptors rather than hardcoding per-dataset URLs.

use std::num::NonZeroU32;

use anyhow::Result;
use anyhow::bail;
use clap::ValueEnum;
use vortex::dtype::PType;

use super::layout::LayoutSpec;
use super::layout::TrainLayout;
use super::layout::VectorMetric;

const TEN_SHARDS: NonZeroU32 = NonZeroU32::new(10).unwrap();
const FIFTY_SHARDS: NonZeroU32 = NonZeroU32::new(50).unwrap();
const ONE_HUNDRED_SHARDS: NonZeroU32 = NonZeroU32::new(100).unwrap();

/// Every [`VectorDataset`] variant in catalog order. Useful for CLI help and metadata-consistency
/// tests.
pub const ALL_VECTOR_DATASETS: &[VectorDataset] = &[
    VectorDataset::CohereSmall100k,
    VectorDataset::CohereMedium1m,
    VectorDataset::CohereLarge10m,
    VectorDataset::OpenaiSmall50k,
    VectorDataset::OpenaiMedium500k,
    VectorDataset::OpenaiLarge5m,
    VectorDataset::BioasqMedium1m,
    VectorDataset::BioasqLarge10m,
    VectorDataset::GloveSmall100k,
    VectorDataset::GloveMedium1m,
    VectorDataset::GistSmall100k,
    VectorDataset::GistMedium1m,
    VectorDataset::SiftSmall500k,
    VectorDataset::SiftMedium5m,
    VectorDataset::SiftLarge50m,
    VectorDataset::LaionLarge100m,
];

/// The publicly hosted vector benchmark datasets.
///
/// Variants are named `<Source><Size><Rows>`, kebab-cased on the CLI (e.g. `cohere-large-10m`).
///
/// The static metadata for each variant (dimensionality, row count, hosted layouts, etc.) is
/// exposed via the inherent methods below; the full table is reachable via [`ALL_VECTOR_DATASETS`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ValueEnum)]
#[clap(rename_all = "kebab-case")]
pub enum VectorDataset {
    /// Cohere wiki-22-12, 100K × 768 f32, cosine. Single + SingleShuffled.
    CohereSmall100k,
    /// Cohere wiki-22-12, 1M × 768 f32, cosine. Single + SingleShuffled.
+ CohereMedium1m, + /// Cohere wiki-22-12, 10M × 768 f32, cosine. Partitioned + PartitionedShuffled (10 shards). + CohereLarge10m, + + /// OpenAI embeddings on C4, 50K × 1536 f64, cosine. Single + SingleShuffled. + OpenaiSmall50k, + /// OpenAI embeddings on C4, 500K × 1536 f64, cosine. Single + SingleShuffled. + OpenaiMedium500k, + /// OpenAI embeddings on C4, 5M × 1536 f64, cosine. Partitioned + PartitionedShuffled (10 + /// shards). + OpenaiLarge5m, + + /// Bioasq biomedical, 1M × 1024 f32, cosine. SingleShuffled only. + BioasqMedium1m, + /// Bioasq biomedical, 10M × 1024 f32, cosine. PartitionedShuffled only (10 shards). + BioasqLarge10m, + + /// GloVe word vectors, 100K × 200 f32, cosine. Single only. No neighbors / labels. + GloveSmall100k, + /// GloVe word vectors, 1M × 200 f32, cosine. Single only. No neighbors / labels. + GloveMedium1m, + + /// GIST image features, 100K × 960 f32, L2. Single only. No neighbors / labels. + GistSmall100k, + /// GIST image features, 1M × 960 f32, L2. Single only. No neighbors / labels. + GistMedium1m, + + /// SIFT image features, 500K × 128 f32, L2. Single only. No neighbors / labels. + SiftSmall500k, + /// SIFT image features, 5M × 128 f32, L2. Single only. No neighbors / labels. + SiftMedium5m, + /// SIFT image features, 50M × 128 f32, L2. Partitioned only (50 shards). No labels. + SiftLarge50m, + + /// LAION image embeddings, 100M × 768 f32, L2. Partitioned only (100 shards). + /// Has `neighbors.parquet` and `scalar_labels.parquet`. + LaionLarge100m, +} + +impl VectorDataset { + /// Stable kebab-cased label used in CLI args, file paths, and metric names. 
pub fn name(&self) -> &'static str {
        match self {
            VectorDataset::CohereSmall100k => "cohere-small-100k",
            VectorDataset::CohereMedium1m => "cohere-medium-1m",
            VectorDataset::CohereLarge10m => "cohere-large-10m",
            VectorDataset::OpenaiSmall50k => "openai-small-50k",
            VectorDataset::OpenaiMedium500k => "openai-medium-500k",
            VectorDataset::OpenaiLarge5m => "openai-large-5m",
            VectorDataset::BioasqMedium1m => "bioasq-medium-1m",
            VectorDataset::BioasqLarge10m => "bioasq-large-10m",
            VectorDataset::GloveSmall100k => "glove-small-100k",
            VectorDataset::GloveMedium1m => "glove-medium-1m",
            VectorDataset::GistSmall100k => "gist-small-100k",
            VectorDataset::GistMedium1m => "gist-medium-1m",
            VectorDataset::SiftSmall500k => "sift-small-500k",
            VectorDataset::SiftMedium5m => "sift-medium-5m",
            VectorDataset::SiftLarge50m => "sift-large-50m",
            VectorDataset::LaionLarge100m => "laion-large-100m",
        }
    }

    /// The directory name on `assets.zilliz.com/benchmark/<s3_prefix>/`. Snake-cased to
    /// match the upstream bucket's existing naming convention.
+ pub fn s3_prefix(&self) -> &'static str { + match self { + VectorDataset::CohereSmall100k => "cohere_small_100k", + VectorDataset::CohereMedium1m => "cohere_medium_1m", + VectorDataset::CohereLarge10m => "cohere_large_10m", + VectorDataset::OpenaiSmall50k => "openai_small_50k", + VectorDataset::OpenaiMedium500k => "openai_medium_500k", + VectorDataset::OpenaiLarge5m => "openai_large_5m", + VectorDataset::BioasqMedium1m => "bioasq_medium_1m", + VectorDataset::BioasqLarge10m => "bioasq_large_10m", + VectorDataset::GloveSmall100k => "glove_small_100k", + VectorDataset::GloveMedium1m => "glove_medium_1m", + VectorDataset::GistSmall100k => "gist_small_100k", + VectorDataset::GistMedium1m => "gist_medium_1m", + VectorDataset::SiftSmall500k => "sift_small_500k", + VectorDataset::SiftMedium5m => "sift_medium_5m", + VectorDataset::SiftLarge50m => "sift_large_50m", + VectorDataset::LaionLarge100m => "laion_large_100m", + } + } + + /// Vector dimensionality. + pub fn dim(&self) -> u32 { + match self { + VectorDataset::CohereSmall100k + | VectorDataset::CohereMedium1m + | VectorDataset::CohereLarge10m + | VectorDataset::LaionLarge100m => 768, + VectorDataset::OpenaiSmall50k + | VectorDataset::OpenaiMedium500k + | VectorDataset::OpenaiLarge5m => 1536, + VectorDataset::BioasqMedium1m | VectorDataset::BioasqLarge10m => 1024, + VectorDataset::GloveSmall100k | VectorDataset::GloveMedium1m => 200, + VectorDataset::GistSmall100k | VectorDataset::GistMedium1m => 960, + VectorDataset::SiftSmall500k + | VectorDataset::SiftMedium5m + | VectorDataset::SiftLarge50m => 128, + } + } + + /// Number of rows in the train split (sum across shards if partitioned). 
+ pub fn num_rows(&self) -> u64 { + match self { + VectorDataset::CohereSmall100k => 100_000, + VectorDataset::CohereMedium1m => 1_000_000, + VectorDataset::CohereLarge10m => 10_000_000, + VectorDataset::OpenaiSmall50k => 50_000, + VectorDataset::OpenaiMedium500k => 500_000, + VectorDataset::OpenaiLarge5m => 5_000_000, + VectorDataset::BioasqMedium1m => 1_000_000, + VectorDataset::BioasqLarge10m => 10_000_000, + VectorDataset::GloveSmall100k => 100_000, + VectorDataset::GloveMedium1m => 1_000_000, + VectorDataset::GistSmall100k => 100_000, + VectorDataset::GistMedium1m => 1_000_000, + VectorDataset::SiftSmall500k => 500_000, + VectorDataset::SiftMedium5m => 5_000_000, + VectorDataset::SiftLarge50m => 50_000_000, + VectorDataset::LaionLarge100m => 100_000_000, + } + } + + /// Element scalar type as stored in the upstream parquet. The benchmark always casts to + /// `f32` after ingest (TurboQuant + the handrolled baseline operate in f32), so this is + /// only consulted by the parquet readers. + pub fn element_ptype(&self) -> PType { + match self { + VectorDataset::OpenaiSmall50k + | VectorDataset::OpenaiMedium500k + | VectorDataset::OpenaiLarge5m => PType::F64, + _ => PType::F32, + } + } + + /// Distance metric the upstream dataset was curated for. + pub fn metric(&self) -> VectorMetric { + match self { + VectorDataset::GistSmall100k + | VectorDataset::GistMedium1m + | VectorDataset::SiftSmall500k + | VectorDataset::SiftMedium5m + | VectorDataset::SiftLarge50m + | VectorDataset::LaionLarge100m => VectorMetric::L2, + _ => VectorMetric::Cosine, + } + } + + /// The set of train-split layouts hosted on the upstream bucket for this dataset. + /// + /// Always non-empty since the catalog never declares a dataset with zero layouts. 
pub fn layouts(&self) -> Vec<LayoutSpec> {
        match self {
            VectorDataset::CohereSmall100k
            | VectorDataset::CohereMedium1m
            | VectorDataset::OpenaiSmall50k
            | VectorDataset::OpenaiMedium500k => {
                vec![LayoutSpec::single(), LayoutSpec::single_shuffled()]
            }
            VectorDataset::CohereLarge10m | VectorDataset::OpenaiLarge5m => vec![
                LayoutSpec::partitioned(TEN_SHARDS),
                LayoutSpec::partitioned_shuffled(TEN_SHARDS),
            ],
            VectorDataset::BioasqMedium1m => vec![LayoutSpec::single_shuffled()],
            VectorDataset::BioasqLarge10m => {
                vec![LayoutSpec::partitioned_shuffled(TEN_SHARDS)]
            }
            VectorDataset::GloveSmall100k
            | VectorDataset::GloveMedium1m
            | VectorDataset::GistSmall100k
            | VectorDataset::GistMedium1m
            | VectorDataset::SiftSmall500k
            | VectorDataset::SiftMedium5m => vec![LayoutSpec::single()],
            VectorDataset::SiftLarge50m => vec![LayoutSpec::partitioned(FIFTY_SHARDS)],
            VectorDataset::LaionLarge100m => vec![LayoutSpec::partitioned(ONE_HUNDRED_SHARDS)],
        }
    }

    /// Whether `neighbors.parquet` (top-K ground truth for recall) is hosted.
    pub fn has_neighbors(&self) -> bool {
        match self {
            VectorDataset::CohereSmall100k
            | VectorDataset::CohereMedium1m
            | VectorDataset::CohereLarge10m
            | VectorDataset::OpenaiSmall50k
            | VectorDataset::OpenaiMedium500k
            | VectorDataset::OpenaiLarge5m
            | VectorDataset::BioasqMedium1m
            | VectorDataset::BioasqLarge10m
            | VectorDataset::LaionLarge100m => true,
            VectorDataset::GloveSmall100k
            | VectorDataset::GloveMedium1m
            | VectorDataset::GistSmall100k
            | VectorDataset::GistMedium1m
            | VectorDataset::SiftSmall500k
            | VectorDataset::SiftMedium5m
            | VectorDataset::SiftLarge50m => false,
        }
    }

    /// Whether the train split carries a `scalar_labels` column (for filtered-search
    /// benchmarks). The benchmark does not exercise filtered search yet, but the ingest
    /// pipeline copies the column through when present so future filtered-recall work does
    /// not require re-ingest.
pub fn has_scalar_labels(&self) -> bool {
        matches!(
            self,
            VectorDataset::CohereSmall100k
                | VectorDataset::CohereMedium1m
                | VectorDataset::CohereLarge10m
                | VectorDataset::OpenaiSmall50k
                | VectorDataset::OpenaiMedium500k
                | VectorDataset::OpenaiLarge5m
                | VectorDataset::BioasqMedium1m
                | VectorDataset::BioasqLarge10m
                | VectorDataset::LaionLarge100m
        )
    }

    /// Validate that `layout` is one of the layouts hosted for this dataset and return its
    /// [`LayoutSpec`].
    ///
    /// Bails with an error message that lists the allowed values.
    pub fn validate_layout(&self, layout: TrainLayout) -> Result<LayoutSpec> {
        let layouts = self.layouts();
        match layouts.iter().find(|spec| spec.layout() == layout) {
            Some(spec) => Ok(*spec),
            None => {
                let allowed = layouts
                    .iter()
                    .map(|s| s.layout().label())
                    .collect::<Vec<_>>()
                    .join(", ");
                bail!(
                    "dataset {} does not have layout '{}'; allowed layouts: [{}]",
                    self.name(),
                    layout,
                    allowed,
                );
            }
        }
    }

    /// Pick the default layout for this dataset — the first entry in [`Self::layouts`].
    /// Stable across runs since the catalog table is statically ordered.
+ pub fn default_layout(&self) -> LayoutSpec { + self.layouts()[0] + } +} + +#[cfg(test)] +mod tests { + use vortex::utils::aliases::hash_set::HashSet; + + use super::*; + + #[test] + fn all_datasets_have_consistent_metadata() { + let mut seen: HashSet<&'static str> = HashSet::default(); + for &ds in ALL_VECTOR_DATASETS { + assert!(seen.insert(ds.name()), "duplicate name {}", ds.name()); + assert!(ds.dim() > 0); + assert!(ds.num_rows() > 0); + assert!(!ds.layouts().is_empty(), "{} has no layouts", ds.name()); + assert_eq!( + ds.s3_prefix().chars().filter(|c| *c == '_').count() + 1, + ds.name().split('-').count(), + "{}: s3_prefix '{}' shape disagrees with name '{}'", + ds.name(), + ds.s3_prefix(), + ds.name(), + ); + } + } + + #[test] + fn validate_layout_accepts_hosted_layout() { + let ds = VectorDataset::CohereSmall100k; + let spec = ds.validate_layout(TrainLayout::Single).unwrap(); + assert_eq!(spec.layout(), TrainLayout::Single); + assert_eq!(spec.num_files(), 1); + } + + #[test] + fn validate_layout_rejects_unhosted_layout() { + let ds = VectorDataset::SiftSmall500k; + let err = ds + .validate_layout(TrainLayout::SingleShuffled) + .unwrap_err() + .to_string(); + assert!( + err.contains("does not have layout 'single-shuffled'"), + "{err}" + ); + assert!(err.contains("[single]"), "{err}"); + } + + #[test] + fn partitioned_datasets_declare_shard_count() { + let layouts = VectorDataset::CohereLarge10m.layouts(); + assert_eq!(layouts.len(), 2); + for spec in layouts { + assert!(spec.layout().is_partitioned()); + assert_eq!(spec.num_files(), 10); + } + } + + #[test] + fn l2_datasets_match_upstream_metric() { + for ds in [ + VectorDataset::GistSmall100k, + VectorDataset::GistMedium1m, + VectorDataset::SiftSmall500k, + VectorDataset::SiftMedium5m, + VectorDataset::SiftLarge50m, + VectorDataset::LaionLarge100m, + ] { + assert_eq!(ds.metric(), VectorMetric::L2, "{} should use L2", ds.name()); + } + } + + #[test] + fn datasets_without_neighbors_skip_recall() { + for ds 
in [
            VectorDataset::GloveSmall100k,
            VectorDataset::GloveMedium1m,
            VectorDataset::GistSmall100k,
            VectorDataset::GistMedium1m,
            VectorDataset::SiftSmall500k,
            VectorDataset::SiftMedium5m,
            VectorDataset::SiftLarge50m,
        ] {
            assert!(
                !ds.has_neighbors(),
                "{} unexpectedly has neighbors",
                ds.name()
            );
        }
    }
}
diff --git a/vortex-bench/src/vector_dataset/download.rs b/vortex-bench/src/vector_dataset/download.rs
new file mode 100644
index 00000000000..da4b43262b6
--- /dev/null
+++ b/vortex-bench/src/vector_dataset/download.rs
@@ -0,0 +1,225 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! URL builders and idempotent download driver for vector benchmark datasets.
//!
//! The upstream bucket is `https://assets.zilliz.com/benchmark/<prefix>/`. Within each
//! prefix the train split is named according to a four-way convention:
//!
//! - `Single`: `train.parquet`
//! - `SingleShuffled`: `shuffle_train.parquet`
//! - `Partitioned`: `train-NN-of-MM.parquet`
//! - `PartitionedShuffled`: `shuffle_train-NN-of-MM.parquet`
//!
//! `test.parquet` and (when present) `neighbors.parquet` live alongside the train files.

use std::path::PathBuf;
use std::time::Duration;

use anyhow::Context;
use anyhow::Result;
use bytes::Bytes;
use futures::StreamExt;
use indicatif::ProgressBar;
use indicatif::ProgressStyle;
use reqwest::Client;
use reqwest::IntoUrl;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
use tracing::info;
use tracing::warn;

use crate::datasets::data_downloads::download_data;
use crate::utils::file::idempotent_async;
use crate::vector_dataset::catalog::VectorDataset;
use crate::vector_dataset::layout::LayoutSpec;
use crate::vector_dataset::layout::TrainLayout;
use crate::vector_dataset::paths;

/// Bucket root for all VectorDBBench datasets we mirror against.
const BENCHMARK_ROOT: &str = "https://assets.zilliz.com/benchmark";

/// All train-shard URLs for a `(dataset, layout)` pair. Length matches `layout.num_files()`.
pub fn train_urls(ds: VectorDataset, spec: LayoutSpec) -> Vec<String> {
    let prefix = format!("{BENCHMARK_ROOT}/{}", ds.s3_prefix());
    let layout = spec.layout();
    if layout.is_partitioned() {
        let n = spec.num_files();
        (0..n)
            .map(|i| format!("{prefix}/{}", partitioned_file_name(layout, i, n),))
            .collect()
    } else {
        let name = match layout {
            TrainLayout::Single => "train.parquet",
            TrainLayout::SingleShuffled => "shuffle_train.parquet",
            _ => unreachable!("non-partitioned guard above"),
        };
        vec![format!("{prefix}/{name}")]
    }
}

/// URL for `test.parquet`.
pub fn test_url(ds: VectorDataset) -> String {
    format!("{BENCHMARK_ROOT}/{}/test.parquet", ds.s3_prefix())
}

/// URL for `neighbors.parquet`, or `None` when the dataset doesn't host one.
pub fn neighbors_url(ds: VectorDataset) -> Option<String> {
    ds.has_neighbors()
        .then(|| format!("{BENCHMARK_ROOT}/{}/neighbors.parquet", ds.s3_prefix()))
}

fn partitioned_file_name(layout: TrainLayout, shard_idx: u32, num_files: u32) -> String {
    let prefix = match layout {
        TrainLayout::Partitioned => "train",
        TrainLayout::PartitionedShuffled => "shuffle_train",
        _ => unreachable!("partitioned guard"),
    };
    format!(
        "{prefix}-{shard_idx:0width$}-of-{num_files:0width$}.parquet",
        width = num_files_width(num_files),
    )
}

fn num_files_width(num_files: u32) -> usize {
    let digits = num_files.checked_ilog10().unwrap_or(0) as usize + 1;
    digits.max(2)
}

/// Local on-disk paths to the cached parquet files for a `(dataset, layout)` pair after
/// [`download`] returns successfully.
#[derive(Debug, Clone)]
pub struct DatasetPaths {
    /// Per-shard train parquet paths in shard order.
    pub train_files: Vec<PathBuf>,
    /// `test.parquet`.
    pub test: PathBuf,
    /// `neighbors.parquet` if the dataset hosts top-K ground truth.
pub neighbors: Option<PathBuf>,
}

/// Download every parquet file required to run a `(dataset, layout)` benchmark, returning local
/// on-disk paths.
///
/// This has idempotent semantics, so files already present on disk are skipped, and re-runs only
/// pay for new files.
///
/// Train shards download in parallel using a shared HTTP client; the small `test.parquet` and
/// `neighbors.parquet` files use the simple [`download_data`] helper.
pub async fn download(ds: VectorDataset, layout: TrainLayout) -> Result<DatasetPaths> {
    let spec = ds.validate_layout(layout)?;
    let urls = train_urls(ds, spec);
    let train_targets = paths::train_files(ds, layout, spec.num_files());
    debug_assert_eq!(urls.len(), train_targets.len());

    let client = Client::builder()
        .timeout(Duration::from_secs(60 * 60))
        .build()
        .context("build reqwest client")?;

    let mut tasks: JoinSet<Result<()>> = JoinSet::new();
    for (url, target) in urls.into_iter().zip(train_targets.iter().cloned()) {
        let client = client.clone();
        tasks.spawn(async move {
            idempotent_async(target, |tmp| async move {
                info!("downloading {}", url);
                if spec.layout().is_partitioned() {
                    download_with_retry(&client, &url, &tmp).await?;
                } else {
                    download_with_progress(&client, &url, &tmp).await?;
                }
                Ok(())
            })
            .await?;
            Ok(())
        });
    }
    while let Some(joined) = tasks.join_next().await {
        joined.context("train download task panicked")??;
    }

    let test = download_data(paths::test_path(ds, layout), &test_url(ds))
        .await
        .with_context(|| format!("download test.parquet for {}", ds.name()))?;

    let neighbors = if let Some(url) = neighbors_url(ds) {
        Some(
            download_data(paths::neighbors_path(ds, layout), &url)
                .await
                .with_context(|| format!("download neighbors.parquet for {}", ds.name()))?,
        )
    } else {
        None
    };

    Ok(DatasetPaths {
        train_files: train_targets,
        test,
        neighbors,
    })
}

/// Stream a large file to disk with a byte-progress bar.
async fn download_with_progress(client: &Client, url: &str, output: &PathBuf) -> Result<()> {
    let response = client
        .get(url)
        .send()
        .await
        .with_context(|| format!("GET {url}"))?
        .error_for_status()?;
    let total = response.content_length().unwrap_or(0);

    let progress = ProgressBar::new(total);
    progress.set_style(
        ProgressStyle::with_template(
            "[{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} ({bytes_per_sec})",
        )
        .expect("valid template"),
    );

    let mut file = File::create(output).await?;
    let mut stream = response.bytes_stream();
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        file.write_all(&chunk).await?;
        progress.inc(chunk.len() as u64);
    }
    progress.finish_and_clear();
    file.flush().await?;
    Ok(())
}

/// Buffer-the-whole-body download with simple exponential backoff. Used for partitioned
/// shards because we already have download concurrency at the shard granularity.
async fn download_with_retry(client: &Client, url: &str, output: &PathBuf) -> Result<()> {
    let body = retry_get(client, url).await?;
    let mut file = File::create(output).await?;
    file.write_all(&body).await?;
    file.flush().await?;
    Ok(())
}

async fn retry_get(client: &Client, url: impl IntoUrl + Clone) -> Result<Bytes> {
    const MAX_ATTEMPTS: u32 = 4;
    let mut last_err: Option<anyhow::Error> = None;
    for attempt in 0..MAX_ATTEMPTS {
        let outcome: Result<Bytes> = async {
            let resp = client.get(url.clone()).send().await?.error_for_status()?;
            Ok(resp.bytes().await?)
+ } + .await; + match outcome { + Ok(b) => return Ok(b), + Err(e) => last_err = Some(e), + } + let backoff = Duration::from_secs(1u64 << attempt); + warn!( + "download attempt {} failed; retrying in {:?}", + attempt + 1, + backoff + ); + tokio::time::sleep(backoff).await; + } + Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error"))) +} diff --git a/vortex-bench/src/vector_dataset/layout.rs b/vortex-bench/src/vector_dataset/layout.rs new file mode 100644 index 00000000000..3356776d0c3 --- /dev/null +++ b/vortex-bench/src/vector_dataset/layout.rs @@ -0,0 +1,185 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Train-split layout variants for vector benchmark datasets. +//! +//! VectorDBBench corpora are published under `assets.zilliz.com/benchmark//` in up to four +//! different shapes: +//! - a single train file +//! - a single shuffled-rows train file +//! - a partitioned train file split into N shards +//! - the same partitioned shape in shuffled-rows order. +//! +//! Not every dataset hosts every layout. See [`VectorDataset::layouts`] for the per-dataset list. +//! +//! [`VectorDataset::layouts`]: super::VectorDataset::layouts + +use std::fmt; +use std::num::NonZeroU32; + +use clap::ValueEnum; +use serde::Deserialize; +use serde::Serialize; + +/// Distance metric a dataset was curated for. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum VectorMetric { + /// `dot(a, b) / (||a|| * ||b||)`. + Cosine, + /// `sum((a - b)^2)`. + L2, + // TODO(connor): Do we even need this? + /// `dot(a, b)`. + InnerProduct, +} + +/// A specific train layout published for a dataset, plus the shard count when partitioned +/// (always `1` for `Single` / `SingleShuffled`). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct LayoutSpec { + layout: TrainLayout, + num_files: NonZeroU32, +} + +impl LayoutSpec { + /// Build a single-file layout spec. 
+ pub const fn single() -> Self { + Self { + layout: TrainLayout::Single, + num_files: NonZeroU32::MIN, + } + } + + /// Build a shuffled single-file layout spec. + pub const fn single_shuffled() -> Self { + Self { + layout: TrainLayout::SingleShuffled, + num_files: NonZeroU32::MIN, + } + } + + /// Build a partitioned layout spec with the given shard count. + pub const fn partitioned(num_files: NonZeroU32) -> Self { + Self { + layout: TrainLayout::Partitioned, + num_files, + } + } + + /// Build a shuffled partitioned layout spec with the given shard count. + pub const fn partitioned_shuffled(num_files: NonZeroU32) -> Self { + Self { + layout: TrainLayout::PartitionedShuffled, + num_files, + } + } + + /// Which of the four published shapes this entry describes. + pub const fn layout(&self) -> TrainLayout { + self.layout + } + + /// Number of parquet shards on the bucket. `1` for the single-file layouts. + pub const fn num_files(&self) -> u32 { + self.num_files.get() + } +} + +/// One of the four published train-split shapes for a VectorDBBench corpus. +/// +/// `Single` and `SingleShuffled` are one-file layouts; `Partitioned` and `PartitionedShuffled` are +/// sharded into N files. The shuffled variants randomize the row order, which is useful when you +/// want the on-disk arrangement to be representative of a query workload rather than of the +/// upstream ingest order. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ValueEnum, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum TrainLayout { + /// One `train.parquet` file. Row order matches the upstream curation. + #[clap(name = "single")] + Single, + /// One `shuffle_train.parquet` file. Row order is randomized. + #[clap(name = "single-shuffled")] + SingleShuffled, + /// Multiple `train-NN-of-N.parquet` shards. Row order matches the upstream curation. + #[clap(name = "partitioned")] + Partitioned, + /// Multiple `shuffle_train-NN-of-N.parquet` shards. Row order is randomized. 
+ #[clap(name = "partitioned-shuffled")] + PartitionedShuffled, +} + +impl TrainLayout { + /// Stable kebab-cased label used in CLI args, file paths, and metric names. + pub fn label(&self) -> &'static str { + match self { + TrainLayout::Single => "single", + TrainLayout::SingleShuffled => "single-shuffled", + TrainLayout::Partitioned => "partitioned", + TrainLayout::PartitionedShuffled => "partitioned-shuffled", + } + } + + /// Whether this layout is split across multiple parquet files. + pub fn is_partitioned(&self) -> bool { + matches!( + self, + TrainLayout::Partitioned | TrainLayout::PartitionedShuffled + ) + } +} + +impl fmt::Display for TrainLayout { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.label()) + } +} + +#[cfg(test)] +mod tests { + use std::num::NonZeroU32; + + use super::*; + + #[test] + fn label_round_trips_through_value_enum() { + for layout in [ + TrainLayout::Single, + TrainLayout::SingleShuffled, + TrainLayout::Partitioned, + TrainLayout::PartitionedShuffled, + ] { + let parsed = TrainLayout::from_str(layout.label(), true).unwrap(); + assert_eq!(parsed, layout); + } + } + + #[test] + fn is_partitioned_matches_variant() { + assert!(!TrainLayout::Single.is_partitioned()); + assert!(!TrainLayout::SingleShuffled.is_partitioned()); + assert!(TrainLayout::Partitioned.is_partitioned()); + assert!(TrainLayout::PartitionedShuffled.is_partitioned()); + } + + #[test] + fn layout_specs_encode_valid_shape_and_count() { + assert_eq!(LayoutSpec::single().layout(), TrainLayout::Single); + assert_eq!(LayoutSpec::single().num_files(), 1); + assert_eq!( + LayoutSpec::single_shuffled().layout(), + TrainLayout::SingleShuffled + ); + assert_eq!( + LayoutSpec::partitioned(NonZeroU32::new(10).unwrap()).layout(), + TrainLayout::Partitioned + ); + assert_eq!( + LayoutSpec::partitioned_shuffled(NonZeroU32::new(10).unwrap()).layout(), + TrainLayout::PartitionedShuffled + ); + assert_eq!( + 
LayoutSpec::partitioned_shuffled(NonZeroU32::new(10).unwrap()).num_files(),
            10
        );
    }
}
diff --git a/vortex-bench/src/vector_dataset/mod.rs b/vortex-bench/src/vector_dataset/mod.rs
new file mode 100644
index 00000000000..fe1e42a68d0
--- /dev/null
+++ b/vortex-bench/src/vector_dataset/mod.rs
@@ -0,0 +1,38 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Public catalog of VectorDBBench corpora used by the vector-search benchmark.
//!
//! The catalog is intentionally separate from the [`crate::datasets::Dataset`] trait used by the
//! row-table benchmarks: the train split of a vector dataset is sometimes partitioned across many
//! parquet files, sometimes single-file, sometimes shuffled, sometimes not, and its `emb` column
//! has to be rewrapped into an `Extension` before it's useful to a cosine-similarity scan.
//! None of that fits the row-table `Dataset` contract.
//!
//! The four sub-modules split the catalog into roughly orthogonal concerns:
//!
//! - `catalog`: the static [`VectorDataset`] enum + per-dataset metadata.
//! - `layout`: [`TrainLayout`] / [`LayoutSpec`] (the four hosted train shapes) and
//!   [`VectorMetric`].
//! - `download`: URL builders and the idempotent download driver.
//! - `paths`: local filesystem layout (`<data_dir>/vector-search/...`).
//!
//! Higher-level callers (the bench crate's ingest + scan pipeline) compose these:
//! [`download::download`] returns a [`download::DatasetPaths`] handle that the ingest pass turns
//! into per-flavor `.vortex` files, after which the scan driver re-opens those files per iteration.
mod catalog;
mod download;
mod layout;
mod paths;

pub use catalog::ALL_VECTOR_DATASETS;
pub use catalog::VectorDataset;
pub use download::DatasetPaths;
pub use download::download;
pub use download::neighbors_url;
pub use download::test_url;
pub use download::train_urls;
pub use layout::LayoutSpec;
pub use layout::TrainLayout;
pub use layout::VectorMetric;
diff --git a/vortex-bench/src/vector_dataset/paths.rs b/vortex-bench/src/vector_dataset/paths.rs
new file mode 100644
index 00000000000..3fbf9452ff7
--- /dev/null
+++ b/vortex-bench/src/vector_dataset/paths.rs
@@ -0,0 +1,116 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Local-filesystem layout for cached vector benchmark datasets.
//!
//! ```text
//! <data_dir>/vector-search/<dataset>/<layout>/
//!   train/              single-file: train.parquet
//!                       partitioned: 00-of-N.parquet, 01-of-N.parquet, ...
//!   test.parquet
//!   neighbors.parquet   only when ds.has_neighbors()
//!
//!   + some more things
//! ```
//!
//! This module exists purely to centralize the path-construction logic used by both the downloader
//! and the ingest pipeline.

use std::path::PathBuf;

use crate::utils::file::data_dir;
use crate::vector_dataset::VectorDataset;
use crate::vector_dataset::layout::TrainLayout;

/// Top-level cache root: `<data_dir>/vector-search/`.
pub fn root() -> PathBuf {
    data_dir().join("vector-search")
}

/// Per-dataset directory: `<root>/<dataset>/<layout>/`.
pub fn dataset_dir(ds: VectorDataset, layout: TrainLayout) -> PathBuf {
    root().join(ds.name()).join(layout.label())
}

/// Train-shard directory: `<dataset_dir>/train/`.
pub fn train_dir(ds: VectorDataset, layout: TrainLayout) -> PathBuf {
    dataset_dir(ds, layout).join("train")
}

/// File name for one train shard within [`train_dir`].
///
/// Single-file layouts produce `train.parquet`; partitioned layouts produce `NN-of-MM.parquet` so a
/// directory listing sorts shards in sequence order.
pub fn train_file_name(layout: TrainLayout, shard_idx: u32, num_files: u32) -> String {
    if layout.is_partitioned() {
        format!(
            "{shard_idx:0width$}-of-{num_files:0width$}.parquet",
            width = num_files_width(num_files),
        )
    } else {
        "train.parquet".to_owned()
    }
}

/// All train-shard paths for a dataset/layout pair, in shard order.
pub fn train_files(ds: VectorDataset, layout: TrainLayout, num_files: u32) -> Vec<PathBuf> {
    let dir = train_dir(ds, layout);
    (0..num_files)
        .map(|i| dir.join(train_file_name(layout, i, num_files)))
        .collect()
}

/// Path to the cached `test.parquet` for a dataset/layout pair.
pub fn test_path(ds: VectorDataset, layout: TrainLayout) -> PathBuf {
    dataset_dir(ds, layout).join("test.parquet")
}

/// Path to the cached `neighbors.parquet` for a dataset/layout pair.
pub fn neighbors_path(ds: VectorDataset, layout: TrainLayout) -> PathBuf {
    dataset_dir(ds, layout).join("neighbors.parquet")
}

/// Width used to zero-pad shard indices in partitioned filenames. `10` shards is 2 digits, `100`
/// shards is 3 digits.
+fn num_files_width(num_files: u32) -> usize { + let digits = num_files.checked_ilog10().unwrap_or(0) as usize + 1; + digits.max(2) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn single_layout_uses_train_parquet() { + assert_eq!(train_file_name(TrainLayout::Single, 0, 1), "train.parquet"); + } + + #[test] + fn partitioned_filename_zero_pads_to_match_total() { + assert_eq!( + train_file_name(TrainLayout::Partitioned, 0, 10), + "00-of-10.parquet" + ); + assert_eq!( + train_file_name(TrainLayout::Partitioned, 9, 10), + "09-of-10.parquet" + ); + assert_eq!( + train_file_name(TrainLayout::Partitioned, 99, 100), + "099-of-100.parquet" + ); + } + + #[test] + fn train_files_lists_all_shards_in_order() { + let files = train_files(VectorDataset::CohereLarge10m, TrainLayout::Partitioned, 10); + assert_eq!(files.len(), 10); + for (i, path) in files.iter().enumerate() { + assert!( + path.to_string_lossy() + .ends_with(&format!("{i:02}-of-10.parquet")) + ); + } + } +}