diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 2c496a77a74..21f6999fff1 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -537,9 +537,60 @@ impl<'a> TransactionRebase<'a> { Ok(()) } } - // Although some of the rows we indexed may have been deleted / moved, - // row ids are still valid, so we allow this optimistically. - Operation::Delete { .. } | Operation::Update { .. } => Ok(()), + // Delete doesn't change row ids in remaining fragments, so the + // index entries for those fragments are still valid. + Operation::Delete { .. } => Ok(()), + // For Update with RewriteRows mode, the old fragment is removed + // and a new one created. effective_fragment_bitmap filters out the + // deleted fragment at query time, so stale index entries are harmless. + // + // For Update with RewriteColumns mode, column data is rewritten + // in-place (same fragment ID, same row IDs, different values). + // If the rewritten columns overlap with indexed fields AND the + // rewritten fragments are in the new index's bitmap, the index + // data is stale and we must retry. + Operation::Update { + updated_fragments, + fields_modified, + update_mode, + .. + } => { + use crate::dataset::transaction::UpdateMode::RewriteColumns; + let is_rewrite_columns = update_mode + .as_ref() + .is_some_and(|m| *m == RewriteColumns); + + if is_rewrite_columns && !fields_modified.is_empty() { + let modified_set: std::collections::HashSet<&u32> = + fields_modified.iter().collect(); + let index_covers_modified = new_indices.iter().any(|idx| { + idx.fields.iter().any(|f| { + modified_set.contains(&u32::try_from(*f).unwrap()) + }) + }); + if index_covers_modified { + let rewritten_ids: std::collections::HashSet = + updated_fragments.iter().map(|f| f.id as u32).collect(); + let bitmap_overlap = new_indices.iter().any(|idx| { + match &idx.fragment_bitmap { + // None = unknown coverage; conservatively + // assume it overlaps. + None => true, + Some(bm) => { + bm.iter().any(|id| rewritten_ids.contains(&id)) + } + } + }); + if bitmap_overlap { + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + )); + } + } + } + Ok(()) + } // Merge, reserve, and project don't change row ids, so this should be fine. Operation::Merge { .. } => Ok(()), Operation::ReserveFragments { .. } => Ok(()), @@ -3606,4 +3657,171 @@ mod tests { assert_eq!(dataset_v2.count_rows(None).await.unwrap(), 5); } + + #[tokio::test] + async fn test_create_index_vs_rewrite_columns_update() { + use crate::dataset::transaction::UpdateMode; + use io::commit::conflict_resolver::tests::{ConflictResult::*, modified_fragment_ids}; + use lance_table::format::IndexMetadata; + use roaring::RoaringBitmap; + use uuid::Uuid; + + let fragment0 = Fragment::new(0); + let fragment1 = Fragment::new(1); + + let index_on_field_0 = IndexMetadata { + uuid: Uuid::new_v4(), + name: "test_index".to_string(), + fields: vec![0], + dataset_version: 1, + fragment_bitmap: Some(RoaringBitmap::from_iter([0u32])), + index_details: None, + index_version: 0, + created_at: None, + base_id: None, + files: None, + }; + + let index_no_bitmap = IndexMetadata { + uuid: Uuid::new_v4(), + name: "test_index_no_bm".to_string(), + fields: vec![0], + dataset_version: 1, + fragment_bitmap: None, + index_details: None, + index_version: 0, + created_at: None, + base_id: None, + files: None, + }; + + let cases = vec![ + ( + "RewriteColumns, overlapping fields, overlapping fragments -> Retryable", + Operation::CreateIndex { + new_indices: vec![index_on_field_0.clone()], + removed_indices: vec![], + }, + Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![fragment0.clone()], + new_fragments: vec![], + fields_modified: vec![0], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, + }, + Retryable, + ), + ( + "RewriteColumns, non-overlapping fields -> Compatible", + Operation::CreateIndex { + new_indices: vec![index_on_field_0.clone()], + removed_indices: vec![], + }, + Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![fragment0.clone()], + new_fragments: vec![], + fields_modified: vec![1], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, + }, + Compatible, + ), + ( + "RewriteColumns, overlapping fields, non-overlapping fragments -> Compatible", + Operation::CreateIndex { + new_indices: vec![index_on_field_0.clone()], + removed_indices: vec![], + }, + Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![fragment1.clone()], + new_fragments: vec![], + fields_modified: vec![0], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, + }, + Compatible, + ), + ( + "RewriteColumns, overlapping fields, bitmap=None -> Retryable", + Operation::CreateIndex { + new_indices: vec![index_no_bitmap], + removed_indices: vec![], + }, + Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![fragment0.clone()], + new_fragments: vec![], + fields_modified: vec![0], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, + }, + Retryable, + ), + ( + "RewriteRows, overlapping fields and fragments -> Compatible", + Operation::CreateIndex { + new_indices: vec![index_on_field_0], + removed_indices: vec![], + }, + Operation::Update { + removed_fragment_ids: vec![0], + updated_fragments: vec![], + new_fragments: vec![fragment1], + fields_modified: vec![], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: Some(UpdateMode::RewriteRows), + inserted_rows_filter: None, + }, + Compatible, + ), + ]; + + for (description, op1, op2, expected) in cases { + let txn1 = Transaction::new(0, op1.clone(), None); + let txn2 = Transaction::new(0, op2.clone(), None); + + let mut rebase = TransactionRebase { + transaction: txn1, + initial_fragments: HashMap::new(), + modified_fragment_ids: modified_fragment_ids(&op1).collect::>(), + affected_rows: None, + conflicting_frag_reuse_indices: Vec::new(), + conflicting_mem_wal_merged_gens: Vec::new(), + }; + + let result = rebase.check_txn(&txn2, 1); + match expected { + Compatible => { + assert!( + result.is_ok(), + "{description}: expected Compatible but got {result:?}", + ); + } + NotCompatible => { + assert!( + matches!(result, Err(Error::CommitConflict { .. })), + "{description}: expected NotCompatible but got {result:?}", + ); + } + Retryable => { + assert!( + matches!(result, Err(Error::RetryableCommitConflict { .. })), + "{description}: expected Retryable but got {result:?}", + ); + } + } + } + } }