[Data] Add SchedulingHints sibling field on BlockEntry / wire envelope#64517
Open
goutamvenkat-anyscale wants to merge 1 commit into
Open
[Data] Add SchedulingHints sibling field on BlockEntry / wire envelope#64517goutamvenkat-anyscale wants to merge 1 commit into
goutamvenkat-anyscale wants to merge 1 commit into
Conversation
Adds an opt-in, producer-supplied forecast about what the *next consumer*
of a block will need (memory today; cpu/gpu/locality/strategy in future
additions on the same dataclass). Designed to be layered on top of the
BlockEntry foundation without disturbing existing call sites.
Surface:
- New module `ray.data._internal.scheduling_hints` with:
* `SchedulingHints` frozen dataclass (memory: Optional[int]; additive)
* `stage_scheduling_hints(hints)` — producer helper; writes to
``TaskContext.next_block_scheduling_hints`` before each yield.
* `stage_memory_hint(memory)` — single-axis convenience.
- `BlockEntry.scheduling_hints: Optional[SchedulingHints]` — sibling
field to `metadata`, defaulting to None.
- `BlockMetadataWithSchema.scheduling_hints` — wire-envelope field that
carries the hint from worker to driver alongside the per-block
metadata. `from_metadata` / `from_block` thread it through.
- `TaskContext.next_block_scheduling_hints` +
`consume_next_block_scheduling_hints()` — staging slot consumed and
cleared by `_map_task` after each yield, so a stale value can't
silently mis-tag later blocks.
- `RefBundle.scheduling_hints` accessor — parallel list to
`block_refs` / `metadata`, returns the per-block forecasts.
Plumbing:
- `_map_task` reads the staged hints, attaches them to the
``BlockMetadataWithSchema`` it pickles per yield.
- `PhysicalOperator` driver-side bundle assembly lifts hints from BMWS
into `BlockEntry.scheduling_hints` so consumers see them via the
bundle's accessor.
This PR ships the infra only — no operator currently stages or reads
hints. The Download operator wires producer (file-size totals → memory
forecast) and consumer (bundle hint sum → per-task `memory` resource)
in a follow-on PR.
Tests:
- `test_scheduling_hints.py` covers dataclass behavior, staging
helpers, TaskContext consume/clear, and BMWS pickle round-trip.
- `test_ref_bundle.py` gains BlockEntry hint-field tests and the
parallel-list accessor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Contributor
There was a problem hiding this comment.
Code Review
This pull request introduces the infrastructure for pre-scheduling hints, allowing producers to stage prospective resource forecasts (such as memory requirements) for downstream consumer tasks. The changes include adding a new SchedulingHints dataclass, updating TaskContext to stage and consume these hints, carrying them on the wire envelope via BlockMetadataWithSchema and BlockEntry, and integrating them into the map operator's block yielding process. Comprehensive unit tests have also been added to verify the staging, consumption, and serialization of these hints. I have no feedback to provide.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds an opt-in, producer-supplied forecast about what the next consumer of a block will need (memory today; cpu/gpu/locality/strategy in future additions on the same dataclass). Designed to be layered on top of the BlockEntry foundation without disturbing existing call sites.
Surface:
ray.data._internal.scheduling_hintswith:SchedulingHintsfrozen dataclass (memory: Optional[int]; additive)stage_scheduling_hints(hints)— producer helper; writes toTaskContext.next_block_scheduling_hintsbefore each yield.stage_memory_hint(memory)— single-axis convenience.BlockEntry.scheduling_hints: Optional[SchedulingHints]— sibling field tometadata, defaulting to None.BlockMetadataWithSchema.scheduling_hints— wire-envelope field that carries the hint from worker to driver alongside the per-block metadata.from_metadata/from_blockthread it through.TaskContext.next_block_scheduling_hints+consume_next_block_scheduling_hints()— staging slot consumed and cleared by_map_taskafter each yield, so a stale value can't silently mis-tag later blocks.RefBundle.scheduling_hintsaccessor — parallel list toblock_refs/metadata, returns the per-block forecasts.Plumbing:
_map_taskreads the staged hints, attaches them to theBlockMetadataWithSchemait pickles per yield.PhysicalOperatordriver-side bundle assembly lifts hints from BMWS intoBlockEntry.scheduling_hintsso consumers see them via the bundle's accessor.This PR ships the infra only — no operator currently stages or reads hints. The Download operator wires producer (file-size totals → memory forecast) and consumer (bundle hint sum → per-task
memoryresource) in a follow-on PR.