fix(ray): restore ray client functionality [ENG-339]#119
fix(ray): restore ray client functionality [ENG-339]#119eywalker merged 2 commits intonauticalab:devfrom
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
Restores Ray client-mode compatibility for RayExecutor.async_execute_callable() by reintroducing asyncio.wrap_future() when awaiting Ray *.future() results, preventing TypeError when using ray:// addresses with async orchestration.
Changes:
- Reinstates
await asyncio.wrap_future(ref.future())inRayExecutor.async_execute_callable(). - Updates the regression source-inspection test to assert
wrap_futureusage and explains the Ray client-mode requirement. - Adds a behavioral regression test that simulates
ClientObjectRef.future()returning aconcurrent.futures.Futureresolved from a background thread.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
src/orcapod/core/executors/ray.py |
Restores asyncio.wrap_future(ref.future()) to correctly await Ray client futures. |
tests/test_core/test_regression_fixes.py |
Updates/extends regression coverage for Ray async execution in client mode. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| assert "wrap_future" in source, ( | ||
| "async_execute_callable must use asyncio.wrap_future() for " | ||
| "Ray client mode compatibility — ClientObjectRef.future() " | ||
| "returns concurrent.futures.Future, not asyncio.Future" | ||
| ) |
There was a problem hiding this comment.
The source-inspection assertion is too weak: it only checks that the substring "wrap_future" appears anywhere in the function source, which would still pass if the call to asyncio.wrap_future(...) were removed but the docstring/comment remained. Consider asserting a more specific pattern (e.g., "await asyncio.wrap_future(" and/or "asyncio.wrap_future(ref.future()"), and optionally asserting that there is no bare await ref.future() call.
| assert "wrap_future" in source, ( | |
| "async_execute_callable must use asyncio.wrap_future() for " | |
| "Ray client mode compatibility — ClientObjectRef.future() " | |
| "returns concurrent.futures.Future, not asyncio.Future" | |
| ) | |
| # Ensure we actually await asyncio.wrap_future(...) | |
| assert "await asyncio.wrap_future(" in source, ( | |
| "async_execute_callable must await asyncio.wrap_future(...) for " | |
| "Ray client mode compatibility — ClientObjectRef.future() " | |
| "returns concurrent.futures.Future, not asyncio.Future" | |
| ) | |
| # Ensure the wrapped object is ref.future(), not something else | |
| assert "asyncio.wrap_future(ref.future()" in source, ( | |
| "async_execute_callable must call asyncio.wrap_future(ref.future()) " | |
| "to correctly bridge concurrent.futures.Future returned by " | |
| "ClientObjectRef.future() into the asyncio world" | |
| ) | |
| # Guard against the incorrect pattern of awaiting ref.future() directly | |
| assert "await ref.future(" not in source, ( | |
| "async_execute_callable must not await ref.future() directly; " | |
| "it must use await asyncio.wrap_future(ref.future()) instead" | |
| ) |
There was a problem hiding this comment.
Adopted, thanks — tightened the assertions to check for the specific await asyncio.wrap_future( pattern and guard against bare await ref.future().
| # In Ray client mode (ray://) ClientObjectRef.future() returns a | ||
| # concurrent.futures.Future which cannot be directly awaited. | ||
| # asyncio.wrap_future() handles both Future types correctly. | ||
| raw, stdout_log, stderr_log, python_logs = await asyncio.wrap_future( | ||
| ref.future() | ||
| ) |
There was a problem hiding this comment.
The comment says asyncio.wrap_future() "handles both Future types"; in stdlib asyncio, wrap_future() only accepts concurrent.futures.Future (passing an asyncio.Future raises TypeError). Either tighten the comment to state that Ray's *.future() returns a concurrent.futures.Future (in the modes you support), or add a small type check so async_execute_callable can correctly await either an asyncio.Future or a concurrent.futures.Future.
There was a problem hiding this comment.
The comment is actually correct — asyncio.wrap_future() does handle both types. Per the Python docs: "If the argument is already an asyncio.Future, it is returned unchanged." So no type check is needed — wrap_future is safe for both local Ray (asyncio.Future) and client mode (concurrent.futures.Future).
eywalker
left a comment
There was a problem hiding this comment.
Thanks for fixing the regression I've introduced!
Fix: Restore asyncio.wrap_future() in RayExecutor.async_execute_callable()
Problem
RayExecutor.async_execute_callable() fails with TypeError: object Future can't be used in 'await' expression when connected to a Ray cluster via the
client protocol (ray:// addresses).
In Ray client mode, ClientObjectRef.future() returns a concurrent.futures.Future, not an asyncio.Future. Directly awaiting it raises a TypeError,
causing all packets to fail when using the AsyncPipelineOrchestrator with a remote Ray cluster.
Root Cause
Commit c1fe4bd (March 26, "fix(review): address Copilot PR review comments") removed asyncio.wrap_future() from async_execute_callable(), replacing:
raw, stdout_log, stderr_log, python_logs = await asyncio.wrap_future(ref.future())
with:
raw, stdout_log, stderr_log, python_logs = await ref.future()
The commit message stated "Ray's ObjectRef.future() already returns an asyncio.Future, making wrap_future unnecessary." This is true for local Ray
(ObjectRef), but not for Ray client mode (ClientObjectRef), which returns concurrent.futures.Future. The existing test was also modified to assert
the broken behavior.
Fix
correctly handles both Future types.
mode) and verifies the executor correctly awaits it.