Merged
352 changes: 351 additions & 1 deletion rust/lance/src/dataset/transaction.rs
@@ -2011,7 +2011,38 @@ impl Transaction {
                final_fragments.extend(maybe_existing_fragments?.clone());
            }
            Operation::Merge { fragments, .. } => {
                let mut merged_fragments = fragments.clone();
                if next_row_id.is_some() {
                    let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1);
                    let prev_by_id: HashMap<u64, &Fragment> = maybe_existing_fragments?
                        .iter()
                        .map(|f| (f.id, f))
                        .collect();
                    for fragment in merged_fragments.iter_mut() {
                        match prev_by_id.get(&fragment.id) {
                            Some(prev) => {
                                if merge_fragment_physically_rewritten(prev, fragment) {
                                    lance_table::rowids::version::refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
                                        fragment,
                                        new_version,
                                    )?;
                                }
                            }
                            None => {
                                // Brand-new fragment ID not present in the previous manifest.
                                // Set both last_updated and created version meta, consistent
                                // with Append/Overwrite for genuinely new fragments.
                                lance_table::rowids::version::refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
                                    fragment,
                                    new_version,
                                )?;
                                fragment.created_at_version_meta =
                                    fragment.last_updated_at_version_meta.clone();
                            }
Comment on lines +2031 to +2041
🟡 In the new Operation::Merge None branch (lines 2031-2039), brand-new fragment IDs only have last_updated_at_version_meta initialized; created_at_version_meta is left at whatever the caller passed (typically None). This is inconsistent with Operation::Append (1810-1814) and Operation::Overwrite (1949-1953), which set both fields via build_version_meta for genuinely new fragments. No in-tree caller currently appends new fragment IDs through Merge, but the Python transaction API exposes Operation::Merge directly (python/src/transaction.rs:313), so external callers reaching this branch will see _row_created_at_version come back NULL while _row_last_updated_at_version is populated. Suggest mirroring the Append pattern in the None arm: build the version meta once and assign to both fields.

Extended reasoning...

What the bug is

The PR adds two arms inside the new Operation::Merge block in build_manifest:

  • Some(prev) branch (existing fragment id) — refresh last_updated only if Fragment::files changed. This is correct: the rows already existed in a prior version, so created_at must be preserved as-is.
  • None branch (fragment id not in previous manifest, i.e. brand-new) — unconditionally calls refresh_row_latest_update_meta_for_full_frag_rewrite_cols.

refresh_row_latest_update_meta_for_full_frag_rewrite_cols (rust/lance-table/src/rowids/version.rs:493-520) only writes fragment.last_updated_at_version_meta = Some(version_meta); at line 516. It never touches created_at_version_meta. So a brand-new fragment introduced via Merge will end up with last_updated stamped to new_version and created_at left as whatever the caller supplied — typically None.

Why this is inconsistent with the rest of the file

The established pattern for newly-introduced fragments under stable row IDs is to populate both fields:

// Operation::Append (lines 1810-1814)
let version_meta = build_version_meta(fragment, new_version);
fragment.last_updated_at_version_meta = version_meta.clone();
fragment.created_at_version_meta = version_meta;

Operation::Overwrite does the same at 1949-1953, and Update's resolve_update_version_metadata does the same for genuinely new rows. The None branch is the only place a brand-new fragment gets last_updated set without created_at.

Step-by-step proof

  1. External caller (e.g. via the Python API at python/src/transaction.rs:313) constructs an Operation::Merge with a fragment whose id is greater than any id in the previous manifest, with row_id_meta = Some(...), physical_rows = Some(N), and both *_version_meta fields left as None (the natural default for a freshly-built Fragment).
  2. merge_fragments_valid (transaction.rs:3536) accepts this — test_merge_fragments_valid Test 4 explicitly asserts that adding new fragment IDs is valid.
  3. build_manifest enters the Merge arm, prev_by_id.get(&fragment.id) returns None, the None arm runs, and refresh_row_latest_update_meta_for_full_frag_rewrite_cols populates only last_updated_at_version_meta to new_version.
  4. created_at_version_meta stays None.
  5. A subsequent CDF query against the new rows returns new_version for _row_last_updated_at_version but NULL for _row_created_at_version — for rows that demonstrably did not exist before new_version.

Why I disagree with the refutation

The refutation argues that Merge is caller-controlled and the asymmetry mirrors the Some(prev) arm (which also doesn't touch created_at). But the Some(prev) arm is preserving created_at from a row that genuinely was created in an earlier version — that is correct. The None arm by contrast is overwriting the caller's last_updated (treating it the same way Append does) while leaving created_at un-overwritten. If the contract is "caller fully owns version meta," the None branch should not be touching last_updated either; if the contract is "merge stamps brand-new fragments like Append does," it should also stamp created_at. The current half-and-half behavior is the inconsistency.

The refutation also notes (correctly) that no in-tree caller exercises this path today (schema_evolution.rs and others all derive fragments from get_fragments()), which is why I'm flagging this as a nit rather than blocking. But the None branch was deliberately added by this PR with a comment explicitly calling out that "Merge may append ids beyond the original list," and the Python API is a public surface that can hit it.

How to fix

Mirror the Append/Overwrite pattern in the None arm:

None => {
    let version_meta = build_version_meta(fragment, new_version);
    fragment.last_updated_at_version_meta = version_meta.clone();
    fragment.created_at_version_meta = version_meta;
}

Either that, or drop the None arm entirely if the case is genuinely impossible — but that contradicts merge_fragments_valid and the comment justifying the branch.

                        }
                    }
                }
                final_fragments.extend(merged_fragments);

                // Some fields that have indices may have been removed, so we should
                // remove those indices as well.
@@ -3488,6 +3519,32 @@ fn schema_fragments_legacy_valid(schema: &Schema, fragments: &[Fragment]) -> Res
    Ok(())
}

/// Returns true if Operation::Merge rewrote this fragment's column data files (Fragment::files
/// changed versus the previous manifest). Used to bump last_updated_at_version_meta only when
/// new column values were materialized to disk.
///
/// Deletion file changes alone are not treated as rewrites: tombstones remove rows but
/// survivors did not receive new column bytes; stamping last_updated for those rows would be
/// incorrect for CDF.
#[inline]
fn merge_fragment_physically_rewritten(prev: &Fragment, merged: &Fragment) -> bool {
    debug_assert_eq!(prev.id, merged.id);
    if prev.files.len() != merged.files.len() {
        return true;
    }
    // Compare identity fields only. file_size_bytes is an AtomicU64 cache that
    // concurrent scans can populate in place on the manifest's DataFile, so it
    // must not be part of the rewrite check.
    prev.files.iter().zip(merged.files.iter()).any(|(p, m)| {
        p.path != m.path
            || p.fields != m.fields
            || p.column_indices != m.column_indices
            || p.file_major_version != m.file_major_version
            || p.file_minor_version != m.file_minor_version
            || p.base_id != m.base_id
    })
}
Comment on lines +3522 to +3546
🟡 merge_fragment_physically_rewritten compares Fragment.files via Vec::PartialEq, which transitively compares CachedFileSize — an interior-mutable AtomicU64 that scheduler.rs:632 writes in place on the live manifest's atomics. A caller that constructs merged fragments by cloning from get_fragments() (which snapshots CachedFileSize) and a concurrent scan that flips the original atomic between clone-time and build_manifest can drive prev.files != merged.files to true on identical files, falsely triggering refresh_row_latest_update_meta_for_full_frag_rewrite_cols and overwriting carry-forward last_updated history. Fix: compare only the immutable identity fields (path, fields, column_indices, file_major_version, file_minor_version, base_id), or normalize CachedFileSize before comparing.

Extended reasoning...

Bug

merge_fragment_physically_rewritten in transaction.rs:3520-3531 decides whether the carry-forward path needs to refresh per-row last_updated metadata by comparing prev.files != merged.files. Vec::eq uses the derived PartialEq on DataFile (rust/lance-table/src/format/fragment.rs:27), which compares all fields including file_size_bytes: CachedFileSize.

CachedFileSize wraps an AtomicU64 (rust/lance-io/src/utils.rs:177). Critically:

  • CachedFileSize::eq loads the AtomicU64 and compares the value (utils.rs:226-231).
  • CachedFileSize::set mutates the AtomicU64 in place via Relaxed store (utils.rs:252-255), through a shared reference.
  • Clone for CachedFileSize copies a snapshot of the current value into a new AtomicU64 (utils.rs:218-224); subsequent set() on the original is not reflected in the clone.

scheduler.rs:632 calls file_size_bytes.set(size) on the caller-supplied CachedFileSize whenever a file's size becomes known. Callers in dataset/optimize.rs:516, dataset/optimize/binary_copy.rs:263, and dataset/fragment.rs:987 pass &data_file.file_size_bytes from the live dataset.manifest.fragments — i.e. they mutate the live manifest's atomic.

In build_manifest (rust/lance/src/io/commit.rs:800), current_manifest is dataset.manifest.as_ref(), so prev.files[].file_size_bytes is the live atomic. The merged fragments come from caller-side construction, which for several flows starts from a Clone of get_fragments() (a frozen snapshot).

Trigger sequence

  1. T1: caller clones the existing fragments via get_fragments() → CachedFileSize snapshot, value = 0 (unknown).
  2. T2: a concurrent scan or compaction running on the same Dataset opens those files via open_file_with_priority(..., &df.file_size_bytes) → the live atomic is set to N. The snapshot is unaffected.
  3. T3: caller commits an Operation::Merge that carries the fragment forward unchanged. build_manifest evaluates prev.files != merged.files: prev side reads N from the live atomic, merged side reads 0 from the snapshot → equality returns false despite identical path/fields/versions/base_id.
  4. T4: merge_fragment_physically_rewritten returns true → refresh_row_latest_update_meta_for_full_frag_rewrite_cols stamps every row with new_version, destroying the preserved per-row last_updated_at_version_meta history that the caller intended to carry forward.

This silently breaks CDF semantics for the carry-forward case: rows are reported as 'updated at v_new' even though no column data was actually rewritten.

Why current in-tree callers do not exercise it

The two confirmed callers — add_columns/alter_columns and drop_columns_impl — change Fragment::files length (append a new file or drop one), so Vec::eq short-circuits on length and CachedFileSize never participates. The AllNulls path does not access files, so both atomics stay at zero. The PR's own carry-forward test (merge_build_manifest_skips_refresh_when_carry_forward_stable_row_ids) constructs both DataFiles with the same Option<NonZero>=None and runs no concurrent scan, so it does not exercise the race. The vulnerability is latent: any future external caller (Python/JNI Operation::Merge construction) that submits a true carry-forward fragment whose live manifest atomic gets populated mid-flight will hit it.

Suggested fix

Compare only the immutable identity fields, e.g.:

fn data_file_identity_eq(a: &DataFile, b: &DataFile) -> bool {
    a.path == b.path
        && a.fields == b.fields
        && a.column_indices == b.column_indices
        && a.file_major_version == b.file_major_version
        && a.file_minor_version == b.file_minor_version
        && a.base_id == b.base_id
}

Then iterate prev.files / merged.files pairwise. Alternatively, normalize file_size_bytes on both sides before the Vec equality check. The current debug_assert_eq!(prev.id, merged.id) plus a length check also belong in the helper.


/// Validate that Merge operations preserve all original fragments.
/// Merge operations should only add columns or rows, not reduce fragments.
/// This ensures fragments correspond one-to-one with the original fragment list.
@@ -4649,6 +4706,299 @@ mod tests {
        seq.versions().collect()
    }

    #[test]
    fn merge_build_manifest_refreshes_last_updated_when_data_files_change_stable_row_ids() {
        use lance_file::version::LanceFileVersion;
        use lance_table::feature_flags::FLAG_STABLE_ROW_IDS;

        let (major, minor) = LanceFileVersion::Stable.to_numbers();
        let mk_file = |path: &str| DataFile::new(path, vec![0], vec![0], major, minor, None, None);

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        let row_ids = RowIdSequence::from([100u64, 101, 102, 103, 104].as_slice());
        let row_id_meta = Some(RowIdMeta::Inline(write_row_ids(&row_ids)));

        let prev_fragment = Fragment {
            id: 0,
            files: vec![mk_file("before.lance")],
            deletion_file: None,
            row_id_meta,
            physical_rows: Some(5),
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        };

        let mut manifest = Manifest::new(
            lance_schema.clone(),
            Arc::new(vec![prev_fragment.clone()]),
            DataStorageFormat::new(LanceFileVersion::V2_0),
            HashMap::new(),
        );
        manifest.reader_feature_flags |= FLAG_STABLE_ROW_IDS;
        manifest.next_row_id = 100;

        let merged_fragment = Fragment {
            files: vec![mk_file("after.lance")],
            ..prev_fragment
        };

        let tx = Transaction::new(
            manifest.version,
            Operation::Merge {
                fragments: vec![merged_fragment],
                schema: lance_schema,
            },
            None,
        );

        let (out, _) = tx
            .build_manifest(
                Some(&manifest),
                vec![],
                "txn",
                &ManifestWriteConfig::default(),
            )
            .unwrap();

        assert_eq!(out.version, 2);
        let frag = &out.fragments[0];
        let seq = frag
            .last_updated_at_version_meta
            .as_ref()
            .unwrap()
            .load_sequence()
            .unwrap();
        assert_eq!(seq.version_at(0).unwrap(), 2);
        assert_eq!(seq.version_at(4).unwrap(), 2);
    }

    #[test]
    fn merge_build_manifest_skips_refresh_when_carry_forward_stable_row_ids() {
        use lance_file::version::LanceFileVersion;
        use lance_table::feature_flags::FLAG_STABLE_ROW_IDS;
        use lance_table::rowids::version::{RowDatasetVersionMeta, RowDatasetVersionSequence};

        let (major, minor) = LanceFileVersion::Stable.to_numbers();
        let data_file = DataFile::new("same.lance", vec![0], vec![0], major, minor, None, None);

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        let row_ids = RowIdSequence::from([200u64, 201, 202, 203, 204].as_slice());
        let row_id_meta = Some(RowIdMeta::Inline(write_row_ids(&row_ids)));

        let uniform_v1 = RowDatasetVersionSequence::from_uniform_row_count(5, 1);
        let meta_v1 = RowDatasetVersionMeta::from_sequence(&uniform_v1).unwrap();

        let prev_fragment = Fragment {
            id: 0,
            files: vec![data_file.clone()],
            deletion_file: None,
            row_id_meta: row_id_meta.clone(),
            physical_rows: Some(5),
            last_updated_at_version_meta: Some(meta_v1.clone()),
            created_at_version_meta: None,
        };

        let mut manifest = Manifest::new(
            lance_schema.clone(),
            Arc::new(vec![prev_fragment]),
            DataStorageFormat::new(LanceFileVersion::V2_0),
            HashMap::new(),
        );
        manifest.reader_feature_flags |= FLAG_STABLE_ROW_IDS;
        manifest.next_row_id = 100;

        let merged_fragment = Fragment {
            id: 0,
            files: vec![data_file],
            deletion_file: None,
            row_id_meta,
            physical_rows: Some(5),
            last_updated_at_version_meta: Some(meta_v1),
            created_at_version_meta: None,
        };

        let tx = Transaction::new(
            manifest.version,
            Operation::Merge {
                fragments: vec![merged_fragment],
                schema: lance_schema,
            },
            None,
        );

        let (out, _) = tx
            .build_manifest(
                Some(&manifest),
                vec![],
                "txn",
                &ManifestWriteConfig::default(),
            )
            .unwrap();

        let seq = out.fragments[0]
            .last_updated_at_version_meta
            .as_ref()
            .unwrap()
            .load_sequence()
            .unwrap();
        assert_eq!(seq.version_at(0).unwrap(), 1);
        assert_eq!(seq.version_at(4).unwrap(), 1);
    }

    #[test]
    fn merge_build_manifest_no_last_updated_refresh_without_stable_row_ids() {
        use lance_file::version::LanceFileVersion;
        use lance_table::feature_flags::FLAG_STABLE_ROW_IDS;

        let (major, minor) = LanceFileVersion::Stable.to_numbers();
        let mk_file = |path: &str| DataFile::new(path, vec![0], vec![0], major, minor, None, None);

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        let prev_fragment = Fragment {
            id: 0,
            files: vec![mk_file("before.lance")],
            deletion_file: None,
            row_id_meta: None,
            physical_rows: Some(5),
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        };

        let manifest = Manifest::new(
            lance_schema.clone(),
            Arc::new(vec![prev_fragment.clone()]),
            DataStorageFormat::new(LanceFileVersion::V2_0),
            HashMap::new(),
        );
        assert_eq!(
            manifest.reader_feature_flags & FLAG_STABLE_ROW_IDS,
            0,
            "manifest must not use stable row IDs for this guard test"
        );

        let merged_fragment = Fragment {
            files: vec![mk_file("after.lance")],
            ..prev_fragment
        };

        let tx = Transaction::new(
            manifest.version,
            Operation::Merge {
                fragments: vec![merged_fragment],
                schema: lance_schema,
            },
            None,
        );

        let (out, _) = tx
            .build_manifest(
                Some(&manifest),
                vec![],
                "txn",
                &ManifestWriteConfig::default(),
            )
            .unwrap();

        assert!(
            out.fragments[0].last_updated_at_version_meta.is_none(),
            "without stable row IDs, Merge must not populate per-row last_updated metadata"
        );
    }

    #[test]
    fn merge_build_manifest_sets_both_version_meta_for_new_fragment_id_stable_row_ids() {
        use lance_file::version::LanceFileVersion;
        use lance_table::feature_flags::FLAG_STABLE_ROW_IDS;

        let (major, minor) = LanceFileVersion::Stable.to_numbers();
        let mk_file = |path: &str| DataFile::new(path, vec![0], vec![0], major, minor, None, None);

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        // Existing fragment (id=0) with stable row IDs
        let row_ids_0 = RowIdSequence::from([10u64, 11, 12].as_slice());
        let existing_fragment = Fragment {
            id: 0,
            files: vec![mk_file("existing.lance")],
            deletion_file: None,
            row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&row_ids_0))),
            physical_rows: Some(3),
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        };

        let mut manifest = Manifest::new(
            lance_schema.clone(),
            Arc::new(vec![existing_fragment.clone()]),
            DataStorageFormat::new(LanceFileVersion::V2_0),
            HashMap::new(),
        );
        manifest.reader_feature_flags |= FLAG_STABLE_ROW_IDS;
        manifest.next_row_id = 100;
        manifest.version = 1;

        // New fragment (id=1) not present in prev manifest — exercises the None branch
        let row_ids_1 = RowIdSequence::from([20u64, 21, 22, 23].as_slice());
        let new_fragment = Fragment {
            id: 1,
            files: vec![mk_file("new.lance")],
            deletion_file: None,
            row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&row_ids_1))),
            physical_rows: Some(4),
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        };

        let tx = Transaction::new(
            manifest.version,
            Operation::Merge {
                fragments: vec![existing_fragment, new_fragment],
                schema: lance_schema,
            },
            None,
        );

        let (out, _) = tx
            .build_manifest(
                Some(&manifest),
                vec![],
                "txn",
                &ManifestWriteConfig::default(),
            )
            .unwrap();

        assert_eq!(out.version, 2);

        let new_frag = out.fragments.iter().find(|f| f.id == 1).unwrap();

        // last_updated_at_version must be set to the commit version
        let last_updated_seq = new_frag
            .last_updated_at_version_meta
            .as_ref()
            .expect("new fragment must have last_updated_at_version_meta")
            .load_sequence()
            .unwrap();
        assert_eq!(last_updated_seq.version_at(0).unwrap(), 2);
        assert_eq!(last_updated_seq.version_at(3).unwrap(), 2);

        // created_at_version must also be set — must not be None
        let created_seq = new_frag
            .created_at_version_meta
            .as_ref()
            .expect("new fragment must have created_at_version_meta")
            .load_sequence()
            .unwrap();
        assert_eq!(created_seq.version_at(0).unwrap(), 2);
        assert_eq!(created_seq.version_at(3).unwrap(), 2);
    }

    #[test]
    fn test_update_version_tracking_preserves_created_at() {
        let existing_seq = RowIdSequence::from([100u64, 101, 102].as_slice());