From 2120539ca322bcf8a7580874a09dd421a8179588 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Tue, 2 Jun 2026 16:32:26 +0800 Subject: [PATCH 1/2] fix(index): reject user-specified index_uuid for segmented builds --- python/python/lance/dataset.py | 8 +- python/python/tests/test_indices.py | 24 ++- .../lance/benches/distributed_vector_build.rs | 192 ++++++------------ rust/lance/src/dataset/index.rs | 3 +- rust/lance/src/index/create.rs | 91 +++++++-- 5 files changed, 164 insertions(+), 154 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 147b5b4b9e3..ba82f02cb69 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3082,6 +3082,9 @@ def create_scalar_index( A UUID to use for the segment written by this call. If not provided, a new UUID will be generated. This parameter is passed via kwargs internally. + Specifying a UUID is only used by the legacy distributed scalar + build; index types on the segmented-index architecture (such as + vector indexes) generate segment UUIDs internally and reject it. progress_callback : callable, optional A callback that receives :class:`lance.progress.IndexProgress` events while the index is being built. @@ -3764,8 +3767,9 @@ def create_index( ``create_index_segment_builder().with_index_type(...).with_segments(...)`` and then committed with ``commit_existing_index_segments(...)``. index_uuid : str, optional - A UUID to use for the segment written by this call. - If not provided, a new UUID will be generated. + Not accepted for vector indexes: under the segmented-index + architecture Lance generates the segment UUID and returns it in the + resulting index metadata. Passing a value raises an error. progress_callback : callable, optional A callback that receives :class:`lance.progress.IndexProgress` events while the index is being built. diff --git a/python/python/tests/test_indices.py b/python/python/tests/test_indices.py index 7f6595f2ecc..f9e97613074 100644 --- a/python/python/tests/test_indices.py +++ b/python/python/tests/test_indices.py @@ -3,6 +3,7 @@ import math import os import pathlib +import uuid import lance import numpy as np @@ -273,15 +274,34 @@ def test_ivf_centroids_multivector_fragment_ids(tmpdir): metric="cosine", num_partitions=2, fragment_ids=fragment_ids, - index_uuid="00000000-0000-4000-8000-000000000001", ivf_centroids=centroids, ) - assert index.uuid == "00000000-0000-4000-8000-000000000001" + assert uuid.UUID(index.uuid) assert index.fragment_ids == set(fragment_ids) assert index.name == "embeddings_idx" +def test_create_index_uncommitted_rejects_user_index_uuid(tmpdir): + ds, dimension = make_multivector_dataset(tmpdir) + centroids = pa.FixedSizeListArray.from_arrays( + pa.array([0.1, 0.2, 0.3, 0.4, 0.8, 0.7, 0.6, 0.5], type=pa.float32()), + dimension, + ) + fragment_ids = [fragment.fragment_id for fragment in ds.get_fragments()] + + with pytest.raises(ValueError, match="index_uuid is no longer accepted"): + ds.create_index_uncommitted( + "embeddings", + index_type="IVF_HNSW_SQ", + metric="cosine", + num_partitions=2, + fragment_ids=fragment_ids, + index_uuid="00000000-0000-4000-8000-000000000001", + ivf_centroids=centroids, + ) + + def test_indices_builder_multivector_distributed_dimensions(tmpdir, monkeypatch): ds, dimension = make_multivector_dataset(tmpdir) builder = IndicesBuilder(ds, "embeddings") diff --git a/rust/lance/benches/distributed_vector_build.rs b/rust/lance/benches/distributed_vector_build.rs index 768abd5aba2..c2d166f8eb3 100644 --- a/rust/lance/benches/distributed_vector_build.rs +++ b/rust/lance/benches/distributed_vector_build.rs @@ -2,28 +2,28 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use std::fs; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Arc; +use std::time::{Duration, Instant}; use arrow_array::{ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchIterator}; use arrow_array::{cast::AsArray, types::Float32Type}; use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema}; -use criterion::{BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; use serde::Serialize; -use uuid::Uuid; use lance::dataset::{Dataset, WriteMode, WriteParams}; use lance::index::{DatasetIndexExt, vector::VectorIndexParams}; use lance_arrow::FixedSizeListArrayExt; -use lance_index::progress::noop_progress; use lance_index::vector::kmeans::{KMeansParams, train_kmeans}; use lance_index::{ IndexType, vector::{ivf::IvfBuildParams, pq::PQBuildParams}, }; use lance_linalg::distance::DistanceType; +use lance_table::format::IndexMetadata; use lance_testing::datagen::generate_random_array; use tokio::runtime::Runtime; @@ -34,6 +34,7 @@ const NUM_SUB_VECTORS: usize = 16; const NUM_BITS: usize = 8; const MAX_ITERS: usize = 20; const SAMPLE_RATE: usize = 8; +const AUXILIARY_FILE_NAME: &str = "auxiliary.idx"; #[derive(Clone, Copy, Debug)] struct BenchCase { @@ -52,9 +53,9 @@ impl BenchCase { #[derive(Clone, Debug)] struct MergeFixture { - index_dir: PathBuf, - partial_aux_bytes: u64, - partial_dir_count: usize, + segments: Vec, + segment_aux_bytes: u64, + segment_count: usize, } #[derive(Debug, Serialize)] @@ -62,9 +63,9 @@ struct CaseMetadata { label: String, num_shards: usize, num_partitions: usize, - partial_dir_count: usize, - partial_aux_bytes: u64, - partial_aux_bytes_per_shard: u64, + segment_count: usize, + segment_aux_bytes: u64, + segment_aux_bytes_per_shard: u64, total_rows: usize, rows_per_shard: usize, } @@ -124,22 +125,6 @@ fn bench_cases() -> [BenchCase; 6] { ] } -fn fixture_uuid(bench_case: BenchCase) -> Uuid { - Uuid::from_u128( - 0x733a_0000_0000_0000_0000_0000_0000_0000 - | ((bench_case.num_shards as u128) << 64) - | bench_case.num_partitions as u128, - ) -} - -fn working_uuid(bench_case: BenchCase) -> Uuid { - Uuid::from_u128( - 0x733b_0000_0000_0000_0000_0000_0000_0000 - | ((bench_case.num_shards as u128) << 64) - | bench_case.num_partitions as u128, - ) -} - fn create_batches() -> (Arc, Vec) { let schema = Arc::new(ArrowSchema::new(vec![Field::new( "vector", @@ -262,101 +247,41 @@ fn contiguous_fragment_groups(dataset: &Dataset, num_shards: usize) -> Vec MergeFixture { - let fixture_uuid = fixture_uuid(bench_case); - let index_dir = dataset_root() +/// Auxiliary-file size of an uncommitted segment on disk. +fn segment_auxiliary_bytes(segment: &IndexMetadata) -> u64 { + let aux_path = dataset_root() .join("_indices") - .join(fixture_uuid.to_string()); - - if has_partial_dirs(&index_dir) { - return MergeFixture { - partial_aux_bytes: sum_partial_auxiliary_bytes(&index_dir), - partial_dir_count: count_partial_dirs(&index_dir), - index_dir, - }; - } - - if index_dir.exists() { - fs::remove_dir_all(&index_dir).unwrap(); - } + .join(segment.uuid.to_string()) + .join(AUXILIARY_FILE_NAME); + fs::metadata(aux_path).map(|m| m.len()).unwrap_or(0) +} +async fn build_partial_fixture(dataset: &mut Dataset, bench_case: BenchCase) -> MergeFixture { let fragment_groups = contiguous_fragment_groups(dataset, bench_case.num_shards); let (ivf_params, pq_params) = Box::pin(train_shared_ivf_pq(dataset, bench_case.num_partitions)).await; let params = VectorIndexParams::with_ivf_pq_params(DistanceType::L2, ivf_params, pq_params); + // Build one uncommitted segment per shard. Segment UUIDs are generated by + // Lance; collect the returned metadata to feed the merge. + let mut segments = Vec::with_capacity(fragment_groups.len()); for fragments in fragment_groups { - let mut builder = dataset.create_index_builder(&["vector"], IndexType::Vector, ¶ms); - builder = builder + let mut builder = dataset + .create_index_builder(&["vector"], IndexType::Vector, ¶ms) .name("distributed_merge_only".to_string()) - .fragments(fragments) - .index_uuid(fixture_uuid.to_string()); - Box::pin(builder.execute_uncommitted()).await.unwrap(); + .fragments(fragments); + let segment = Box::pin(builder.execute_uncommitted()).await.unwrap(); + segments.push(segment); } - MergeFixture { - partial_aux_bytes: sum_partial_auxiliary_bytes(&index_dir), - partial_dir_count: count_partial_dirs(&index_dir), - index_dir, - } -} + let segment_count = segments.len(); + let segment_aux_bytes = segments.iter().map(segment_auxiliary_bytes).sum(); -fn has_partial_dirs(index_dir: &Path) -> bool { - fs::read_dir(index_dir) - .ok() - .into_iter() - .flatten() - .flatten() - .any(|entry| { - entry.file_type().map(|t| t.is_dir()).unwrap_or(false) - && entry.file_name().to_string_lossy().starts_with("partial_") - }) -} - -fn count_partial_dirs(index_dir: &Path) -> usize { - fs::read_dir(index_dir) - .unwrap() - .flatten() - .filter(|entry| { - entry.file_type().map(|t| t.is_dir()).unwrap_or(false) - && entry.file_name().to_string_lossy().starts_with("partial_") - }) - .count() -} - -fn sum_partial_auxiliary_bytes(index_dir: &Path) -> u64 { - fs::read_dir(index_dir) - .unwrap() - .flatten() - .filter(|entry| { - entry.file_type().map(|t| t.is_dir()).unwrap_or(false) - && entry.file_name().to_string_lossy().starts_with("partial_") - }) - .map(|entry| entry.path().join("auxiliary.idx")) - .filter_map(|path| fs::metadata(path).ok()) - .map(|metadata| metadata.len()) - .sum() -} - -fn copy_dir_recursive(source: &Path, target: &Path) { - fs::create_dir_all(target).unwrap(); - for entry in fs::read_dir(source).unwrap().flatten() { - let source_path = entry.path(); - let target_path = target.join(entry.file_name()); - let file_type = entry.file_type().unwrap(); - if file_type.is_dir() { - copy_dir_recursive(&source_path, &target_path); - } else { - fs::copy(&source_path, &target_path).unwrap(); - } - } -} - -fn prepare_iteration_target(source: &Path, target: &Path) { - if target.exists() { - fs::remove_dir_all(target).unwrap(); + MergeFixture { + segments, + segment_aux_bytes, + segment_count, } - copy_dir_recursive(source, target); } fn write_case_metadata(fixtures: &[(BenchCase, MergeFixture)]) { @@ -368,10 +293,9 @@ fn write_case_metadata(fixtures: &[(BenchCase, MergeFixture)]) { label: bench_case.label(), num_shards: bench_case.num_shards, num_partitions: bench_case.num_partitions, - partial_dir_count: fixture.partial_dir_count, - partial_aux_bytes: fixture.partial_aux_bytes, - partial_aux_bytes_per_shard: fixture.partial_aux_bytes - / fixture.partial_dir_count as u64, + segment_count: fixture.segment_count, + segment_aux_bytes: fixture.segment_aux_bytes, + segment_aux_bytes_per_shard: fixture.segment_aux_bytes / fixture.segment_count as u64, total_rows: NUM_FRAGMENTS * ROWS_PER_FRAGMENT, rows_per_shard: (NUM_FRAGMENTS * ROWS_PER_FRAGMENT) / bench_case.num_shards, }) @@ -398,33 +322,33 @@ fn bench_distributed_merge_only(c: &mut Criterion) { group.sample_size(10); for (bench_case, fixture) in fixtures { - let target_uuid = working_uuid(bench_case); - let target_index_dir_fs = dataset_root() - .join("_indices") - .join(target_uuid.to_string()); - let source_index_dir_fs = fixture.index_dir.clone(); - - group.throughput(Throughput::Bytes(fixture.partial_aux_bytes)); + group.throughput(Throughput::Bytes(fixture.segment_aux_bytes)); group.bench_with_input( BenchmarkId::new("finalize_only", bench_case.label()), &bench_case, |b, _| { let dataset = dataset.clone(); - let target_index_dir_fs = target_index_dir_fs.clone(); - let source_index_dir_fs = source_index_dir_fs.clone(); - b.iter_batched( - || prepare_iteration_target(&source_index_dir_fs, &target_index_dir_fs), - |_| { - rt.block_on(dataset.merge_index_metadata( - &target_uuid.to_string(), - IndexType::IvfPq, - None, - noop_progress(), - )) - .unwrap(); - }, - BatchSize::PerIteration, - ); + let segments = fixture.segments.clone(); + // iter_custom so we can drop the merged output directory after each + // timed iteration; the merge generates a fresh UUID per call, which + // would otherwise accumulate on disk. + b.iter_custom(|iters| { + let mut total = Duration::ZERO; + for _ in 0..iters { + let segments = segments.clone(); + let start = Instant::now(); + let merged = rt + .block_on(dataset.merge_existing_index_segments(segments)) + .unwrap(); + total += start.elapsed(); + + let merged_dir = dataset_root() + .join("_indices") + .join(merged.uuid.to_string()); + let _ = fs::remove_dir_all(merged_dir); + } + total + }); }, ); } diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index 354cdaf7f86..e6e339d1286 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -218,15 +218,14 @@ mod tests { let target_fragments = fragments.iter().take(2).collect::>(); let params = VectorIndexParams::ivf_flat(2, MetricType::L2); - let first_segment_uuid = Uuid::new_v4(); let second_segment_uuid = Uuid::new_v4(); let built_index = dataset .create_index_builder(&["vector"], IndexType::Vector, ¶ms) .name("vector_idx".to_string()) - .index_uuid(first_segment_uuid.to_string()) .execute_uncommitted() .await .unwrap(); + let first_segment_uuid = built_index.uuid; let first_segment_dir = dataset.indices_dir().join(first_segment_uuid.to_string()); let second_segment_dir = dataset.indices_dir().join(second_segment_uuid.to_string()); for file_name in ["index.idx", "auxiliary.idx"] { diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 842252f9a45..72969d6b49c 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -209,6 +209,17 @@ impl<'a> CreateIndexBuilder<'a> { ))); } + // Under the segmented-index architecture, segment UUIDs are generated by + // Lance so distributed workers never write to the same on-disk segment + // directory. + if self.index_uuid.is_some() && uses_segment_commit_path(self.index_type, self.params) { + return Err(Error::invalid_input(format!( + "index_uuid is no longer accepted for {} indexes; segment UUIDs are generated \ + by Lance and returned in the index metadata.", + self.index_type + ))); + } + let index_id = match &self.index_uuid { Some(uuid_str) => Uuid::parse_str(uuid_str) .map_err(|e| Error::index(format!("Invalid UUID string provided: {}", e)))?, @@ -479,7 +490,7 @@ impl<'a> CreateIndexBuilder<'a> { } else { vec![] }; - let transaction = if uses_segment_commit_path(self.index_type, &new_idx.name, self.params) { + let transaction = if uses_segment_commit_path(self.index_type, self.params) { let field_id = *new_idx.fields.first().ok_or_else(|| { Error::internal(format!( "Index '{}' is missing field ids after build", @@ -560,15 +571,7 @@ impl<'a> CreateIndexBuilder<'a> { } } -fn uses_segment_commit_path( - index_type: IndexType, - index_name: &str, - params: &dyn IndexParams, -) -> bool { - if index_name != LANCE_VECTOR_INDEX { - return false; - } - +fn uses_segment_commit_path(index_type: IndexType, params: &dyn IndexParams) -> bool { matches!( index_type, IndexType::Vector @@ -2085,19 +2088,20 @@ mod tests { .await .unwrap(); - let uuid = Uuid::new_v4(); let params = VectorIndexParams::ivf_hnsw( DistanceType::L2, prepare_vector_ivf(&dataset, "vector").await, HnswBuildParams::default(), ); - CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + // Vector indexes no longer accept a user-specified UUID; capture the + // segment UUID generated by Lance from the returned metadata. + let uuid = CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) .name("vector_idx".to_string()) - .index_uuid(uuid.to_string()) .execute_uncommitted() .await - .unwrap(); + .unwrap() + .uuid; dataset .commit_existing_index_segments( @@ -2122,6 +2126,65 @@ mod tests { ); } + #[tokio::test] + async fn test_vector_index_uuid_rejected() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(128), + lance_datagen::BatchCount::from(2), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = VectorIndexParams::with_ivf_flat_params( + DistanceType::L2, + prepare_vector_ivf(&dataset, "vector").await, + ); + + // Full build with a pinned UUID is rejected. + let err = CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .index_uuid(Uuid::new_v4().to_string()) + .execute_uncommitted() + .await + .unwrap_err(); + assert!( + err.to_string().contains("index_uuid is no longer accepted"), + "unexpected error: {err}" + ); + + // Distributed (fragment-scoped) build with a pinned UUID is rejected too. + let fragment_id = dataset.get_fragments()[0].id() as u32; + let err = CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .fragments(vec![fragment_id]) + .index_uuid(Uuid::new_v4().to_string()) + .execute_uncommitted() + .await + .unwrap_err(); + assert!( + err.to_string().contains("index_uuid is no longer accepted"), + "unexpected error: {err}" + ); + } + #[tokio::test] async fn test_create_index_vector_commits_with_segment_metadata() { let tmpdir = TempStrDir::default(); From c44ba9909be349b3cfb7073af2043a536160da67 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Tue, 2 Jun 2026 16:48:38 +0800 Subject: [PATCH 2/2] fix(index): reject user-specified index_uuid for segmented builds --- rust/lance/benches/distributed_vector_build.rs | 3 --- rust/lance/src/index/create.rs | 5 ----- 2 files changed, 8 deletions(-) diff --git a/rust/lance/benches/distributed_vector_build.rs b/rust/lance/benches/distributed_vector_build.rs index c2d166f8eb3..569cd96daf7 100644 --- a/rust/lance/benches/distributed_vector_build.rs +++ b/rust/lance/benches/distributed_vector_build.rs @@ -329,9 +329,6 @@ fn bench_distributed_merge_only(c: &mut Criterion) { |b, _| { let dataset = dataset.clone(); let segments = fixture.segments.clone(); - // iter_custom so we can drop the merged output directory after each - // timed iteration; the merge generates a fresh UUID per call, which - // would otherwise accumulate on disk. b.iter_custom(|iters| { let mut total = Duration::ZERO; for _ in 0..iters { diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 72969d6b49c..6bb814c1fcc 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -209,9 +209,6 @@ impl<'a> CreateIndexBuilder<'a> { ))); } - // Under the segmented-index architecture, segment UUIDs are generated by - // Lance so distributed workers never write to the same on-disk segment - // directory. if self.index_uuid.is_some() && uses_segment_commit_path(self.index_type, self.params) { return Err(Error::invalid_input(format!( "index_uuid is no longer accepted for {} indexes; segment UUIDs are generated \ @@ -2094,8 +2091,6 @@ mod tests { HnswBuildParams::default(), ); - // Vector indexes no longer accept a user-specified UUID; capture the - // segment UUID generated by Lance from the returned metadata. let uuid = CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) .name("vector_idx".to_string()) .execute_uncommitted()