[core][rdt] ray.get launches concurrent one-sided transfers for multiple ObjectRefs #61773

Merged
edoakes merged 27 commits into ray-project:master from stephanie-wang:async-transfers on May 13, 2026


@stephanie-wang stephanie-wang commented Mar 16, 2026

Description

Instead of fetching objects one at a time during ray.get, we launch a fetch request per ObjectRef, then wait for all of them to complete. This should allow overlapping network transfers for requests of multiple ObjectRefs at a time. Note that this PR only supports this for ray.get, not for transfer of task arguments.

The PR refactors the TensorTransportManager to add two new methods, fetch_multiple_transfers and wait_fetch_complete. The default implementations call the synchronous recv_multiple_transfers during fetch and simply return the already-received tensors during wait_fetch_complete. Backends that support asynchronous fetching can override the new methods to allow concurrent transfers (see NixlTensorTransport for an example).
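
The fetch/wait split described above can be illustrated with a minimal sketch. It is not Ray's code: `SketchTransport`, `AsyncSketchTransport`, and `get_all` are hypothetical names, the method signatures are simplified assumptions, and a thread pool stands in for a one-sided network transfer. It only shows why launching every fetch before waiting lets an asynchronous backend overlap transfers while a synchronous backend keeps working unchanged.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, List


class SketchTransport:
    """Hypothetical base class; NOT Ray's TensorTransportManager."""

    def recv_multiple_transfers(self, object_id: str) -> List[Any]:
        # Stand-in for a blocking receive of one object's tensors.
        time.sleep(0.05)
        return [f"tensor-for-{object_id}"]

    def fetch_multiple_transfers(self, object_id: str) -> Any:
        # Default: perform the whole transfer synchronously during fetch.
        return self.recv_multiple_transfers(object_id)

    def wait_fetch_complete(self, request: Any) -> List[Any]:
        # Default: the transfer already finished, so return it as-is.
        return request


class AsyncSketchTransport(SketchTransport):
    """Hypothetical backend that overrides both methods to overlap transfers."""

    def __init__(self) -> None:
        self._pool = ThreadPoolExecutor(max_workers=8)

    def fetch_multiple_transfers(self, object_id: str) -> Future:
        # Start the transfer and return a handle immediately.
        return self._pool.submit(self.recv_multiple_transfers, object_id)

    def wait_fetch_complete(self, request: Future) -> List[Any]:
        # Block until this particular transfer has finished.
        return request.result()


def get_all(transport: SketchTransport, object_ids: List[str]) -> Dict[str, List[Any]]:
    # Phase 1: launch one fetch per object. Phase 2: wait for each of them.
    requests = {oid: transport.fetch_multiple_transfers(oid) for oid in object_ids}
    return {oid: transport.wait_fetch_complete(req) for oid, req in requests.items()}
```

Both transports return the same mapping from `get_all`; only the asynchronous one has all of its transfers in flight at once during phase 1.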

This PR also adds timeout support for ray.get on RDT objects and unit tests for RDTManager.

Related issues

Closes #61453.

Signed-off-by: Stephanie wang <smwang@cs.washington.edu>
@stephanie-wang stephanie-wang requested a review from a team as a code owner March 16, 2026 21:00

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces concurrent fetching for RDT objects in ray.get by refactoring the tensor transport mechanism to support an asynchronous fetch/wait pattern. The changes are well-structured, introducing a new FetchRequest to manage state for asynchronous operations and providing default synchronous implementations for backward compatibility. The NixlTensorTransport is updated to leverage this new asynchronous pattern, and the RDTManager and Worker are modified to pipeline multiple fetch requests. The implementation appears correct and aligns with the goal of improving performance for ray.get with multiple RDT objects. I have one suggestion regarding a potentially unused class that could cause confusion.

Comment on lines +37 to +48
@dataclass
class TransferMetadata:
    """Base class for in-flight tensor transfer state.

    This class holds the minimal state needed to track an async transfer.
    Backend-specific implementations should extend this class with additional fields.

    Args:
        tensors: The tensors being transferred.
    """

    tensors: List[Any]

Contributor

medium

The TransferMetadata dataclass appears to be unused in this pull request. The NixlFetchRequest, which holds the in-flight transfer state, inherits from FetchRequest, not this class. Additionally, there is another TransferMetadata NamedTuple defined in rdt_manager.py, which could lead to confusion. If this class is not intended for use, consider removing it to improve clarity.

Comment thread python/ray/experimental/rdt/rdt_manager.py Outdated
Comment thread python/ray/experimental/rdt/rdt_manager.py
Comment thread python/ray/experimental/rdt/tensor_transport_manager.py Outdated
Signed-off-by: Stephanie wang <smwang@cs.washington.edu>
Comment thread python/ray/_private/worker.py
Comment thread python/ray/experimental/rdt/rdt_manager.py Outdated
@ray-gardener ray-gardener Bot added the core Issues that should be addressed in Ray Core label Mar 17, 2026
Signed-off-by: Stephanie wang <smwang@cs.washington.edu>
Comment thread python/ray/experimental/rdt/rdt_manager.py
Comment thread python/ray/experimental/rdt/rdt_manager.py Outdated

@dayshah dayshah left a comment

It feels a little weird to have the ray.get path and the implicit get path diverge so much. But I guess eventually we'll unify them a bit if we ever get around to having rdt more integrated into the dependency waiter



class _PipelineCheckingTransport(TensorTransportManager):
    """Fake one-sided transport that records the order of fetch/wait calls.

Contributor

oh nice, we've been needing these fake transports

Contributor Author

Now we can with the custom transport API :)

manager = _build_manager([obj_id], backend=_TWO_SIDED_BACKEND_NAME)

with pytest.raises(ValueError, match="use_object_store=True"):
    manager.get_rdt_objects([obj_id], use_object_store=False)

Contributor

I feel like this test already exists / should exist in the gloo tests where ray.get-ting should fail

Contributor Author

Hmm actually I could not find such an existing test. I figured it's nice to have a unit test for the behavior here since it can run much faster. Actually let me update the match string though.

Comment thread python/ray/tests/rdt/test_rdt_manager.py Outdated
Comment thread python/ray/experimental/rdt/rdt_manager.py Outdated
Comment thread python/ray/experimental/rdt/rdt_manager.py Outdated
    self._wait_fetch(object_id, fetch_request)
except Exception as e:
    if trigger_exception is None:
        trigger_exception = e

Contributor

Why wait for all the fetches before raising? I think this differs from the normal ray.get semantics, where an error on any object ID is raised immediately without waiting for the other objects.

Contributor Author

Ah yeah the problem is that _wait_fetch also does some cleanup :( Right now it's only relevant for NIXL, though, since it's cleaning up memory registrations.

Another option is to update it so that FetchRequest.__del__ does the cleanup.

Contributor Author

Updated.
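
The pattern in the excerpt that opened this thread (keep waiting on every fetch so per-request cleanup still runs, remember only the first exception, raise it at the end) can be sketched as follows. This is an illustration under assumptions, not the merged code: `wait_all_then_raise` and the zero-argument waiter callables are hypothetical, and the thread does not say which approach the "Updated." revision finally took.

```python
from typing import Callable, Dict, Optional


def wait_all_then_raise(waiters: Dict[str, Callable[[], object]]) -> Dict[str, object]:
    """Call every waiter, then re-raise the first failure (hypothetical helper)."""
    results: Dict[str, object] = {}
    first_error: Optional[Exception] = None
    for object_id, wait in waiters.items():
        try:
            # Each waiter is assumed to do its own cleanup before returning or raising.
            results[object_id] = wait()
        except Exception as e:
            # Remember only the first failure, but keep draining the rest.
            if first_error is None:
                first_error = e
    if first_error is not None:
        raise first_error
    return results
```

The trade-off raised in the review is visible here: the caller sees the error only after every remaining waiter has returned, which is later than an immediate raise.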

    )
else:
    result[object_id] = rdt_store.wait_and_get_object(
        object_id, timeout=ray_constants.RDT_FETCH_FAIL_TIMEOUT_SECONDS

Contributor

The timeouts here could probably be 0, right? Otherwise something went wrong in the fetch.

Contributor Author

We use this codepath for both ray.get (where the timeout here should be 0) and the implicit get (where the timeout here is real, and we should actually wait). This was an issue before this PR too; I added a TODO.

Contributor Author

Updated.

            trigger_exception = e

if trigger_exception is not None:
    raise trigger_exception

Contributor

Won't the other objects be left un-popped if one of the objects raises an exception?

Contributor Author

Ah good catch, thanks.

Comment thread python/ray/experimental/rdt/rdt_manager.py
# the remaining fetches.
for object_id, fetch_request in fetch_requests.items():
    try:
        self._wait_fetch(object_id, fetch_request)

Contributor

Won't _wait_fetch ignore the timeout?

Contributor Author

Yeah it didn't before either. Let me see if I can update this.

Contributor Author

Updated.
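
One common way to make a sequence of waits honor a single caller-supplied timeout is to turn the timeout into a deadline and give each wait only the time that remains. The sketch below assumes that approach; the thread does not show how the PR actually implemented it. `wait_with_deadline` is a hypothetical helper, and `concurrent.futures.Future` stands in for an in-flight fetch request.

```python
import time
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Dict, Optional


def wait_with_deadline(
    requests: Dict[str, Future], timeout: Optional[float]
) -> Dict[str, object]:
    """Wait on each request in turn, sharing one overall timeout (hypothetical)."""
    # timeout=None means wait indefinitely, mirroring Future.result().
    deadline = None if timeout is None else time.monotonic() + timeout
    results: Dict[str, object] = {}
    for object_id, request in requests.items():
        if deadline is None:
            remaining = None
        else:
            # Never pass a negative timeout; 0 means "must already be done".
            remaining = max(0.0, deadline - time.monotonic())
        # Raises concurrent.futures.TimeoutError if the budget runs out.
        results[object_id] = request.result(timeout=remaining)
    return results
```

Without the shared deadline, passing the full timeout to every wait would let N requests block for up to N times the value the caller asked for.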

@stephanie-wang
Contributor Author

> It feels a little weird to have the ray.get path and the implicit get path diverge so much. But I guess eventually we'll unify them a bit if we ever get around to having rdt more integrated into the dependency waiter

Yeah I realized this too, but the goal here was just to get this in now and we can later update the implicit get path.

Co-authored-by: Dhyey Shah <dhyey2019@gmail.com>
Signed-off-by: Stephanie Wang <smwang@cs.washington.edu>
Comment thread python/ray/experimental/rdt/rdt_manager.py Outdated
@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 18, 2026
Signed-off-by: Stephanie wang <smwang@cs.washington.edu>
@stephanie-wang stephanie-wang removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 24, 2026
Comment thread python/ray/experimental/rdt/rdt_store.py
Comment thread python/ray/tests/rdt/test_rdt_manager.py
Comment thread python/ray/tests/rdt/test_rdt_manager.py
Comment thread python/ray/experimental/rdt/rdt_manager.py
@github-actions github-actions Bot added unstale A PR that has been marked unstale. It will not get marked stale again if this label is on it. and removed stale The issue is stale. It will be closed within 7 days unless there are further conversation labels Apr 28, 2026
Signed-off-by: Stephanie wang <smwang@cs.washington.edu>
@stephanie-wang stephanie-wang enabled auto-merge (squash) April 28, 2026 22:54
@github-actions github-actions Bot added the go add ONLY when ready to merge, run all tests label Apr 28, 2026
Comment thread python/ray/experimental/rdt/rdt_manager.py
Comment thread python/ray/experimental/rdt/rdt_manager.py
@aslonnie aslonnie removed request for a team and aslonnie April 29, 2026 02:51
Signed-off-by: Stephanie wang <smwang@cs.washington.edu>
@github-actions github-actions Bot disabled auto-merge April 30, 2026 03:34
Comment thread python/ray/experimental/rdt/nixl_tensor_transport.py
Comment thread python/ray/experimental/rdt/rdt_manager.py

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit e5e651e. Configure here.

    owner_address="",
    call_site="",
)
return result

Fetched RDT objects not stored for repeated ray.get

Medium Severity

Objects fetched via _trigger_fetch and _wait_fetch in fetch_and_get_rdt_objects are returned but never added to the rdt_store. The old _fetch_object code called rdt_store.add_object(obj_id, tensors) after a successful fetch, allowing subsequent ray.get calls on the same ObjectRef to find the object in phase 1's store check. Without this caching, every ray.get call on the same RDT ObjectRef triggers a new network transfer, which may fail if the source has already garbage-collected the object.
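
The caching behavior Bugbot describes can be sketched as a two-phase lookup: check the local store first, fetch only what is missing, and add each fetched object to the store so that a repeated get does not trigger another transfer. The names below (`SketchStore`, `fetch_and_get`, the `fetch` callable) are hypothetical stand-ins, not Ray's `rdt_store` or `fetch_and_get_rdt_objects`, and the sketch makes no claim about whether or how the merged PR addressed the report.

```python
from typing import Any, Callable, Dict, List


class SketchStore:
    """Hypothetical in-process store; NOT Ray's rdt_store."""

    def __init__(self) -> None:
        self._objects: Dict[str, Any] = {}

    def has_object(self, object_id: str) -> bool:
        return object_id in self._objects

    def add_object(self, object_id: str, value: Any) -> None:
        self._objects[object_id] = value

    def get_object(self, object_id: str) -> Any:
        return self._objects[object_id]


def fetch_and_get(
    store: SketchStore, object_ids: List[str], fetch: Callable[[str], Any]
) -> Dict[str, Any]:
    result: Dict[str, Any] = {}
    missing: List[str] = []
    # Phase 1: serve whatever is already in the local store.
    for oid in object_ids:
        if store.has_object(oid):
            result[oid] = store.get_object(oid)
        else:
            missing.append(oid)
    # Phase 2: fetch the rest and cache it for later calls.
    for oid in missing:
        value = fetch(oid)
        store.add_object(oid, value)
        result[oid] = value
    return result
```

With this shape, a second call for the same IDs is served entirely from phase 1 and never invokes `fetch`.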

@edoakes edoakes merged commit f77ec0b into ray-project:master May 13, 2026
4 of 6 checks passed
am-kinetica pushed a commit to kineticadb/ray that referenced this pull request May 14, 2026
…ple ObjectRefs (ray-project#61773)

Signed-off-by: Stephanie wang <smwang@cs.washington.edu>
Co-authored-by: Dhyey Shah <dhyey2019@gmail.com>
Signed-off-by: anindyam1969 <amukherjee@kinetica.com>

Development

Successfully merging this pull request may close these issues.

[core][rdt] Allow multiple transfers before synchronizing

5 participants