Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 221 additions & 3 deletions rust/lance/src/io/commit/conflict_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,9 +537,60 @@ impl<'a> TransactionRebase<'a> {
Ok(())
}
}
// Although some of the rows we indexed may have been deleted / moved,
// row ids are still valid, so we allow this optimistically.
Operation::Delete { .. } | Operation::Update { .. } => Ok(()),
// Delete doesn't change row ids in remaining fragments, so the
// index entries for those fragments are still valid.
Operation::Delete { .. } => Ok(()),
// For Update with RewriteRows mode, the old fragment is removed
// and a new one created. effective_fragment_bitmap filters out the
// deleted fragment at query time, so stale index entries are harmless.
//
// For Update with RewriteColumns mode, column data is rewritten
// in-place (same fragment ID, same row IDs, different values).
// If the rewritten columns overlap with indexed fields AND the
// rewritten fragments are in the new index's bitmap, the index
// data is stale and we must retry.
Operation::Update {
updated_fragments,
fields_modified,
update_mode,
..
} => {
use crate::dataset::transaction::UpdateMode::RewriteColumns;
let is_rewrite_columns = update_mode
.as_ref()
.is_some_and(|m| *m == RewriteColumns);

if is_rewrite_columns && !fields_modified.is_empty() {
let modified_set: std::collections::HashSet<&u32> =
fields_modified.iter().collect();
let index_covers_modified = new_indices.iter().any(|idx| {
idx.fields.iter().any(|f| {
modified_set.contains(&u32::try_from(*f).unwrap())
})
});
if index_covers_modified {
let rewritten_ids: std::collections::HashSet<u32> =
updated_fragments.iter().map(|f| f.id as u32).collect();
let bitmap_overlap = new_indices.iter().any(|idx| {
match &idx.fragment_bitmap {
// None = unknown coverage; conservatively
// assume it overlaps.
None => true,
Some(bm) => {
bm.iter().any(|id| rewritten_ids.contains(&id))
}
}
});
if bitmap_overlap {
return Err(self.retryable_conflict_err(
other_transaction,
other_version,
));
}
}
}
Ok(())
}
// Merge, reserve, and project don't change row ids, so this should be fine.
Operation::Merge { .. } => Ok(()),
Operation::ReserveFragments { .. } => Ok(()),
Expand Down Expand Up @@ -3606,4 +3657,171 @@ mod tests {

assert_eq!(dataset_v2.count_rows(None).await.unwrap(), 5);
}

#[tokio::test]
async fn test_create_index_vs_rewrite_columns_update() {
use crate::dataset::transaction::UpdateMode;
use io::commit::conflict_resolver::tests::{ConflictResult::*, modified_fragment_ids};
use lance_table::format::IndexMetadata;
use roaring::RoaringBitmap;
use uuid::Uuid;

let fragment0 = Fragment::new(0);
let fragment1 = Fragment::new(1);

let index_on_field_0 = IndexMetadata {
uuid: Uuid::new_v4(),
name: "test_index".to_string(),
fields: vec![0],
dataset_version: 1,
fragment_bitmap: Some(RoaringBitmap::from_iter([0u32])),
index_details: None,
index_version: 0,
created_at: None,
base_id: None,
files: None,
};

let index_no_bitmap = IndexMetadata {
uuid: Uuid::new_v4(),
name: "test_index_no_bm".to_string(),
fields: vec![0],
dataset_version: 1,
fragment_bitmap: None,
index_details: None,
index_version: 0,
created_at: None,
base_id: None,
files: None,
};

let cases = vec![
(
"RewriteColumns, overlapping fields, overlapping fragments -> Retryable",
Operation::CreateIndex {
new_indices: vec![index_on_field_0.clone()],
removed_indices: vec![],
},
Operation::Update {
removed_fragment_ids: vec![],
updated_fragments: vec![fragment0.clone()],
new_fragments: vec![],
fields_modified: vec![0],
merged_generations: vec![],
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteColumns),
inserted_rows_filter: None,
},
Retryable,
),
(
"RewriteColumns, non-overlapping fields -> Compatible",
Operation::CreateIndex {
new_indices: vec![index_on_field_0.clone()],
removed_indices: vec![],
},
Operation::Update {
removed_fragment_ids: vec![],
updated_fragments: vec![fragment0.clone()],
new_fragments: vec![],
fields_modified: vec![1],
merged_generations: vec![],
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteColumns),
inserted_rows_filter: None,
},
Compatible,
),
(
"RewriteColumns, overlapping fields, non-overlapping fragments -> Compatible",
Operation::CreateIndex {
new_indices: vec![index_on_field_0.clone()],
removed_indices: vec![],
},
Operation::Update {
removed_fragment_ids: vec![],
updated_fragments: vec![fragment1.clone()],
new_fragments: vec![],
fields_modified: vec![0],
merged_generations: vec![],
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteColumns),
inserted_rows_filter: None,
},
Compatible,
),
(
"RewriteColumns, overlapping fields, bitmap=None -> Retryable",
Operation::CreateIndex {
new_indices: vec![index_no_bitmap],
removed_indices: vec![],
},
Operation::Update {
removed_fragment_ids: vec![],
updated_fragments: vec![fragment0.clone()],
new_fragments: vec![],
fields_modified: vec![0],
merged_generations: vec![],
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteColumns),
inserted_rows_filter: None,
},
Retryable,
),
(
"RewriteRows, overlapping fields and fragments -> Compatible",
Operation::CreateIndex {
new_indices: vec![index_on_field_0],
removed_indices: vec![],
},
Operation::Update {
removed_fragment_ids: vec![0],
updated_fragments: vec![],
new_fragments: vec![fragment1],
fields_modified: vec![],
merged_generations: vec![],
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteRows),
inserted_rows_filter: None,
},
Compatible,
),
];

for (description, op1, op2, expected) in cases {
let txn1 = Transaction::new(0, op1.clone(), None);
let txn2 = Transaction::new(0, op2.clone(), None);

let mut rebase = TransactionRebase {
transaction: txn1,
initial_fragments: HashMap::new(),
modified_fragment_ids: modified_fragment_ids(&op1).collect::<HashSet<_>>(),
affected_rows: None,
conflicting_frag_reuse_indices: Vec::new(),
conflicting_mem_wal_merged_gens: Vec::new(),
};

let result = rebase.check_txn(&txn2, 1);
match expected {
Compatible => {
assert!(
result.is_ok(),
"{description}: expected Compatible but got {result:?}",
);
}
NotCompatible => {
assert!(
matches!(result, Err(Error::CommitConflict { .. })),
"{description}: expected NotCompatible but got {result:?}",
);
}
Retryable => {
assert!(
matches!(result, Err(Error::RetryableCommitConflict { .. })),
"{description}: expected Retryable but got {result:?}",
);
}
}
}
}
}
Loading