Skip to content

perf!: run scheduler initialize eagerly in async read_tasks#6710

Merged
westonpace merged 4 commits into
lance-format:mainfrom
westonpace:fix-inline-scheduling
May 8, 2026
Merged

perf!: run scheduler initialize eagerly in async read_tasks#6710
westonpace merged 4 commits into
lance-format:mainfrom
westonpace:fix-inline-scheduling

Conversation

@westonpace
Copy link
Copy Markdown
Member

@westonpace westonpace commented May 7, 2026

BREAKING CHANGE: the rust file reader's read methods are now async. This is to allow the caller control over when the scheduler initialization and inline scheduling occurs so that they can parallelize this work across fragments, if appropriate.

Summary

Fix the v6→v7 inline-scheduling regression by running the decode scheduler's initialize I/O eagerly on the awaiting task instead of smuggling it into the returned stream's first poll.

This is offered as an alternative to #6709 (which reverted #6637 entirely): we keep the inline-scheduling optimization for small reads, but make the work explicit and properly parallelized across fragments.

Background

#6637 introduced an "inline scheduling" path that, for small reads, attached the scheduler future to the front of the returned stream via flatten_stream and only ran it on first poll. Combined with the per-fragment try_flatten in FilteredReadExec (rust/lance/src/io/exec/filtered_read.rs:455) and LanceScanExec — both of which poll one inner stream at a time — this serialized the scheduler's initialize I/O across fragments.

StructuralPrimitiveFieldScheduler::initialize (rust/lance-encoding/src/encodings/logical/primitive.rs:3422) does a real io.submit_request(...).await for chunk metadata. The cache is per-file (the FieldDataCacheKey is column-scoped within a file's metadata cache), so every fragment open misses. With 800 small fragments × tens of ms of S3 latency, the inline path was catastrophic.

Repro

gist — 400k rows / 800 fragments × 500 rows, KNN brute force, no index:

before fix after fix
default ~60–66 ms ~48–52 ms
LANCE_INLINE_SCHEDULING_THRESHOLD=0 (spawn) ~46 ms ~50–52 ms

Default and spawn are now matched. Cross-fragment-count ablation shows no regression at any scale (default tracks spawn ±2 ms across rpf=500/2000/8000/50000).

Approach

The goal was to make the scheduling work explicit, not "smuggled into the poll of the first batch."

  1. schedule_and_decode is now async (rust/lance-encoding/src/decoder.rs). It awaits DecodeBatchScheduler::try_new (which runs initialize) before returning. For the inline branch, it then runs the synchronous schedule_ranges / schedule_take work in line, leaving a fully primed decode stream. The non-inline branch still spawns the scheduling task so it can overlap with decoding.

  2. Cascade async through the file-reader surface. All of FileReader::read_tasks, read_range, read_ranges, read_stream, and read_stream_projected are now async. Each got a "Why is this async?" doc paragraph explaining that the decode scheduler's metadata I/O happens on the awaiting task rather than on the consumer that polls the stream.

  3. GenericFileReader trait methods return BoxFuture<'_, Result<ReadBatchTaskStream>>. V1Reader, the v2 adapter Reader, and NullReader updated. FragmentReader::{read_range, read_all, read_ranges, take_range} and new_read_impl are now async; new_read_impl uses try_join_all so per-data-file initialize I/Os run concurrently within a fragment.

  4. Callers updated in scan.rs (v1 + v2 paths), filtered_read.rs, dataset/updater.rs (Updater::try_new made async), lance-index (shufflers, distributed index merger, scalar lance_format, vector storage), benches, and python/src/file.rs.

The fix relies on the existing SpawnedTask::spawn of read_fragment in FilteredReadExec and the tokio::spawn of the open task in LanceScanExec: the per-fragment task now also drives initialize, so all fragments' scheduling I/Os run in parallel up to fragment_readahead.

Behavior change

Errors from initialize (e.g. corrupted metadata, transient I/O) now surface from the read_* await instead of from the first stream item. Existing callers that match on the result of read_* keep working; callers that previously assumed the construction was infallible and only the stream could error will now see the error one step earlier.

Test plan

  • cargo check --workspace --tests --benches — clean
  • Repro gist — regression resolved (numbers above)
  • Cross-fragment-count ablation — no regression at any scale
  • Python tests: 526 passed across test_dataset, test_scalar_index, test_blob, test_filter, test_file, test_fragment, test_vector_index (minus the timing-fragile test below)
  • Cloud / S3 verification — would expect a much larger improvement than local

One known test failure to flag

test_create_index_progress_callback_error_before_completion_propagates fails after this change. It is a pre-existing timing race exposed by the speedup, not a correctness break:

  • The test registers fail_on_tag="start:train_ivf" and expects create_index to raise.
  • Mechanism: Rust calls progress.stage_start("train_ivf").await, which only does a sync channel send — the callback's error surfaces later, when Python's block_on_pumping (python/src/executor.rs:200-247) calls pump() between 100 ms tokio::time::sleeps.
  • After this change the default-mode operation completes inside the first 100 ms slice often enough that pump doesn't get a chance to surface the error mid-flight. It hits the post-completion branch at executor.rs:238-244, which logs and ignores errors from the final pump ("Ignoring progress callback error after operation completed successfully"). Running with LANCE_INLINE_SCHEDULING_THRESHOLD=0 (spawn) makes the test pass.
  • Earlier perf commits on main may already have exposed variants of this on some platforms; commit 87ef5e2 fixed a related case.

The clean fix is in block_on_pumping (propagate the final pump's error rather than ignoring it), but that's outside this refactor's scope and changes the contract of "operation succeeded but a callback later errored". Happy to land that as a separate PR if reviewers want.

🤖 Generated with Claude Code

The previous inline-scheduling path attached the scheduler future to
the front of the returned stream via flatten_stream and only ran it
on first poll.  Combined with the per-fragment try_flatten in
FilteredReadExec / LanceScanExec (which polls one inner stream at a
time), this serialized the scheduler's initialize I/O across
fragments — catastrophic on small-fragment many-frag scans, and even
worse on cloud where each metadata fetch costs tens of ms.

Fix: make schedule_and_decode async and have it await
DecodeBatchScheduler::try_new (which performs the metadata I/O)
before returning.  The inline branch additionally runs the
synchronous schedule_ranges / schedule_take work, leaving a primed
decode stream.  The non-inline branch still spawns the scheduling
task so it can overlap with decoding.

Cascade async through the surface: FileReader::read_tasks /
read_range / read_ranges / read_stream{,_projected},
GenericFileReader::read_*_tasks (now BoxFuture<'_, ...>),
FragmentReader::{read_range, read_all, read_ranges, take_range},
new_read_impl (uses try_join_all so per-data-file initialize I/Os
run concurrently), Updater::try_new, plus all callers in scan.rs,
filtered_read.rs, lance-index, benches, and python/src/file.rs.
Public read_* methods got a "Why is this async?" doc paragraph.

On the brute-force KNN repro (400k rows, 800 fragments × 500 rows):
default-mode KNN dropped from ~60-66 ms to ~48-52 ms, matching the
spawn path.  Default and spawn are now equivalent across fragment
counts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@westonpace
Copy link
Copy Markdown
Member Author

@claude review once

Comment thread rust/lance/src/dataset/fragment.rs
Comment thread rust/lance-file/src/reader.rs
Comment thread rust/lance-encoding/src/decoder.rs
- Add missing .await on take_range calls in three fragment tests
  (test_out_of_range, test_rowid_rowaddr_only, test_fragment_take_range_deletions)
  that were broken when take_range became async.
- Make schedule_and_decode return Result<BoxStream> so initialize errors
  actually surface from the await, matching the documented contract.
  Previously errors were caught and wrapped into a one-task stream, so
  the docs added in this PR were aspirational rather than accurate.
- Update the stale "stream's first poll" comment on
  DEFAULT_INLINE_SCHEDULING_THRESHOLD to refer to the schedule_and_decode
  await, matching the rest of the inline-scheduling docs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@westonpace
Copy link
Copy Markdown
Member Author

I ran this gist on an S3 URL. Without this fix the query took ~22 seconds. With the fix it was ~1 second. If I run it on main (with the inline scheduling reverted) it takes ~1 second so performance seems to be restored.

@westonpace westonpace changed the title perf: run scheduler initialize eagerly in async read_tasks perf!: run scheduler initialize eagerly in async read_tasks May 8, 2026
@westonpace westonpace merged commit d8542b5 into lance-format:main May 8, 2026
32 checks passed
westonpace added a commit that referenced this pull request May 8, 2026
BREAKING CHANGE: the rust file reader's read methods are now async. This
is to allow the caller control over when the scheduler initialization
and inline scheduling occurs so that they can parallelize this work
across fragments, if appropriate.

Fix the v6→v7 inline-scheduling regression by running the decode
scheduler's `initialize` I/O eagerly on the awaiting task instead of
smuggling it into the returned stream's first poll.

This is offered as an alternative to #6709 (which reverted #6637
entirely): we keep the inline-scheduling optimization for small reads,
but make the work explicit and properly parallelized across fragments.

attached the scheduler future to the front of the returned stream via
`flatten_stream` and only ran it on first poll. Combined with the
per-fragment `try_flatten` in `FilteredReadExec`
(`rust/lance/src/io/exec/filtered_read.rs:455`) and `LanceScanExec` —
both of which poll one inner stream at a time — this serialized the
scheduler's `initialize` I/O across fragments.

`StructuralPrimitiveFieldScheduler::initialize`
(`rust/lance-encoding/src/encodings/logical/primitive.rs:3422`) does a
real `io.submit_request(...).await` for chunk metadata. The cache is
per-file (the `FieldDataCacheKey` is column-scoped within a file's
metadata cache), so every fragment open misses. With 800 small fragments
× tens of ms of S3 latency, the inline path was catastrophic.

[gist](https://gist.github.com/wkalt/e080fc9ddff6edd8eaee5ab50a069fbe) —
400k rows / 800 fragments × 500 rows, KNN brute force, no index:

|                       | before fix    | after fix       |
|-----------------------|---------------|-----------------|
| default               | ~60–66 ms     | **~48–52 ms**   |
| `LANCE_INLINE_SCHEDULING_THRESHOLD=0` (spawn) | ~46 ms | ~50–52 ms |

Default and spawn are now matched. Cross-fragment-count ablation shows
no regression at any scale (default tracks spawn ±2 ms across
rpf=500/2000/8000/50000).

The goal was to make the scheduling work explicit, not "smuggled into
the poll of the first batch."

1. **`schedule_and_decode` is now `async`**
(`rust/lance-encoding/src/decoder.rs`). It awaits
`DecodeBatchScheduler::try_new` (which runs `initialize`) before
returning. For the inline branch, it then runs the synchronous
`schedule_ranges` / `schedule_take` work in line, leaving a fully primed
decode stream. The non-inline branch still spawns the scheduling task so
it can overlap with decoding.

2. **Cascade async through the file-reader surface.** All of
`FileReader::read_tasks`, `read_range`, `read_ranges`, `read_stream`,
and `read_stream_projected` are now `async`. Each got a "Why is this
async?" doc paragraph explaining that the decode scheduler's metadata
I/O happens on the awaiting task rather than on the consumer that polls
the stream.

3. **`GenericFileReader` trait methods return `BoxFuture<'_,
Result<ReadBatchTaskStream>>`.** V1Reader, the v2 adapter `Reader`, and
`NullReader` updated. `FragmentReader::{read_range, read_all,
read_ranges, take_range}` and `new_read_impl` are now async;
`new_read_impl` uses `try_join_all` so per-data-file `initialize` I/Os
run concurrently within a fragment.

4. **Callers updated** in `scan.rs` (v1 + v2 paths), `filtered_read.rs`,
`dataset/updater.rs` (`Updater::try_new` made async), `lance-index`
(shufflers, distributed index merger, scalar lance_format, vector
storage), benches, and `python/src/file.rs`.

The fix relies on the existing `SpawnedTask::spawn` of `read_fragment`
in `FilteredReadExec` and the `tokio::spawn` of the open task in
`LanceScanExec`: the per-fragment task now also drives `initialize`, so
all fragments' scheduling I/Os run in parallel up to
`fragment_readahead`.

Errors from `initialize` (e.g. corrupted metadata, transient I/O) now
surface from the `read_*` await instead of from the first stream item.
Existing callers that match on the result of `read_*` keep working;
callers that previously assumed the construction was infallible and only
the stream could error will now see the error one step earlier.

- [x] `cargo check --workspace --tests --benches` — clean
- [x] Repro gist — regression resolved (numbers above)
- [x] Cross-fragment-count ablation — no regression at any scale
- [x] Python tests: 526 passed across `test_dataset`,
`test_scalar_index`, `test_blob`, `test_filter`, `test_file`,
`test_fragment`, `test_vector_index` (minus the timing-fragile test
below)
- [ ] Cloud / S3 verification — would expect a much larger improvement
than local

`test_create_index_progress_callback_error_before_completion_propagates`
fails after this change. It is a **pre-existing timing race exposed by
the speedup**, not a correctness break:

- The test registers `fail_on_tag="start:train_ivf"` and expects
`create_index` to raise.
- Mechanism: Rust calls `progress.stage_start("train_ivf").await`, which
only does a sync channel send — the callback's error surfaces later,
when Python's `block_on_pumping` (`python/src/executor.rs:200-247`)
calls `pump()` between 100 ms `tokio::time::sleep`s.
- After this change the default-mode operation completes inside the
first 100 ms slice often enough that pump doesn't get a chance to
surface the error mid-flight. It hits the post-completion branch at
`executor.rs:238-244`, which logs and ignores errors from the final pump
(`"Ignoring progress callback error after operation completed
successfully"`). Running with `LANCE_INLINE_SCHEDULING_THRESHOLD=0`
(spawn) makes the test pass.
- Earlier perf commits on main may already have exposed variants of this
on some platforms; commit 87ef5e2 fixed a related case.

The clean fix is in `block_on_pumping` (propagate the final pump's error
rather than ignoring it), but that's outside this refactor's scope and
changes the contract of "operation succeeded but a callback later
errored". Happy to land that as a separate PR if reviewers want.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
sinianluoye pushed a commit to TheDeltaLab/lance that referenced this pull request May 9, 2026
Upstream added a required query_parallelism field to lance_index::vector::Query (lance-format#6710 chain).
Update the local tracing tests to set the field to 0.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants