diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 79a41b4571b..8354f80416c 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -1,8 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::cmp::Ordering; use std::collections::HashSet; -use std::future; use std::sync::Arc; use std::{collections::HashMap, pin::Pin}; @@ -80,7 +80,7 @@ use crate::Dataset; use crate::dataset::ProjectionRequest; use crate::dataset::index::dataset_format_version; use crate::index::vector::ivf::v2::PartitionEntry; -use crate::index::vector::utils::{infer_vector_dim, infer_vector_element_type}; +use crate::index::vector::utils::infer_vector_dim; use super::v2::IVFIndex; use super::{ @@ -764,53 +764,52 @@ impl IvfIndexBuilder ) }; - let (assign_batches, merge_indices, partition_adjustment) = if num_indices_to_merge - .is_some() - || self.optimize_options.is_none() - { - no_partition_adjustment() - } else { - match Self::check_partition_adjustment(ivf, reader.as_ref(), &self.existing_indices)? 
{ - Some(partition_adjustment) => match partition_adjustment { - PartitionAdjustment::Split(partition) => { - // Perform split and record the fact for downstream build/merge - log::info!( - "split partition {}, will merge all {} delta indices", - partition, - self.existing_indices.len() - ); - let split_results = self.split_partition(partition, ivf).await?; - let Some(ivf) = self.ivf.as_mut() else { - return Err(Error::invalid_input( - "IVF not set before building partitions", - )); - }; - ivf.centroids = Some(split_results.new_centroids); - ( - split_results.assign_batches, - Arc::new(self.existing_indices.clone()), - Some(partition_adjustment), - ) - } - PartitionAdjustment::Join(partition) => { - log::info!("join partition {}", partition); - let results = self.join_partition(partition, ivf).await?; - let Some(ivf) = self.ivf.as_mut() else { - return Err(Error::invalid_input( - "IVF model not set before joining partition", - )); - }; - ivf.centroids = Some(results.new_centroids); - ( - results.assign_batches, - Arc::new(self.existing_indices.clone()), - Some(partition_adjustment), - ) + let (assign_batches, merge_indices, partition_adjustment) = + if num_indices_to_merge.is_some() || self.optimize_options.is_none() { + no_partition_adjustment() + } else { + let (split_partitions, join_partition) = + Self::check_partition_adjustment(ivf, reader.as_ref(), &self.existing_indices)?; + if !split_partitions.is_empty() { + log::info!( + "split partitions {:?}, will merge all {} delta indices", + split_partitions, + self.existing_indices.len() + ); + let split_results = self.split_partitions(&split_partitions, ivf).await?; + let actual_split_partitions = split_results.split_partitions.clone(); + let Some(ivf) = self.ivf.as_mut() else { + return Err(Error::invalid_input( + "IVF not set before building partitions", + )); + }; + ivf.centroids = Some(split_results.new_centroids); + ( + split_results.assign_batches, + Arc::new(self.existing_indices.clone()), + 
Some(PartitionAdjustment::Split(actual_split_partitions)), + ) + } else { + match join_partition { + Some(partition) => { + log::info!("join partition {}", partition); + let results = self.join_partition(partition, ivf).await?; + let Some(ivf) = self.ivf.as_mut() else { + return Err(Error::invalid_input( + "IVF model not set before joining partition", + )); + }; + ivf.centroids = Some(results.new_centroids); + ( + results.assign_batches, + Arc::new(self.existing_indices.clone()), + Some(PartitionAdjustment::Join(partition)), + ) + } + None => no_partition_adjustment(), } - }, - None => no_partition_adjustment(), - } - }; + } + }; self.merged_num = merge_indices.len(); log::info!( "merge {}/{} delta indices", @@ -833,11 +832,15 @@ impl IvfIndexBuilder let sub_index_params = sub_index_params.clone(); let column = column.clone(); let frag_reuse_index = frag_reuse_index.clone(); - let skip_existing_batches = - partition_adjustment == Some(PartitionAdjustment::Split(partition)); - let partition = match partition_adjustment { + let skip_existing_batches = match &partition_adjustment { + Some(PartitionAdjustment::Split(split_partitions)) => { + split_partitions.binary_search(&partition).is_ok() + } + _ => false, + }; + let partition = match &partition_adjustment { Some(PartitionAdjustment::Join(joined_partition)) - if partition >= joined_partition => + if partition >= *joined_partition => { partition + 1 } @@ -1286,25 +1289,21 @@ impl IvfIndexBuilder ivf: &IvfModel, reader: &dyn ShuffleReader, existing_indices: &[Arc], - ) -> Result> { + ) -> Result<(Vec, Option)> { let index_type = IndexType::try_from( index_type_string(S::name().try_into()?, Q::quantization_type()).as_str(), )?; - let mut split_partition = None; + let mut split_partitions = Vec::new(); let mut join_partition = None; - let mut max_partition_size = 0; let mut min_partition_size = usize::MAX; for partition in 0..ivf.num_partitions() { let mut num_rows = reader.partition_size(partition)?; for index in 
existing_indices.iter() { num_rows += index.partition_size(partition); } - if num_rows > max_partition_size - && num_rows > MAX_PARTITION_SIZE_FACTOR * index_type.target_partition_size() - { - max_partition_size = num_rows; - split_partition = Some(partition); + if num_rows > MAX_PARTITION_SIZE_FACTOR * index_type.target_partition_size() { + split_partitions.push(partition); } if ivf.num_partitions() > 1 && num_rows < min_partition_size @@ -1315,44 +1314,36 @@ impl IvfIndexBuilder } } - if let Some(partition) = split_partition { - Ok(Some(PartitionAdjustment::Split(partition))) - } else if let Some(partition) = join_partition { - Ok(Some(PartitionAdjustment::Join(partition))) - } else { - Ok(None) - } + Ok((split_partitions, join_partition)) } - // split this partition, - // 1. take raw vectors by row ids in this partition - // 2. run KMeans with k=2 to get 2 new centroids - // 3. reassign the vectors to the 2 new partitions - async fn split_partition(&self, part_idx: usize, ivf: &IvfModel) -> Result { - // take the raw vectors from dataset - let Some((row_ids, vectors)) = self.load_partition_raw_vectors(part_idx).await? 
else { - return Ok(AssignResult { - assign_batches: vec![None; ivf.num_partitions()], - new_centroids: ivf.centroids_array().unwrap().clone(), - }); + async fn split_partitions( + &self, + split_partitions: &[usize], + ivf: &IvfModel, + ) -> Result { + let Some(dataset) = self.dataset.as_ref() else { + return Err(Error::invalid_input( + "dataset not set before split partition", + )); }; - let element_type = infer_vector_element_type(vectors.data_type())?; + let (_, element_type) = get_vector_type(dataset.schema(), &self.column)?; match element_type { DataType::Float16 => { - self.split_partition_impl::(part_idx, ivf, &row_ids, &vectors) + self.split_partitions_impl::(split_partitions, ivf) .await } DataType::Float32 => { - self.split_partition_impl::(part_idx, ivf, &row_ids, &vectors) + self.split_partitions_impl::(split_partitions, ivf) .await } DataType::Float64 => { - self.split_partition_impl::(part_idx, ivf, &row_ids, &vectors) + self.split_partitions_impl::(split_partitions, ivf) .await } DataType::UInt8 => { - self.split_partition_impl::(part_idx, ivf, &row_ids, &vectors) + self.split_partitions_impl::(split_partitions, ivf) .await } dt => Err(Error::invalid_input(format!( @@ -1362,26 +1353,101 @@ impl IvfIndexBuilder } } - async fn split_partition_impl( + async fn split_partitions_impl( &self, - part_idx: usize, + split_partitions: &[usize], ivf: &IvfModel, - row_ids: &UInt64Array, - vectors: &FixedSizeListArray, ) -> Result where T::Native: Dot + L2 + Normalize, PrimitiveArray: From>, { let centroids = ivf.centroids_array().unwrap(); - let mut new_centroids: Vec = Vec::with_capacity(ivf.num_partitions() + 1); + let mut new_centroids: Vec = + Vec::with_capacity(ivf.num_partitions() + split_partitions.len()); new_centroids.extend(centroids.iter().map(|vec| vec.unwrap())); + let split_plans = stream::iter(split_partitions.iter().copied().enumerate()) + .map(|(split_order, part_idx)| async move { + let centroid2_part_idx = ivf.num_partitions() + split_order; + 
self.build_split_plan::(part_idx, centroid2_part_idx, ivf) + .await + }) + .buffered(get_num_compute_intensive_cpus()) + .try_collect::>() + .await?; + let mut split_plans = split_plans.into_iter().flatten().collect::>(); + split_plans.sort_by_key(|plan| plan.part_idx); + Self::finalize_split_plans(&mut split_plans, ivf.num_partitions()); + + if split_plans.is_empty() { + return Ok(AssignResult { + assign_batches: vec![None; ivf.num_partitions()], + new_centroids: centroids.clone(), + split_partitions: Vec::new(), + }); + } + + for split_plan in &split_plans { + new_centroids[split_plan.part_idx] = split_plan.centroid1.clone(); + } + new_centroids.extend(split_plans.iter().map(|plan| plan.centroid2.clone())); + + let split_partition_set = + HashSet::::from_iter(split_plans.iter().map(|plan| plan.part_idx)); + let new_centroids = new_centroids + .iter() + .map(|vec| vec.as_ref()) + .collect::>(); + let new_centroids = arrow::compute::concat(&new_centroids)?; + let new_centroids = + FixedSizeListArray::try_new_from_values(new_centroids, centroids.value_length())?; + let mut assign_ops = vec![Vec::new(); new_centroids.len()]; + + for split_plan in &split_plans { + for (target_idx, op) in &split_plan.original_assign_ops { + assign_ops[*target_idx].push(op.clone()); + } + } + + for candidate_move in self + .collect_candidate_moves::(&split_plans, &split_partition_set, ivf) + .await? 
+ { + assign_ops[candidate_move.source_part_idx] + .push(AssignOp::Remove(candidate_move.row_id)); + assign_ops[candidate_move.dest_part_idx].push(AssignOp::Add(( + candidate_move.row_id, + candidate_move.vector, + ))); + } + + let assign_batches = self.build_assign_batch::(&new_centroids, &assign_ops)?; + + Ok(AssignResult { + assign_batches, + new_centroids, + split_partitions: split_plans.iter().map(|plan| plan.part_idx).collect(), + }) + } + + async fn build_split_plan( + &self, + part_idx: usize, + centroid2_part_idx: usize, + ivf: &IvfModel, + ) -> Result> + where + T::Native: Dot + L2 + Normalize, + PrimitiveArray: From>, + { + let Some((row_ids, vectors)) = self.load_partition_raw_vectors(part_idx).await? else { + return Ok(None); + }; let dimension = infer_vector_dim(vectors.data_type())?; - // train kmeans to get 2 new centroids let (normalized_dist_type, normalized_vectors) = match self.distance_type { DistanceType::Cosine => { - let vectors = normalize_fsl(vectors)?; + let vectors = normalize_fsl(&vectors)?; (DistanceType::L2, vectors) } _ => (self.distance_type, vectors.clone()), @@ -1394,121 +1460,132 @@ impl IvfIndexBuilder 2, 256, )?; - // the original centroid + let c0 = ivf .centroid(part_idx) .ok_or(Error::invalid_input("original centroid not found"))?; - // the 2 new centroids - let c1 = kmeans.centroids.slice(0, dimension); - let c2 = kmeans.centroids.slice(dimension, dimension); - // replace the original centroid with the first new one - new_centroids[part_idx] = c1.clone(); - // append the second new one - new_centroids.push(c2.clone()); - let centroid1_part_idx = part_idx; - let centroid2_part_idx = new_centroids.len() - 1; - - let new_centroids = new_centroids - .iter() - .map(|vec| vec.as_ref()) - .collect::>(); - let new_centroids = arrow::compute::concat(&new_centroids)?; - - // get top REASSIGN_RANGE centroids from c0 + let centroid1 = kmeans.centroids.slice(0, dimension); + let centroid2 = kmeans.centroids.slice(dimension, dimension); 
let (reassign_part_ids, reassign_part_centroids) = self.select_reassign_candidates(ivf, part_idx, &c0)?; - // compute the distance between the vectors and the 3 centroids (original one and the 2 new ones) - let d0 = self.distance_type.arrow_batch_func()(&c0, vectors)?; - let d1 = self.distance_type.arrow_batch_func()(&c1, vectors)?; - let d2 = self.distance_type.arrow_batch_func()(&c2, vectors)?; - let d0 = d0.values(); - let d1 = d1.values(); - let d2 = d2.values(); - - let mut assign_ops = vec![Vec::new(); ivf.num_partitions() + 1]; - // assign the vectors in the original partition - self.assign_vectors::( + let d0 = self.distance_type.arrow_batch_func()(&c0, &vectors)?; + let d1 = self.distance_type.arrow_batch_func()(centroid1.as_ref(), &vectors)?; + let d2 = self.distance_type.arrow_batch_func()(centroid2.as_ref(), &vectors)?; + let mut original_assign_ops = Vec::with_capacity(row_ids.len()); + Self::assign_vectors_impl::( + self.distance_type, + part_idx, part_idx, - centroid1_part_idx, centroid2_part_idx, - row_ids, - vectors, - d0, - d1, - d2, + &row_ids, + &vectors, + d0.values(), + d1.values(), + d2.values(), &reassign_part_ids, &reassign_part_centroids, true, - &mut assign_ops, + |idx, op| original_assign_ops.push((idx, op)), )?; - // assign the vectors in the reassigned partitions - let reassign_targets = reassign_part_ids - .values() - .iter() - .copied() - .enumerate() - .collect::>(); - if !reassign_targets.is_empty() { - let builder = self; - let distance_type = self.distance_type; - let reassign_part_ids_clone = reassign_part_ids.clone(); - let reassign_part_centroids_clone = reassign_part_centroids.clone(); - stream::iter( - reassign_targets - .into_iter() - .map(move |(candidate_idx, part_id)| { - let builder = builder; - let reassign_part_ids = reassign_part_ids_clone.clone(); - let reassign_part_centroids = reassign_part_centroids_clone.clone(); - let centroid1 = c1.clone(); - let centroid2 = c2.clone(); - async move { - let part_idx = part_id 
as usize; - let Some((row_ids, vectors)) = - builder.load_partition_raw_vectors(part_idx).await? - else { - // all vectors in this partition have been deleted - return Ok::, Error>(Vec::new()); - }; - let ops = spawn_cpu(move || { - Self::compute_reassign_assign_ops::( - distance_type, - part_idx, - candidate_idx, - centroid1_part_idx, - centroid2_part_idx, - &row_ids, - &vectors, - centroid1, - centroid2, - &reassign_part_ids, - &reassign_part_centroids, - ) - }) - .await?; - Ok(ops) - } - }), - ) - .buffered(get_num_compute_intensive_cpus()) - .try_for_each(|ops| { - for (target_idx, op) in ops { - assign_ops[target_idx].push(op); + + Ok(Some(SplitPlan { + part_idx, + centroid2_part_idx, + centroid1, + centroid2, + reassign_part_ids, + original_assign_ops, + })) + } + + async fn collect_candidate_moves( + &self, + split_plans: &[SplitPlan], + split_partition_set: &HashSet, + ivf: &IvfModel, + ) -> Result> + where + T::Native: Dot + L2 + Normalize, + PrimitiveArray: From>, + { + let mut candidate_partitions = HashMap::>::new(); + for split_plan in split_plans { + for part_id in split_plan.reassign_part_ids.values().iter().copied() { + let part_idx = part_id as usize; + if split_partition_set.contains(&part_idx) { + continue; + } + candidate_partitions + .entry(part_idx) + .or_default() + .push(CandidateRequest { + centroid1_part_idx: split_plan.part_idx, + centroid2_part_idx: split_plan.centroid2_part_idx, + centroid1: split_plan.centroid1.clone(), + centroid2: split_plan.centroid2.clone(), + }); + } + } + + let candidate_moves = stream::iter(candidate_partitions.into_iter()) + .map(|(part_idx, requests)| async move { + let Some((row_ids, vectors)) = self.load_partition_raw_vectors(part_idx).await? 
+ else { + return Ok::, Error>(Vec::new()); + }; + + let candidate_centroid = ivf.centroid(part_idx).ok_or(Error::invalid_input( + format!("candidate centroid not found for partition {part_idx}"), + ))?; + let baseline_dists = + self.distance_type.arrow_batch_func()(candidate_centroid.as_ref(), &vectors)?; + let mut best_moves = vec![None; row_ids.len()]; + for request in requests { + let d1 = self.distance_type.arrow_batch_func()( + request.centroid1.as_ref(), + &vectors, + )?; + let d2 = self.distance_type.arrow_batch_func()( + request.centroid2.as_ref(), + &vectors, + )?; + Self::update_best_candidate_moves::( + request.centroid1_part_idx, + request.centroid2_part_idx, + &row_ids, + &vectors, + baseline_dists.values(), + d1.values(), + d2.values(), + &mut best_moves, + part_idx, + ); } - future::ready(Ok(())) + + Ok(best_moves.into_iter().flatten().collect::>()) }) + .buffered(get_num_compute_intensive_cpus()) + .try_collect::>() .await?; - } - let new_centroids = - FixedSizeListArray::try_new_from_values(new_centroids, dimension as i32)?; - let assign_batches = self.build_assign_batch::(&new_centroids, &assign_ops)?; + Ok(candidate_moves.into_iter().flatten().collect()) + } - Ok(AssignResult { - assign_batches, - new_centroids, - }) + fn finalize_split_plans(split_plans: &mut [SplitPlan], base_num_partitions: usize) { + for (split_order, split_plan) in split_plans.iter_mut().enumerate() { + let actual_centroid2_part_idx = base_num_partitions + split_order; + if split_plan.centroid2_part_idx == actual_centroid2_part_idx { + continue; + } + let placeholder_centroid2_part_idx = split_plan.centroid2_part_idx; + for (target_idx, _) in &mut split_plan.original_assign_ops { + if *target_idx == placeholder_centroid2_part_idx { + *target_idx = actual_centroid2_part_idx; + } + } + split_plan.centroid2_part_idx = actual_centroid2_part_idx; + } } // join the given partition: @@ -1537,6 +1614,7 @@ impl IvfIndexBuilder return Ok(AssignResult { assign_batches: vec![None; 
ivf.num_partitions() - 1], new_centroids, + split_partitions: Vec::new(), }); }; @@ -1640,6 +1718,7 @@ impl IvfIndexBuilder Ok(AssignResult { assign_batches, new_centroids, + split_partitions: Vec::new(), }) } @@ -1808,42 +1887,6 @@ impl IvfIndexBuilder ) -> Result<(UInt32Array, FixedSizeListArray)> { select_reassign_candidates_impl(self.distance_type, ivf, part_idx, c0) } - // assign the vectors of original partition - #[allow(clippy::too_many_arguments)] - fn assign_vectors( - &self, - part_idx: usize, - centroid1_part_idx: usize, - centroid2_part_idx: usize, - row_ids: &UInt64Array, - vectors: &FixedSizeListArray, - d0: &[f32], - d1: &[f32], - d2: &[f32], - reassign_part_ids: &UInt32Array, - reassign_part_centroids: &FixedSizeListArray, - // the assign ops for each partition - // the length must be `old_num_partitions + 1` - deleted_original_partition: bool, - assign_ops: &mut [Vec], - ) -> Result<()> { - Self::assign_vectors_impl::( - self.distance_type, - part_idx, - centroid1_part_idx, - centroid2_part_idx, - row_ids, - vectors, - d0, - d1, - d2, - reassign_part_ids, - reassign_part_centroids, - deleted_original_partition, - |idx, op| assign_ops[idx].push(op), - ) - } - #[allow(clippy::too_many_arguments)] fn assign_vectors_impl( distance_type: DistanceType, @@ -1916,50 +1959,54 @@ impl IvfIndexBuilder } #[allow(clippy::too_many_arguments)] - fn compute_reassign_assign_ops( - distance_type: DistanceType, - part_idx: usize, - candidate_idx: usize, + fn update_best_candidate_moves( centroid1_part_idx: usize, centroid2_part_idx: usize, row_ids: &UInt64Array, vectors: &FixedSizeListArray, - centroid1: ArrayRef, - centroid2: ArrayRef, - reassign_part_ids: &UInt32Array, - reassign_part_centroids: &FixedSizeListArray, - ) -> Result> - where + baseline_dists: &[f32], + centroid1_dists: &[f32], + centroid2_dists: &[f32], + best_moves: &mut [Option], + part_idx: usize, + ) where T::Native: Dot + L2 + Normalize, PrimitiveArray: From>, { - let d0 = 
distance_type.arrow_batch_func()( - reassign_part_centroids.value(candidate_idx).as_ref(), - vectors, - )?; - let d1 = distance_type.arrow_batch_func()(centroid1.as_ref(), vectors)?; - let d2 = distance_type.arrow_batch_func()(centroid2.as_ref(), vectors)?; - let d0 = d0.values(); - let d1 = d1.values(); - let d2 = d2.values(); + for (i, &row_id) in row_ids.values().iter().enumerate() { + if baseline_dists[i] <= centroid1_dists[i] && baseline_dists[i] <= centroid2_dists[i] { + continue; + } + let (dest_part_idx, dest_distance) = if centroid1_dists[i] <= centroid2_dists[i] { + (centroid1_part_idx, centroid1_dists[i]) + } else { + (centroid2_part_idx, centroid2_dists[i]) + }; + let candidate_move = CandidateMove { + row_id, + source_part_idx: part_idx, + dest_part_idx, + dest_distance, + vector: vectors.value(i), + }; + match best_moves[i].as_mut() { + Some(best_move) if Self::is_better_candidate_move(&candidate_move, best_move) => { + *best_move = candidate_move; + } + None => { + best_moves[i] = Some(candidate_move); + } + _ => {} + } + } + } - let mut ops = Vec::new(); - Self::assign_vectors_impl::( - distance_type, - part_idx, - centroid1_part_idx, - centroid2_part_idx, - row_ids, - vectors, - d0, - d1, - d2, - reassign_part_ids, - reassign_part_centroids, - false, - |idx, op| ops.push((idx, op)), - )?; - Ok(ops) + fn is_better_candidate_move(candidate: &CandidateMove, current: &CandidateMove) -> bool { + match candidate.dest_distance.total_cmp(&current.dest_distance) { + Ordering::Less => true, + Ordering::Equal => candidate.dest_part_idx < current.dest_part_idx, + Ordering::Greater => false, + } } // assign a vector to the closest partition among: @@ -2046,6 +2093,7 @@ struct AssignResult { // and the deleted row ids assign_batches: Vec>, new_centroids: FixedSizeListArray, + split_partitions: Vec, } #[derive(Debug, Clone)] @@ -2064,14 +2112,39 @@ enum ReassignPartition { ReassignCandidate(u32), } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, 
PartialEq, Eq)] enum PartitionAdjustment { - /// Split partition at given id - Split(usize), + /// Split partitions at the given ids. + Split(Vec), /// Join partition at given id Join(usize), } +struct SplitPlan { + part_idx: usize, + centroid2_part_idx: usize, + centroid1: ArrayRef, + centroid2: ArrayRef, + reassign_part_ids: UInt32Array, + original_assign_ops: Vec<(usize, AssignOp)>, +} + +struct CandidateRequest { + centroid1_part_idx: usize, + centroid2_part_idx: usize, + centroid1: ArrayRef, + centroid2: ArrayRef, +} + +#[derive(Clone)] +struct CandidateMove { + row_id: u64, + source_part_idx: usize, + dest_part_idx: usize, + dest_distance: f32, + vector: ArrayRef, +} + pub(crate) fn index_type_string(sub_index: SubIndexType, quantizer: QuantizationType) -> String { // FlatBin is a QuantizationType variant used internally for reconstruction, // but the persisted index type string uses "FLAT" (differentiated by DataType). @@ -2166,49 +2239,115 @@ mod tests { } #[test] - fn compute_reassign_assign_ops_moves_vectors_to_new_centroids() { + fn compute_reassign_candidate_moves_vectors_to_new_centroids() { let row_ids = UInt64Array::from(vec![1_u64, 2_u64]); let vectors = FixedSizeListArray::try_new_from_values( Float32Array::from(vec![0.0_f32, 0.0, 10.0, 10.0]), 2, ) .unwrap(); - let reassign_part_ids = UInt32Array::from(vec![0_u32]); let reassign_part_centroids = FixedSizeListArray::try_new_from_values(Float32Array::from(vec![9.0_f32, 9.0]), 2) .unwrap(); - let centroid1: ArrayRef = Arc::new(Float32Array::from(vec![0.0_f32, 0.0])); - let centroid2: ArrayRef = Arc::new(Float32Array::from(vec![20.0_f32, 20.0])); + let baseline_dists = DistanceType::L2.arrow_batch_func()( + reassign_part_centroids.value(0).as_ref(), + &vectors, + ) + .unwrap(); + let centroid1_dists = + DistanceType::L2.arrow_batch_func()(&Float32Array::from(vec![0.0_f32, 0.0]), &vectors) + .unwrap(); + let centroid2_dists = DistanceType::L2.arrow_batch_func()( + &Float32Array::from(vec![20.0_f32, 
20.0]), + &vectors, + ) + .unwrap(); - let ops = IvfIndexBuilder::::compute_reassign_assign_ops::< - Float32Type, - >( - DistanceType::L2, - 0, - 0, + let mut best_moves = vec![None; row_ids.len()]; + IvfIndexBuilder::::update_best_candidate_moves::( 1, 2, &row_ids, &vectors, - centroid1, - centroid2, - &reassign_part_ids, - &reassign_part_centroids, + baseline_dists.values(), + centroid1_dists.values(), + centroid2_dists.values(), + &mut best_moves, + 0, + ); + + assert_eq!(best_moves.iter().flatten().count(), 1); + let candidate_move = best_moves[0].as_ref().unwrap(); + assert_eq!(candidate_move.row_id, 1); + assert_eq!(candidate_move.source_part_idx, 0); + assert_eq!(candidate_move.dest_part_idx, 1); + assert_eq!( + candidate_move.vector.as_primitive::().values(), + &[0.0_f32, 0.0] + ); + } + + #[test] + fn update_best_candidate_moves_preserves_multivector_entries() { + let row_ids = UInt64Array::from(vec![7_u64, 7_u64]); + let vectors = FixedSizeListArray::try_new_from_values( + Float32Array::from(vec![0.0_f32, 0.0, 1.0, 1.0]), + 2, ) .unwrap(); + let baseline_dists = Float32Array::from(vec![10.0_f32, 10.0]); + let centroid1_dists = Float32Array::from(vec![1.0_f32, 2.0]); + let centroid2_dists = Float32Array::from(vec![3.0_f32, 4.0]); - assert_eq!(ops.len(), 2); - assert!(matches!(ops[0], (0, AssignOp::Remove(1)))); - match &ops[1] { - (1, AssignOp::Add((row_id, vector))) => { - assert_eq!(*row_id, 1); - assert_eq!( - vector.as_primitive::().values(), - &[0.0_f32, 0.0] - ); - } - other => panic!("unexpected op: {:?}", other), - } + let mut best_moves = vec![None; row_ids.len()]; + IvfIndexBuilder::::update_best_candidate_moves::( + 1, + 2, + &row_ids, + &vectors, + baseline_dists.values(), + centroid1_dists.values(), + centroid2_dists.values(), + &mut best_moves, + 0, + ); + + let best_moves = best_moves.into_iter().flatten().collect::>(); + assert_eq!(best_moves.len(), 2); + assert_eq!(best_moves[0].row_id, 7); + assert_eq!(best_moves[1].row_id, 7); + 
assert_eq!( + best_moves[0].vector.as_primitive::().values(), + &[0.0_f32, 0.0] + ); + assert_eq!( + best_moves[1].vector.as_primitive::().values(), + &[1.0_f32, 1.0] + ); + } + + #[test] + fn finalize_split_plans_reassigns_filtered_centroid_ids() { + let centroid1: ArrayRef = Arc::new(Float32Array::from(vec![0.0_f32, 0.0])); + let centroid2: ArrayRef = Arc::new(Float32Array::from(vec![1.0_f32, 1.0])); + let vector: ArrayRef = Arc::new(Float32Array::from(vec![2.0_f32, 2.0])); + let mut split_plans = vec![SplitPlan { + part_idx: 3, + centroid2_part_idx: 5, + centroid1, + centroid2, + reassign_part_ids: UInt32Array::from(vec![0_u32]), + original_assign_ops: vec![ + (3, AssignOp::Add((10, vector.clone()))), + (5, AssignOp::Add((11, vector))), + ], + }]; + + IvfIndexBuilder::::finalize_split_plans(&mut split_plans, 4); + + assert_eq!(split_plans[0].centroid2_part_idx, 4); + assert_eq!(split_plans[0].original_assign_ops[0].0, 3); + assert_eq!(split_plans[0].original_assign_ops[1].0, 4); } #[tokio::test] diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 595e5fb530f..4ccb65f086a 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -1652,10 +1652,9 @@ mod tests { let dataset = Dataset::open(test_uri).await.unwrap(); let final_ctx = load_vector_index_context(&dataset, "vector", INDEX_NAME).await; - assert_eq!( - final_ctx.num_partitions(), - 3, - "Expected partition split to increase partitions from 2 to 3 for {}, got stats: {}", + assert!( + final_ctx.num_partitions() >= 3, + "Expected partition split to increase partitions beyond 2 for {}, got stats: {}", description, final_ctx.stats_json() ); @@ -1732,6 +1731,51 @@ mod tests { append_constant_vector_with_params(dataset, rows, template, None).await; } + async fn append_partition_templates( + dataset: &mut Dataset, + rows_per_template: usize, + templates: &[Vec], + ) { + assert!( + !templates.is_empty(), + "at least one template is 
required for append" + ); + for template in templates { + assert_eq!( + template.len(), + DIM, + "Template vector should have {} dimensions", + DIM + ); + } + + let start_id = dataset.count_all_rows().await.unwrap() as u64; + let total_rows = rows_per_template * templates.len(); + let ids = Arc::new(UInt64Array::from_iter_values( + start_id..start_id + total_rows as u64, + )); + let mut appended_values = Vec::with_capacity(total_rows * DIM); + for template in templates { + for _ in 0..rows_per_template { + appended_values.extend_from_slice(template); + } + } + let vectors = Arc::new( + FixedSizeListArray::try_new_from_values( + Float32Array::from(appended_values), + DIM as i32, + ) + .unwrap(), + ); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt64, false), + Field::new("vector", vectors.data_type().clone(), false), + ])); + let batch = RecordBatch::try_new(schema.clone(), vec![ids, vectors]).unwrap(); + let batches = RecordBatchIterator::new(vec![Ok(batch)], schema); + dataset.append(batches, None).await.unwrap(); + } + async fn append_constant_vector_with_params( dataset: &mut Dataset, rows: usize, @@ -4598,6 +4642,119 @@ mod tests { verify_partition_split_after_append(dataset, test_uri, params, "multivector data").await; } + #[tokio::test] + async fn test_split_multiple_partitions_in_one_optimize() { + const INDEX_NAME: &str = "vector_idx"; + const BASE_ROWS_PER_PARTITION: usize = 512; + const APPEND_ROWS_PER_PARTITION: usize = 40_000; + let offsets = [-50.0, 50.0]; + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let (batch, schema) = generate_clustered_batch(BASE_ROWS_PER_PARTITION, offsets); + let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + batches, + test_uri, + Some(WriteParams { + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let centroids = build_centroids_for_offsets(&offsets); + let 
ivf_params = IvfBuildParams::try_with_centroids(2, centroids).unwrap(); + let params = VectorIndexParams::with_ivf_pq_params( + DistanceType::L2, + ivf_params, + PQBuildParams::default(), + ); + dataset + .create_index( + &["vector"], + IndexType::Vector, + Some(INDEX_NAME.to_string()), + &params, + true, + ) + .await + .unwrap(); + + let initial_ctx = load_vector_index_context(&dataset, "vector", INDEX_NAME).await; + assert_eq!(initial_ctx.num_partitions(), 2); + let mut templates = Vec::with_capacity(2); + for partition_idx in 0..2 { + let row_ids = load_partition_row_ids(initial_ctx.ivf(), partition_idx).await; + let template_batch = dataset + .take_rows(&[row_ids[0]], dataset.schema().clone()) + .await + .unwrap(); + templates.push( + template_batch["vector"] + .as_fixed_size_list() + .value(0) + .as_primitive::() + .values() + .to_vec(), + ); + } + + append_partition_templates(&mut dataset, APPEND_ROWS_PER_PARTITION, &templates).await; + + dataset + .optimize_indices(&OptimizeOptions::new()) + .await + .unwrap(); + dataset.validate().await.unwrap(); + + let final_ctx = load_vector_index_context(&dataset, "vector", INDEX_NAME).await; + assert_eq!( + final_ctx.num_partitions(), + 4, + "Expected both original partitions to split in one optimize, stats: {}", + final_ctx.stats_json() + ); + + let indices = final_ctx.stats()["indices"] + .as_array() + .expect("indices should be present"); + assert_eq!( + indices.len(), + 1, + "Expected split optimize to merge into one index, stats: {}", + final_ctx.stats_json() + ); + + let partitions = indices[0]["partitions"] + .as_array() + .expect("partitions should be present"); + assert_eq!(partitions.len(), 4); + let expected_rows = 2 * BASE_ROWS_PER_PARTITION + 2 * APPEND_ROWS_PER_PARTITION; + let total_partition_rows = partitions + .iter() + .map(|part| part["size"].as_u64().unwrap() as usize) + .sum::(); + assert_eq!(total_partition_rows, expected_rows); + assert_eq!(dataset.count_all_rows().await.unwrap(), expected_rows); + + 
let nearest = dataset + .scan() + .with_row_id() + .nearest("vector", &Float32Array::from(templates[0].clone()), 10) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let ids = nearest[ROW_ID].as_primitive::(); + let mut seen = HashSet::new(); + for row_id in ids.values() { + assert!(seen.insert(*row_id), "Duplicate row id found: {}", row_id); + } + } + #[tokio::test] async fn test_join_partition_on_delete_multivec() { // This test verifies that IVF index with multivector data handles deletions