perf: revert inline scheduling#6709
Merged
Merged
Conversation
…ll reads (lance-format#6637)" This reverts commit 6e40d78.
wkalt
approved these changes
May 7, 2026
Contributor
wkalt
left a comment
There was a problem hiding this comment.
approved pending CI. Thanks.
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
5 tasks
westonpace
added a commit
that referenced
this pull request
May 8, 2026
BREAKING CHANGE: the rust file reader's read methods are now async. This is to allow the caller control over when the scheduler initialization and inline scheduling occurs so that they can parallelize this work across fragments, if appropriate. ## Summary Fix the v6→v7 inline-scheduling regression by running the decode scheduler's `initialize` I/O eagerly on the awaiting task instead of smuggling it into the returned stream's first poll. This is offered as an alternative to #6709 (which reverted #6637 entirely): we keep the inline-scheduling optimization for small reads, but make the work explicit and properly parallelized across fragments. ## Background #6637 introduced an "inline scheduling" path that, for small reads, attached the scheduler future to the front of the returned stream via `flatten_stream` and only ran it on first poll. Combined with the per-fragment `try_flatten` in `FilteredReadExec` (`rust/lance/src/io/exec/filtered_read.rs:455`) and `LanceScanExec` — both of which poll one inner stream at a time — this serialized the scheduler's `initialize` I/O across fragments. `StructuralPrimitiveFieldScheduler::initialize` (`rust/lance-encoding/src/encodings/logical/primitive.rs:3422`) does a real `io.submit_request(...).await` for chunk metadata. The cache is per-file (the `FieldDataCacheKey` is column-scoped within a file's metadata cache), so every fragment open misses. With 800 small fragments × tens of ms of S3 latency, the inline path was catastrophic. ## Repro [gist](https://gist.github.com/wkalt/e080fc9ddff6edd8eaee5ab50a069fbe) — 400k rows / 800 fragments × 500 rows, KNN brute force, no index: | | before fix | after fix | |-----------------------|---------------|-----------------| | default | ~60–66 ms | **~48–52 ms** | | `LANCE_INLINE_SCHEDULING_THRESHOLD=0` (spawn) | ~46 ms | ~50–52 ms | Default and spawn are now matched. Cross-fragment-count ablation shows no regression at any scale (default tracks spawn ±2 ms across rpf=500/2000/8000/50000). ## Approach The goal was to make the scheduling work explicit, not "smuggled into the poll of the first batch." 1. **`schedule_and_decode` is now `async`** (`rust/lance-encoding/src/decoder.rs`). It awaits `DecodeBatchScheduler::try_new` (which runs `initialize`) before returning. For the inline branch, it then runs the synchronous `schedule_ranges` / `schedule_take` work in line, leaving a fully primed decode stream. The non-inline branch still spawns the scheduling task so it can overlap with decoding. 2. **Cascade async through the file-reader surface.** All of `FileReader::read_tasks`, `read_range`, `read_ranges`, `read_stream`, and `read_stream_projected` are now `async`. Each got a "Why is this async?" doc paragraph explaining that the decode scheduler's metadata I/O happens on the awaiting task rather than on the consumer that polls the stream. 3. **`GenericFileReader` trait methods return `BoxFuture<'_, Result<ReadBatchTaskStream>>`.** V1Reader, the v2 adapter `Reader`, and `NullReader` updated. `FragmentReader::{read_range, read_all, read_ranges, take_range}` and `new_read_impl` are now async; `new_read_impl` uses `try_join_all` so per-data-file `initialize` I/Os run concurrently within a fragment. 4. **Callers updated** in `scan.rs` (v1 + v2 paths), `filtered_read.rs`, `dataset/updater.rs` (`Updater::try_new` made async), `lance-index` (shufflers, distributed index merger, scalar lance_format, vector storage), benches, and `python/src/file.rs`. The fix relies on the existing `SpawnedTask::spawn` of `read_fragment` in `FilteredReadExec` and the `tokio::spawn` of the open task in `LanceScanExec`: the per-fragment task now also drives `initialize`, so all fragments' scheduling I/Os run in parallel up to `fragment_readahead`. ## Behavior change Errors from `initialize` (e.g. corrupted metadata, transient I/O) now surface from the `read_*` await instead of from the first stream item. Existing callers that match on the result of `read_*` keep working; callers that previously assumed the construction was infallible and only the stream could error will now see the error one step earlier. ## Test plan - [x] `cargo check --workspace --tests --benches` — clean - [x] Repro gist — regression resolved (numbers above) - [x] Cross-fragment-count ablation — no regression at any scale - [x] Python tests: 526 passed across `test_dataset`, `test_scalar_index`, `test_blob`, `test_filter`, `test_file`, `test_fragment`, `test_vector_index` (minus the timing-fragile test below) - [ ] Cloud / S3 verification — would expect a much larger improvement than local ## One known test failure to flag `test_create_index_progress_callback_error_before_completion_propagates` fails after this change. It is a **pre-existing timing race exposed by the speedup**, not a correctness break: - The test registers `fail_on_tag="start:train_ivf"` and expects `create_index` to raise. - Mechanism: Rust calls `progress.stage_start("train_ivf").await`, which only does a sync channel send — the callback's error surfaces later, when Python's `block_on_pumping` (`python/src/executor.rs:200-247`) calls `pump()` between 100 ms `tokio::time::sleep`s. - After this change the default-mode operation completes inside the first 100 ms slice often enough that pump doesn't get a chance to surface the error mid-flight. It hits the post-completion branch at `executor.rs:238-244`, which logs and ignores errors from the final pump (`"Ignoring progress callback error after operation completed successfully"`). Running with `LANCE_INLINE_SCHEDULING_THRESHOLD=0` (spawn) makes the test pass. - Earlier perf commits on main may already have exposed variants of this on some platforms; commit 87ef5e2 fixed a related case. The clean fix is in `block_on_pumping` (propagate the final pump's error rather than ignoring it), but that's outside this refactor's scope and changes the contract of "operation succeeded but a callback later errored". Happy to land that as a separate PR if reviewers want. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
westonpace
added a commit
that referenced
this pull request
May 8, 2026
This reverts commit 6e40d78. We detected some cases where this has a pretty significant performance regression and we need to understand it more before proceeding.
westonpace
added a commit
that referenced
this pull request
May 8, 2026
BREAKING CHANGE: the rust file reader's read methods are now async. This is to allow the caller control over when the scheduler initialization and inline scheduling occurs so that they can parallelize this work across fragments, if appropriate. Fix the v6→v7 inline-scheduling regression by running the decode scheduler's `initialize` I/O eagerly on the awaiting task instead of smuggling it into the returned stream's first poll. This is offered as an alternative to #6709 (which reverted #6637 entirely): we keep the inline-scheduling optimization for small reads, but make the work explicit and properly parallelized across fragments. attached the scheduler future to the front of the returned stream via `flatten_stream` and only ran it on first poll. Combined with the per-fragment `try_flatten` in `FilteredReadExec` (`rust/lance/src/io/exec/filtered_read.rs:455`) and `LanceScanExec` — both of which poll one inner stream at a time — this serialized the scheduler's `initialize` I/O across fragments. `StructuralPrimitiveFieldScheduler::initialize` (`rust/lance-encoding/src/encodings/logical/primitive.rs:3422`) does a real `io.submit_request(...).await` for chunk metadata. The cache is per-file (the `FieldDataCacheKey` is column-scoped within a file's metadata cache), so every fragment open misses. With 800 small fragments × tens of ms of S3 latency, the inline path was catastrophic. [gist](https://gist.github.com/wkalt/e080fc9ddff6edd8eaee5ab50a069fbe) — 400k rows / 800 fragments × 500 rows, KNN brute force, no index: | | before fix | after fix | |-----------------------|---------------|-----------------| | default | ~60–66 ms | **~48–52 ms** | | `LANCE_INLINE_SCHEDULING_THRESHOLD=0` (spawn) | ~46 ms | ~50–52 ms | Default and spawn are now matched. Cross-fragment-count ablation shows no regression at any scale (default tracks spawn ±2 ms across rpf=500/2000/8000/50000). The goal was to make the scheduling work explicit, not "smuggled into the poll of the first batch." 1. **`schedule_and_decode` is now `async`** (`rust/lance-encoding/src/decoder.rs`). It awaits `DecodeBatchScheduler::try_new` (which runs `initialize`) before returning. For the inline branch, it then runs the synchronous `schedule_ranges` / `schedule_take` work in line, leaving a fully primed decode stream. The non-inline branch still spawns the scheduling task so it can overlap with decoding. 2. **Cascade async through the file-reader surface.** All of `FileReader::read_tasks`, `read_range`, `read_ranges`, `read_stream`, and `read_stream_projected` are now `async`. Each got a "Why is this async?" doc paragraph explaining that the decode scheduler's metadata I/O happens on the awaiting task rather than on the consumer that polls the stream. 3. **`GenericFileReader` trait methods return `BoxFuture<'_, Result<ReadBatchTaskStream>>`.** V1Reader, the v2 adapter `Reader`, and `NullReader` updated. `FragmentReader::{read_range, read_all, read_ranges, take_range}` and `new_read_impl` are now async; `new_read_impl` uses `try_join_all` so per-data-file `initialize` I/Os run concurrently within a fragment. 4. **Callers updated** in `scan.rs` (v1 + v2 paths), `filtered_read.rs`, `dataset/updater.rs` (`Updater::try_new` made async), `lance-index` (shufflers, distributed index merger, scalar lance_format, vector storage), benches, and `python/src/file.rs`. The fix relies on the existing `SpawnedTask::spawn` of `read_fragment` in `FilteredReadExec` and the `tokio::spawn` of the open task in `LanceScanExec`: the per-fragment task now also drives `initialize`, so all fragments' scheduling I/Os run in parallel up to `fragment_readahead`. Errors from `initialize` (e.g. corrupted metadata, transient I/O) now surface from the `read_*` await instead of from the first stream item. Existing callers that match on the result of `read_*` keep working; callers that previously assumed the construction was infallible and only the stream could error will now see the error one step earlier. - [x] `cargo check --workspace --tests --benches` — clean - [x] Repro gist — regression resolved (numbers above) - [x] Cross-fragment-count ablation — no regression at any scale - [x] Python tests: 526 passed across `test_dataset`, `test_scalar_index`, `test_blob`, `test_filter`, `test_file`, `test_fragment`, `test_vector_index` (minus the timing-fragile test below) - [ ] Cloud / S3 verification — would expect a much larger improvement than local `test_create_index_progress_callback_error_before_completion_propagates` fails after this change. It is a **pre-existing timing race exposed by the speedup**, not a correctness break: - The test registers `fail_on_tag="start:train_ivf"` and expects `create_index` to raise. - Mechanism: Rust calls `progress.stage_start("train_ivf").await`, which only does a sync channel send — the callback's error surfaces later, when Python's `block_on_pumping` (`python/src/executor.rs:200-247`) calls `pump()` between 100 ms `tokio::time::sleep`s. - After this change the default-mode operation completes inside the first 100 ms slice often enough that pump doesn't get a chance to surface the error mid-flight. It hits the post-completion branch at `executor.rs:238-244`, which logs and ignores errors from the final pump (`"Ignoring progress callback error after operation completed successfully"`). Running with `LANCE_INLINE_SCHEDULING_THRESHOLD=0` (spawn) makes the test pass. - Earlier perf commits on main may already have exposed variants of this on some platforms; commit 87ef5e2 fixed a related case. The clean fix is in `block_on_pumping` (propagate the final pump's error rather than ignoring it), but that's outside this refactor's scope and changes the contract of "operation succeeded but a callback later errored". Happy to land that as a separate PR if reviewers want. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This reverts commit 6e40d78.
We detected some cases where this has a pretty significant performance regression and we need to understand it more before proceeding.