Skip to content

feat(delete): distributed fragment-scoped delete + execute_batch delete support#7

Open
sezruby wants to merge 2 commits into
base/distributed-deletefrom
feat/distributed-delete
Open

feat(delete): distributed fragment-scoped delete + execute_batch delete support#7
sezruby wants to merge 2 commits into
base/distributed-deletefrom
feat/distributed-delete

Conversation

@sezruby

@sezruby sezruby commented Jul 3, 2026

Copy link
Copy Markdown
Owner

Summary

Adds a distributed / parallel delete to lance: partition the target by fragment, delete a predicate's matches per fragment slice independently (in parallel), and batch-commit the per-slice transactions as one atomic commit. Completes the documented CommitBuilder::execute_batch API — previously append-only, with an in-code TODO — for delete transactions.

Tracking issue: #8 · sibling of the fragment-scoped merge work #6.

Review note: opened against base/distributed-delete (a branch pinned at the upstream/main commit 3a914d923 this work was cut from) rather than the fork's stale main, so the diff shows only these commits. Intended to eventually target lance-format/lance main.

Two independent pieces

Producer — DeleteBuilder::with_target_fragments(ids)

  • Scopes the scan (via Scanner::with_fragments) and deletion application to a fragment subset, so each task reads only its own slice instead of re-scanning the whole dataset (O(N × target)O(target) aggregate I/O).
  • Rejects unknown and duplicate fragment ids.
  • The regular-predicate path needs no change: a scoped scan only yields row addresses in-slice, so apply_deletions naturally stays in-slice. Only the predicate == true fast-path is explicitly scoped.

Combiner — combine_delete_transactions + execute_batch Delete arm

  • Because with_target_fragments requires disjoint slices, each fragment is touched by exactly one task — so the combiner never reads or rewrites deletion files. It concatenates updated_fragments + deleted_fragment_ids, requires all inputs to share the same predicate + read_version, and rejects any fragment id seen in more than one transaction (covers whole-fragment overlaps, not just partial updates).
  • Returns CombinedDelete { transaction, affected_rows, num_deleted_rows }. execute_batch carries the reconstructed affected_rows into the commit, so a concurrent writer is rebased at row granularity — exactly like a single DeleteBuilder::execute. affected_rows is the delete delta vs read_version (each touched fragment's post-delete deletion vector minus its state at read_version, or all live rows for a whole-fragment delete); its cardinality is the aggregate num_deleted_rows.
  • execute_batch now accepts an all-delete batch; all-append keeps its existing fast path; mixed / other-kind batches are rejected. The configured with_timeout bounds the entire call, including the delete path's dataset resolution + deletion-file reads.

Correctness

Disjoint fragment slices can never tombstone the same physical row ⇒ every matching row is covered exactly once ⇒ the combined result is exactly equal to a single full delete of the same predicate over the whole dataset. The disjointness check backstops the invariant and errors if a caller ever passes overlapping slices.

Multi-base-safe: the affected_rows reconstruction reads via the base-aware read_dataset_deletion_file and writes no deletion files, verified against the recently-merged upstream multi-base PRs (lance-format/lance lance-format#7610, lance-format#7609, lance-format#7608, lance-format#4580).

Tests (all multi-fragment)

  • test_fragment_scoped_delete_equals_full_delete — split delete ≡ full delete (identical surviving rows, and combiner num_deleted_rows + affected_rows match a single full delete); each slice's txn touches only its own fragments.
  • test_fragment_scoped_delete_deletes_whole_fragment — whole-fragment delete lands in deleted_fragment_ids.
  • test_fragment_scoped_delete_empty_slice — a slice matching nothing contributes an empty delete.
  • test_batch_delete_rebases_against_concurrent_delete — batch delete rebases against a concurrent delete of a different fragment (proves affected_rows is carried).
  • test_fragment_scoped_delete_with_scalar_index — scalar-index consistency asserted via index_statistics (not just a full-scan-fallback count).
  • Guards, each asserting the error message (not just the variant): unknown id, duplicate id, overlapping-partial, overlapping-whole-fragment, predicate mismatch, mixed batch, delete-path timeout.

Benchmark

benches/fragment_scoped_delete.rs (full_scan_per_task vs fragment_scoped_per_task, framed as aggregate work across N tasks; fresh dataset per iteration so uncommitted deletion files don't accumulate and bias the modes). A 64-fragment / 8-task single-node round: ~5.8× (≈84.5 ms → ≈14.6 ms).

Verification

  • cargo fmt --all
  • cargo clippy -p lance --tests --benches -- -D warnings → 0 warnings ✅
  • cargo test -p lance --lib dataset::write:: → 268 passed, 0 failed ✅
  • doctests (incl. with_target_fragments example) → pass ✅

Commits

  1. feat(delete): distributed fragment-scoped delete + execute_batch delete support — initial implementation.
  2. fix(delete): address review — row-level rebase, disjointness, timeout, bench — applies an xhigh code review: row-level rebase via reconstructed affected_rows, disjointness-based combiner (no DV union/rewrite), duplicate-id rejection, whole-batch timeout, unbiased bench, stronger tests, Python docstring.

🤖 Generated with Claude Code

sezruby and others added 2 commits July 3, 2026 13:13
…te support

Add a distributed / parallel delete: partition the target by fragment, delete a
predicate's matches per fragment slice independently, and batch-commit the
per-slice transactions as one atomic commit. This completes the documented
`execute_batch` API (previously append-only) for delete transactions.

Two independent pieces:

- Producer: `DeleteBuilder::with_target_fragments(ids)` scopes the scan (via
  `Scanner::with_fragments`) and deletion application to a fragment subset, so
  each distributed task reads only its own slice of the target instead of
  re-scanning the whole dataset. Unknown fragment ids are rejected.
- Combiner: `combine_delete_transactions` unions the per-slice deletion vectors
  (one merged deletion file per shared fragment, data files untouched), dedups
  whole-fragment deletions, and requires a shared predicate + read_version.
  `CommitBuilder::execute_batch` now accepts an all-delete batch and combines it;
  mixed-kind batches are still rejected.

Because disjoint fragment slices can never tombstone the same physical row, the
batch commit is conflict-free and equivalent to a single full delete of the same
predicate over the whole dataset. An overlap check backstops the invariant.

Tests cover equivalence-vs-full-delete, whole-fragment deletion, empty slices,
unknown-id rejection, overlap/predicate-mismatch rejection, mixed-batch
rejection, and scalar-index consistency; all multi-fragment. Adds
`benches/fragment_scoped_delete.rs` (aggregate-work-across-N-tasks framing;
~6x on a 64-fragment / 8-task single-node round).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…, bench

Applies the code-review findings on the distributed delete:

- combine_delete_transactions now returns a CombinedDelete { transaction,
  affected_rows, num_deleted_rows }. execute_batch_delete carries the
  reconstructed affected_rows into the commit so a concurrent writer is rebased
  at row granularity, matching a single DeleteBuilder::execute (previously the
  batch committed with affected_rows=None and hard-failed under concurrency).
  affected_rows is the delete delta vs read_version, computed by diffing each
  touched fragment's post-delete deletion vector against its state at
  read_version; its cardinality gives the aggregate num_deleted_rows.
- Since with_target_fragments requires disjoint slices, the combiner no longer
  reads/unions deletion vectors or writes new deletion files. It concatenates
  updated_fragments + deleted_fragment_ids and rejects any fragment id seen in
  more than one transaction — covering whole-fragment overlaps too, which the
  previous union-only check silently deduped.
- with_target_fragments rejects duplicate ids (previously double-counted rows
  and emitted duplicate deleted_fragment_ids on the delete-everything path).
- execute_batch wraps the whole call (including the delete path's dataset
  resolution + deletion-file reads) in the commit timeout; the arms call
  execute_inner to avoid double-wrapping. Zero-timeout rejected up front.
- Bench: fresh dataset per iteration via iter_batched so uncommitted deletion
  files don't accumulate in the store and bias full-scan (which writes N× more)
  vs scoped. Corrected single-node round: ~5.8x.
- Python commit_batch docstring updated (append OR delete, not append-only).
- Tests: overlapping-whole-fragment rejection, duplicate-id rejection,
  concurrent-delete rebase, num_deleted_rows/affected_rows equivalence vs a full
  delete, scalar-index consistency via index_statistics, delete-path timeout;
  negative tests now assert the error message, not just the InvalidInput variant.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sezruby

sezruby commented Jul 4, 2026

Copy link
Copy Markdown
Owner Author

Review fixes applied (commit 464e8e8)

Addressed all findings from the xhigh code review. Summary:

Correctness (P0/P1)

  • Row-level rebase (was the headline bug): combine_delete_transactions now returns CombinedDelete { transaction, affected_rows, num_deleted_rows }, and execute_batch_delete threads affected_rows into the commit. A distributed delete now rebases against a concurrent writer at row granularity, exactly like a single DeleteBuilder::execute — previously it committed with affected_rows = None and hard-failed under concurrency. affected_rows is reconstructed as the delete delta vs read_version. New test test_batch_delete_rebases_against_concurrent_delete proves it.
  • Disjointness / whole-fragment overlap: because with_target_fragments requires disjoint slices, the combiner no longer reads/unions deletion vectors or writes new deletion files — it concatenates fragment lists and rejects any fragment id seen twice, covering whole-fragment overlaps too (the old union-only check silently deduped those). Removed the is_disjoint/full-DV-compare concern and the library .unwrap()s along with that branch.
  • Duplicate ids: with_target_fragments now rejects repeated ids (previously double-counted rows on the delete-everything path).
  • Timeout: execute_batch wraps the entire call — including the delete path's dataset resolution + deletion-file reads — in the commit timeout. Test extended to assert a delete-batch also times out.

Bench

  • Fresh dataset per iteration via iter_batched, so uncommitted deletion files no longer accumulate in the store and bias full-scan (writes N× more files) vs scoped. Corrected single-node round: ~5.8× (84.5 ms → 14.6 ms).

Bindings / tests

  • Python commit_batch docstring updated (append or delete, not append-only).
  • Negative tests now assert the error message, not just the InvalidInput variant; scalar-index test asserts index consistency via index_statistics (was passing via full-scan fallback); equivalence test asserts num_deleted_rows + affected_rows match a single full delete.

Grounding: verified the deletion-file base placement against the recently-merged upstream multi-base PRs (lance-format#7610, lance-format#7609, lance-format#7608, lance-format#4580) — the combiner's delta reconstruction reads via read_dataset_deletion_file (base-aware) and writes nothing, so it's multi-base-safe.

Gate: cargo fmt + clippy --tests --benches -D warnings (0 warnings) + cargo test -p lance --lib dataset::write:: (268 passed) + doctests (3 passed), all green.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant