Skip to content

[Data][1/N] iter_torch_batches: proper h2d pipelining#64431

Open
kyuds wants to merge 8 commits into
masterfrom
kyuds/h2d-pipeline-default
Open

[Data][1/N] iter_torch_batches: proper h2d pipelining#64431
kyuds wants to merge 8 commits into
masterfrom
kyuds/h2d-pipeline-default

Conversation

@kyuds

@kyuds kyuds commented Jun 30, 2026

Copy link
Copy Markdown
Member

Description

Properly pipeline h2d transfers with compute.

Related issues

N/A

Additional information

N/A

kyuds added 3 commits June 29, 2026 16:58
Signed-off-by: Daniel Shin <kyuds@anyscale.com>
Signed-off-by: Daniel Shin <kyuds@anyscale.com>
Signed-off-by: Daniel Shin <kyuds@anyscale.com>
@kyuds kyuds requested a review from a team as a code owner June 30, 2026 00:41
@kyuds kyuds added the go add ONLY when ready to merge, run all tests label Jun 30, 2026
@kyuds kyuds marked this pull request as draft June 30, 2026 00:42

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces PipelinedFinalizeFn to overlap host-to-device transfers with downstream GPU compute using a dedicated copy stream and CUDA events, replacing the previous synchronous default_finalize_fn in iter_torch_batches. A critical issue was identified in the recursive _record_stream helper, where passing a string literal "torch.Tensor" to isinstance will cause a runtime TypeError instead of checking the type correctly.

Comment thread python/ray/data/_internal/utils/pipelined_finalize.py Outdated

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit fc318d4. Configure here.

Comment thread python/ray/data/_internal/utils/pipelined_finalize.py
kyuds and others added 2 commits June 29, 2026 17:43
@kyuds kyuds requested a review from justinvyu June 30, 2026 16:44
@kyuds kyuds marked this pull request as ready for review July 1, 2026 18:45
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Jul 1, 2026
Comment thread python/ray/data/_internal/utils/pipelined_finalize.py
Comment on lines +81 to +82
def _record_stream(batch: Any, stream: "torch.cuda.Stream") -> None:
"""Recursively call ``record_stream(stream)`` on every tensor in ``batch``.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we use pytree for this? I'd prefer to not have the "Any" input type; we should have an explicit input type contract.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused on where we would actually use pytree though. In the traversal?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also improved typing as batch is a TensorBatchReturnType, which is

TensorBatchReturnType = Union[
    "torch.Tensor",
    Tuple["torch.Tensor", ...],
    Dict[str, "torch.Tensor"],
]

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ps: I don't think pytree will be placed in the public library component anytime soon (discussion in torch repo shows that maintainers don't want it because pytree is not optimized and inefficient). There is another optree repo that we can use, but this is an external dependency for something that is not really that necessary.

Comment thread python/ray/data/_internal/utils/pipelined_finalize.py
Comment thread python/ray/data/_internal/utils/pipelined_finalize.py
@kyuds kyuds force-pushed the kyuds/h2d-pipeline-default branch from fde3d77 to 8e1497e Compare July 2, 2026 22:55
kyuds and others added 2 commits July 2, 2026 15:57
Signed-off-by: Daniel Shin <kyuds@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants