From 3b75e7ec397add2b1a27ec451298d9db5fece403 Mon Sep 17 00:00:00 2001 From: Jing chen He Date: Tue, 28 Apr 2026 22:00:10 -0700 Subject: [PATCH 1/2] fix: Operation::Merge: refresh per-row version metadata for rewritten fragments --- rust/lance/src/dataset/transaction.rs | 249 +++++++++++++++++++++++++- 1 file changed, 248 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index f7a516c4f3..a3c122669c 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -2011,7 +2011,36 @@ impl Transaction { final_fragments.extend(maybe_existing_fragments?.clone()); } Operation::Merge { fragments, .. } => { - final_fragments.extend(fragments.clone()); + let mut merged_fragments = fragments.clone(); + if next_row_id.is_some() { + let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); + let prev_by_id: HashMap = maybe_existing_fragments? + .iter() + .map(|f| (f.id, f)) + .collect(); + for fragment in merged_fragments.iter_mut() { + match prev_by_id.get(&fragment.id) { + Some(prev) => { + if merge_fragment_physically_rewritten(prev, fragment) { + lance_table::rowids::version::refresh_row_latest_update_meta_for_full_frag_rewrite_cols( + fragment, + new_version, + )?; + } + } + None => { + // Fragment id not present in the previous manifest (Merge may append + // ids beyond the original list). Apply the same full-fragment + // last_updated refresh as for a rewrite. + lance_table::rowids::version::refresh_row_latest_update_meta_for_full_frag_rewrite_cols( + fragment, + new_version, + )?; + } + } + } + } + final_fragments.extend(merged_fragments); // Some fields that have indices may have been removed, so we should // remove those indices as well. @@ -3488,6 +3517,19 @@ fn schema_fragments_legacy_valid(schema: &Schema, fragments: &[Fragment]) -> Res Ok(()) } +/// Returns true if Operation::Merge rewrote this fragment's column data files (Fragment::files +/// changed versus the previous manifest). Used to bump last_updated_at_version_meta only when +/// new column values were materialized to disk. +/// +/// Deletion file changes alone are not treated as rewrites: tombstones remove rows but +/// survivors did not receive new column bytes; stamping last_updated for those rows would be +/// incorrect for CDF. +#[inline] +fn merge_fragment_physically_rewritten(prev: &Fragment, merged: &Fragment) -> bool { + debug_assert_eq!(prev.id, merged.id); + prev.files != merged.files +} + /// Validate that Merge operations preserve all original fragments. /// Merge operations should only add columns or rows, not reduce fragments. /// This ensures fragments correspond at one-to-one with the original fragment list. @@ -4649,6 +4691,211 @@ mod tests { seq.versions().collect() } + #[test] + fn merge_build_manifest_refreshes_last_updated_when_data_files_change_stable_row_ids() { + use lance_file::version::LanceFileVersion; + use lance_table::feature_flags::FLAG_STABLE_ROW_IDS; + + let (major, minor) = LanceFileVersion::Stable.to_numbers(); + let mk_file = |path: &str| DataFile::new(path, vec![0], vec![0], major, minor, None, None); + + let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]); + let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap(); + + let row_ids = RowIdSequence::from([100u64, 101, 102, 103, 104].as_slice()); + let row_id_meta = Some(RowIdMeta::Inline(write_row_ids(&row_ids))); + + let prev_fragment = Fragment { + id: 0, + files: vec![mk_file("before.lance")], + deletion_file: None, + row_id_meta: row_id_meta.clone(), + physical_rows: Some(5), + last_updated_at_version_meta: None, + created_at_version_meta: None, + }; + + let mut manifest = Manifest::new( + lance_schema.clone(), + Arc::new(vec![prev_fragment.clone()]), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ); + manifest.reader_feature_flags |= FLAG_STABLE_ROW_IDS; + manifest.next_row_id = 100; + + let merged_fragment = Fragment { + files: vec![mk_file("after.lance")], + ..prev_fragment + }; + + let tx = Transaction::new( + manifest.version, + Operation::Merge { + fragments: vec![merged_fragment], + schema: lance_schema, + }, + None, + ); + + let (out, _) = tx + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + assert_eq!(out.version, 2); + let frag = &out.fragments[0]; + let seq = frag + .last_updated_at_version_meta + .as_ref() + .unwrap() + .load_sequence() + .unwrap(); + assert_eq!(seq.version_at(0).unwrap(), 2); + assert_eq!(seq.version_at(4).unwrap(), 2); + } + + #[test] + fn merge_build_manifest_skips_refresh_when_carry_forward_stable_row_ids() { + use lance_file::version::LanceFileVersion; + use lance_table::feature_flags::FLAG_STABLE_ROW_IDS; + use lance_table::rowids::version::{RowDatasetVersionMeta, RowDatasetVersionSequence}; + + let (major, minor) = LanceFileVersion::Stable.to_numbers(); + let data_file = DataFile::new("same.lance", vec![0], vec![0], major, minor, None, None); + + let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]); + let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap(); + + let row_ids = RowIdSequence::from([200u64, 201, 202, 203, 204].as_slice()); + let row_id_meta = Some(RowIdMeta::Inline(write_row_ids(&row_ids))); + + let uniform_v1 = RowDatasetVersionSequence::from_uniform_row_count(5, 1); + let meta_v1 = RowDatasetVersionMeta::from_sequence(&uniform_v1).unwrap(); + + let prev_fragment = Fragment { + id: 0, + files: vec![data_file.clone()], + deletion_file: None, + row_id_meta: row_id_meta.clone(), + physical_rows: Some(5), + last_updated_at_version_meta: Some(meta_v1.clone()), + created_at_version_meta: None, + }; + + let mut manifest = Manifest::new( + lance_schema.clone(), + Arc::new(vec![prev_fragment]), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ); + manifest.reader_feature_flags |= FLAG_STABLE_ROW_IDS; + manifest.next_row_id = 100; + + let merged_fragment = Fragment { + id: 0, + files: vec![data_file], + deletion_file: None, + row_id_meta, + physical_rows: Some(5), + last_updated_at_version_meta: Some(meta_v1.clone()), + created_at_version_meta: None, + }; + + let tx = Transaction::new( + manifest.version, + Operation::Merge { + fragments: vec![merged_fragment], + schema: lance_schema, + }, + None, + ); + + let (out, _) = tx + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + let seq = out.fragments[0] + .last_updated_at_version_meta + .as_ref() + .unwrap() + .load_sequence() + .unwrap(); + assert_eq!(seq.version_at(0).unwrap(), 1); + assert_eq!(seq.version_at(4).unwrap(), 1); + } + + #[test] + fn merge_build_manifest_no_last_updated_refresh_without_stable_row_ids() { + use lance_file::version::LanceFileVersion; + use lance_table::feature_flags::FLAG_STABLE_ROW_IDS; + + let (major, minor) = LanceFileVersion::Stable.to_numbers(); + let mk_file = |path: &str| DataFile::new(path, vec![0], vec![0], major, minor, None, None); + + let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]); + let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap(); + + let prev_fragment = Fragment { + id: 0, + files: vec![mk_file("before.lance")], + deletion_file: None, + row_id_meta: None, + physical_rows: Some(5), + last_updated_at_version_meta: None, + created_at_version_meta: None, + }; + + let manifest = Manifest::new( + lance_schema.clone(), + Arc::new(vec![prev_fragment.clone()]), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ); + assert_eq!( + manifest.reader_feature_flags & FLAG_STABLE_ROW_IDS, + 0, + "manifest must not use stable row IDs for this guard test" + ); + + let merged_fragment = Fragment { + files: vec![mk_file("after.lance")], + ..prev_fragment + }; + + let tx = Transaction::new( + manifest.version, + Operation::Merge { + fragments: vec![merged_fragment], + schema: lance_schema, + }, + None, + ); + + let (out, _) = tx + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + assert!( + out.fragments[0].last_updated_at_version_meta.is_none(), + "without stable row IDs, Merge must not populate per-row last_updated metadata" + ); + } + #[test] fn test_update_version_tracking_preserves_created_at() { let existing_seq = RowIdSequence::from([100u64, 101, 102].as_slice()); From c1f662f4d276d11a56084e01bc16201a21287161 Mon Sep 17 00:00:00 2001 From: Jing chen He Date: Fri, 1 May 2026 17:37:04 -0700 Subject: [PATCH 2/2] Address CI issues --- rust/lance/src/dataset/transaction.rs | 115 ++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 6 deletions(-) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index a3c122669c..36605a83d3 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -2029,13 +2029,15 @@ impl Transaction { } } None => { - // Fragment id not present in the previous manifest (Merge may append - // ids beyond the original list). Apply the same full-fragment - // last_updated refresh as for a rewrite. + // Brand-new fragment ID not present in the previous manifest. + // Set both last_updated and created version meta, consistent + // with Append/Overwrite for genuinely new fragments. lance_table::rowids::version::refresh_row_latest_update_meta_for_full_frag_rewrite_cols( fragment, new_version, )?; + fragment.created_at_version_meta = + fragment.last_updated_at_version_meta.clone(); } } } @@ -3527,7 +3529,20 @@ fn schema_fragments_legacy_valid(schema: &Schema, fragments: &[Fragment]) -> Res #[inline] fn merge_fragment_physically_rewritten(prev: &Fragment, merged: &Fragment) -> bool { debug_assert_eq!(prev.id, merged.id); - prev.files != merged.files + if prev.files.len() != merged.files.len() { + return true; + } + // Compare identity fields only. file_size_bytes is an AtomicU64 cache that + // concurrent scans can populate in place on the manifest's DataFile, so it + // must not be part of the rewrite check. + prev.files.iter().zip(merged.files.iter()).any(|(p, m)| { + p.path != m.path + || p.fields != m.fields + || p.column_indices != m.column_indices + || p.file_major_version != m.file_major_version + || p.file_minor_version != m.file_minor_version + || p.base_id != m.base_id + }) } /// Validate that Merge operations preserve all original fragments. @@ -4709,7 +4724,7 @@ mod tests { id: 0, files: vec![mk_file("before.lance")], deletion_file: None, - row_id_meta: row_id_meta.clone(), + row_id_meta, physical_rows: Some(5), last_updated_at_version_meta: None, created_at_version_meta: None, @@ -4802,7 +4817,7 @@ mod tests { deletion_file: None, row_id_meta, physical_rows: Some(5), - last_updated_at_version_meta: Some(meta_v1.clone()), + last_updated_at_version_meta: Some(meta_v1), created_at_version_meta: None, }; @@ -4896,6 +4911,94 @@ mod tests { ); } + #[test] + fn merge_build_manifest_sets_both_version_meta_for_new_fragment_id_stable_row_ids() { + use lance_file::version::LanceFileVersion; + use lance_table::feature_flags::FLAG_STABLE_ROW_IDS; + + let (major, minor) = LanceFileVersion::Stable.to_numbers(); + let mk_file = |path: &str| DataFile::new(path, vec![0], vec![0], major, minor, None, None); + + let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]); + let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap(); + + // Existing fragment (id=0) with stable row IDs + let row_ids_0 = RowIdSequence::from([10u64, 11, 12].as_slice()); + let existing_fragment = Fragment { + id: 0, + files: vec![mk_file("existing.lance")], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&row_ids_0))), + physical_rows: Some(3), + last_updated_at_version_meta: None, + created_at_version_meta: None, + }; + + let mut manifest = Manifest::new( + lance_schema.clone(), + Arc::new(vec![existing_fragment.clone()]), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ); + manifest.reader_feature_flags |= FLAG_STABLE_ROW_IDS; + manifest.next_row_id = 100; + manifest.version = 1; + + // New fragment (id=1) not present in prev manifest — exercises the None branch + let row_ids_1 = RowIdSequence::from([20u64, 21, 22, 23].as_slice()); + let new_fragment = Fragment { + id: 1, + files: vec![mk_file("new.lance")], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&row_ids_1))), + physical_rows: Some(4), + last_updated_at_version_meta: None, + created_at_version_meta: None, + }; + + let tx = Transaction::new( + manifest.version, + Operation::Merge { + fragments: vec![existing_fragment, new_fragment], + schema: lance_schema, + }, + None, + ); + + let (out, _) = tx + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + assert_eq!(out.version, 2); + + let new_frag = out.fragments.iter().find(|f| f.id == 1).unwrap(); + + // last_updated_at_version must be set to the commit version + let last_updated_seq = new_frag + .last_updated_at_version_meta + .as_ref() + .expect("new fragment must have last_updated_at_version_meta") + .load_sequence() + .unwrap(); + assert_eq!(last_updated_seq.version_at(0).unwrap(), 2); + assert_eq!(last_updated_seq.version_at(3).unwrap(), 2); + + // created_at_version must also be set — must not be None + let created_seq = new_frag + .created_at_version_meta + .as_ref() + .expect("new fragment must have created_at_version_meta") + .load_sequence() + .unwrap(); + assert_eq!(created_seq.version_at(0).unwrap(), 2); + assert_eq!(created_seq.version_at(3).unwrap(), 2); + } + #[test] fn test_update_version_tracking_preserves_created_at() { let existing_seq = RowIdSequence::from([100u64, 101, 102].as_slice());