Summary
Add a distributed (parallel) delete to lance. The target is partitioned by fragment, the rows matching a predicate are deleted in each fragment slice independently and in parallel, and the per-slice transactions are committed together as one atomic commit. This extends the documented CommitBuilder::execute_batch API to delete transactions. Until now execute_batch was append-only, with an in-code TODO: "Only works for append transactions right now. Other kinds of transactions will be supported in the future."
Branch: feat/distributed-delete · PR #7 · Commit 464e8e881
This is the sibling of the fragment-scoped merge work (#6). Delete is the pure form of the same pattern (fragment-scope + combine + batch-commit): Operation::Delete has 3 fields vs Update's 9, writes no new fragments (only tombstones), and is already per-fragment in OSS. It is the smallest correct slice that extends execute_batch beyond Append, so it is the best first upstreamable piece.
Problem
When a large delete (e.g. DELETE FROM t WHERE <pred> over a 10B-row table) runs distributed today, every task scans the whole target to find matching rows. With N tasks, aggregate I/O is therefore O(N × target), the same pathology that OOMs the distributed merge (#6). There is also no way to commit N independent deletes as one atomic version, because execute_batch rejects any non-Append transaction.
Design
Two independent, additive pieces. Each is opt-in: a delete with no target fragments (None) and an all-append execute_batch behave exactly as they do today.
Producer — DeleteBuilder::with_target_fragments(ids)
Scopes the scan (via Scanner::with_fragments) and deletion application to a fragment subset, so each task reads only its own slice. Unknown or duplicate ids are rejected. Aggregate target I/O becomes O(target).
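The id checks on a requested slice can be sketched as follows. This is a minimal illustration, assuming fragment ids are u64 and that the existing ids come from the dataset's read version; validate_target_fragments is a hypothetical helper, not the PR's code.

```rust
use std::collections::HashSet;

/// Hypothetical helper (not lance's API): validate a requested fragment slice
/// against the fragment ids that exist in the read version.
/// Rejects unknown ids and ids listed more than once; returns the slice unchanged.
fn validate_target_fragments(requested: &[u64], existing: &[u64]) -> Result<Vec<u64>, String> {
    let existing: HashSet<u64> = existing.iter().copied().collect();
    let mut seen = HashSet::new();
    for &id in requested {
        if !existing.contains(&id) {
            return Err(format!("unknown fragment id {id}"));
        }
        // `insert` returns false when the id was already present.
        if !seen.insert(id) {
            return Err(format!("duplicate fragment id {id}"));
        }
    }
    Ok(requested.to_vec())
}
```

Rejecting duplicates up front keeps a single task from applying the same fragment's deletions twice; cross-task overlap is caught later by the combiner.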
Combiner — combine_delete_transactions + execute_batch Delete arm
The driver stitches the per-slice Operation::Deletes into one. Because the slices passed to with_target_fragments must be disjoint, each fragment is touched by exactly one task, so the combiner never reads or rewrites deletion files. It concatenates updated_fragments and deleted_fragment_ids, requires every transaction to share the same predicate and read_version, and rejects any fragment id that appears in more than one transaction (which also catches whole-fragment overlaps). It returns CombinedDelete { transaction, affected_rows, num_deleted_rows }, and execute_batch carries affected_rows into the commit so that a concurrent writer is rebased at row granularity, exactly as with a single DeleteBuilder::execute.
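The combiner's validation and concatenation can be modeled in plain Rust. This is a simplified sketch, not lance's code: StagedDelete and combine_delete_sketch are hypothetical stand-ins for a staged Operation::Delete and combine_delete_transactions, and affected_rows is modeled as (fragment id, row offset) pairs rather than lance's own row-id structure.

```rust
use std::collections::HashSet;

// Hypothetical, simplified model of one staged per-slice delete.
// NOT lance's types; field names only mirror the PR description.
#[derive(Debug, Clone, PartialEq)]
struct StagedDelete {
    read_version: u64,
    predicate: String,
    updated_fragments: Vec<u64>,    // fragments that received a new deletion file
    deleted_fragment_ids: Vec<u64>, // fragments whose rows all matched
    affected_rows: Vec<(u64, u32)>, // (fragment id, row offset) pairs tombstoned
    num_deleted_rows: u64,
}

// Combine disjoint per-slice deletes into one. Never touches deletion files:
// it only validates the inputs and concatenates their fields.
fn combine_delete_sketch(txns: &[StagedDelete]) -> Result<StagedDelete, String> {
    let first = txns.first().ok_or_else(|| "empty batch".to_string())?;
    let mut combined = StagedDelete {
        read_version: first.read_version,
        predicate: first.predicate.clone(),
        updated_fragments: Vec::new(),
        deleted_fragment_ids: Vec::new(),
        affected_rows: Vec::new(),
        num_deleted_rows: 0,
    };
    let mut seen: HashSet<u64> = HashSet::new();
    for t in txns {
        if t.predicate != combined.predicate {
            return Err("predicate mismatch".to_string());
        }
        if t.read_version != combined.read_version {
            return Err("read_version mismatch".to_string());
        }
        // A fragment id may appear in only one transaction, whether it was
        // partially updated or deleted whole.
        for &id in t.updated_fragments.iter().chain(&t.deleted_fragment_ids) {
            if !seen.insert(id) {
                return Err(format!("fragment {id} appears in more than one transaction"));
            }
        }
        combined.updated_fragments.extend_from_slice(&t.updated_fragments);
        combined.deleted_fragment_ids.extend_from_slice(&t.deleted_fragment_ids);
        combined.affected_rows.extend_from_slice(&t.affected_rows);
        combined.num_deleted_rows += t.num_deleted_rows;
    }
    Ok(combined)
}
```

Because overlap is checked on fragment ids rather than row addresses, the combiner can reject an unsafe batch without ever opening a deletion file.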
Why this is correct
Every physical row lives in exactly one fragment, so disjoint fragment slices can never tombstone the same row. Provided the slices together cover every fragment in the read version, every matching row is therefore tombstoned exactly once, and the combined result is exactly equal to a single full delete of the same predicate over the whole dataset. The disjointness check backstops the invariant and errors if a caller ever passes overlapping slices.
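A toy model makes the exactly-once argument concrete. It assumes fragments are plain vectors of i64 values and identifies a tombstone by (fragment index, row offset); delete_in_slice and scoped_equals_full are hypothetical helpers, not lance code. The check succeeds only when no address is tombstoned by two slices and the union equals a full delete, i.e. when the slices are disjoint and cover every fragment.

```rust
use std::collections::BTreeSet;

// Toy tombstone address: (fragment index, row offset). Hypothetical model only.
type Addr = (usize, usize);

// Tombstone every row matching `pred` in the given fragments only.
fn delete_in_slice(fragments: &[Vec<i64>], slice: &[usize], pred: impl Fn(i64) -> bool) -> BTreeSet<Addr> {
    let mut out = BTreeSet::new();
    for &f in slice {
        for (offset, &v) in fragments[f].iter().enumerate() {
            if pred(v) {
                out.insert((f, offset));
            }
        }
    }
    out
}

// True iff the per-slice deletes never tombstone the same row twice and
// their union equals one full delete over all fragments.
fn scoped_equals_full(fragments: &[Vec<i64>], slices: &[Vec<usize>], pred: impl Fn(i64) -> bool + Copy) -> bool {
    let all: Vec<usize> = (0..fragments.len()).collect();
    let full = delete_in_slice(fragments, &all, pred);
    let mut combined = BTreeSet::new();
    for s in slices {
        for addr in delete_in_slice(fragments, s, pred) {
            // `insert` returns false if another slice already tombstoned this row.
            if !combined.insert(addr) {
                return false;
            }
        }
    }
    combined == full
}
```

Overlapping slices fail the exactly-once check, and slices that miss a fragment fail the equality check, which is why both conditions appear in the argument above.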
API
```rust
// Per task (in parallel): scope the delete to this task's disjoint fragment slice.
let staged = DeleteBuilder::new(ds.clone(), "age > 65")
    .with_target_fragments(vec![3, 7, 11])
    .execute_uncommitted()
    .await?;

// Driver: all_txns holds the staged transactions gathered from every task.
// execute_batch combines the per-slice deletes and commits them as one atomic version.
CommitBuilder::new(ds).execute_batch(all_txns).await?;
```
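On the driver, the fragment ids must be split into slices that are disjoint and together cover the whole target before each task calls with_target_fragments. A minimal sketch of that step, assuming fragment ids are u64; assign_slices is a hypothetical helper, not part of this PR.

```rust
/// Hypothetical driver-side helper: split the dataset's fragment ids into
/// `num_tasks` disjoint slices that together cover every fragment.
/// Round-robin assignment keeps slice sizes within one fragment of each other.
fn assign_slices(fragment_ids: &[u64], num_tasks: usize) -> Vec<Vec<u64>> {
    assert!(num_tasks > 0, "need at least one task");
    let mut slices = vec![Vec::new(); num_tasks];
    for (i, &id) in fragment_ids.iter().enumerate() {
        slices[i % num_tasks].push(id);
    }
    slices
}
```

Round-robin is one choice; contiguous ranges would work equally well as long as every fragment lands in exactly one slice.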
Tests (all multi-fragment)
- test_fragment_scoped_delete_equals_full_delete: split delete ≡ full delete (identical surviving rows, num_deleted_rows, and affected_rows); each slice's txn touches only its own fragments.
- test_fragment_scoped_delete_deletes_whole_fragment: a whole-fragment delete lands in deleted_fragment_ids.
- test_fragment_scoped_delete_empty_slice: a slice matching nothing contributes an empty delete.
- test_batch_delete_rebases_against_concurrent_delete: a batch delete rebases against a concurrent delete of another fragment (proves affected_rows is carried).
- test_fragment_scoped_delete_with_scalar_index: the scalar index stays consistent (asserted via index_statistics).
- Guards: unknown-id, duplicate-id, overlapping-partial, overlapping-whole-fragment, predicate-mismatch, mixed-batch, delete-path timeout.
Benchmark
rust/lance/benches/fragment_scoped_delete.rs runs one round of 8 tasks that delete rows where id % 10 = 0 from a 320K-row, 64-fragment target, with a fresh dataset per iteration (on a single machine, which isolates the scan cost):
| mode | time / round |
|---|---|
| full_scan_per_task (each task scans the whole target) | ~84.5 ms |
| fragment_scoped_per_task (each task scans only its slice) | ~14.6 ms |
| speedup | ~5.8× |
The speedup grows with N, the number of tasks, because it is the collapse of aggregate I/O from O(N × target) to O(target).
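The I/O model behind that claim, written out for N tasks, a target of T rows, and disjoint slices S_1, …, S_N that cover the target:

```math
\text{I/O}_{\text{full}} = \sum_{i=1}^{N} T = N \cdot T,
\qquad
\text{I/O}_{\text{scoped}} = \sum_{i=1}^{N} |S_i| = T
```

For the benchmark above (N = 8, T = 320K rows), the full-scan mode reads 8 × 320K = 2.56M rows in aggregate, while the fragment-scoped mode reads 320K.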
Follow-ups (out of scope here)
- Wire target_fragments through JNI → Java → a lance-spark distributed-delete exec (enumerate target fragments on the driver, assign slices to tasks).
- Optionally surface aggregate num_deleted_rows on BatchCommitResult (currently exposed via CombinedDelete; kept off BatchCommitResult to avoid a public-API/FFI change).