-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core][1/3] Make streaming generator public API #38784
[Core][1/3] Make streaming generator public API #38784
Conversation
python/ray/_private/worker.py
Outdated
@@ -121,13 +121,17 @@ | |||
DAGNode = TypeVar("DAGNode") | |||
|
|||
|
|||
class RayWaitable(Generic[R]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be contraversial.
I found a bit weird to make ObjectRef and ObjectRefGenerator a subclass of RayWaitable because
- they don't share any functionality
- they don't share any interface.
So I found it a bit more natural to just define the Union types. Any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using a union makes sense. but I think using an empty interface won't hurt either. whichever way should be fine.
2 related questions:
- Should
StreamingObjectRefGenerator
also be a generic type? - is there any difference between defining
RayWaitable
in this way versus justRayWaitable[R] = Union["ObjectRef[R]", StreamingObjectRefGenerator]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- so in this case, it is the type of returned object ref?
StreamingObjectRefGenerator[T] == __next__(self) -> ObjectRef[T]
?
I think this sgtm.
- So I tried defining
RayWaitable = Union["ObjectRef[R]", StreamingObjectRefGenerator]
, but it raised an exception, so I tried the current approach lol..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 is addressed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm actually I found out Cython doesn't support Generic... We can't add a type here unless we move this out of cython code (which I don't plan to)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we cannot just do type alias RayWaitable = Union[]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class RayWaitable
and Union["ObjectRef[R]", StreamingObjectRefGenerator]
are fundamentally different? looks Union["ObjectRef[R]", StreamingObjectRefGenerator]
are more preferrable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we can't if we'd like to support RayWaitable[R] -> this syntax. I originally tried RayWaitable = Union[...], but it failed when I declared RayWaitable[R]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confirmed it doesn't work. We cannot make RayWaittable a Generic type without declaring a class that inherits from Generic
python/ray/_private/worker.py
Outdated
if isinstance(object_refs, StreamingObjectRefGenerator): | ||
# TODO(sang): We should raise an exception. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Raising an exception requires lots of test changes. I will fix it in the third PR.
python/ray/_private/worker.py
Outdated
@@ -121,13 +121,17 @@ | |||
DAGNode = TypeVar("DAGNode") | |||
|
|||
|
|||
class RayWaitable(Generic[R]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using a union makes sense. but I think using an empty interface won't hurt either. whichever way should be fine.
2 related questions:
- Should
StreamingObjectRefGenerator
also be a generic type? - is there any difference between defining
RayWaitable
in this way versus justRayWaitable[R] = Union["ObjectRef[R]", StreamingObjectRefGenerator]
?
@@ -262,23 +262,31 @@ class ObjectRefGenerator: | |||
|
|||
|
|||
class StreamingObjectRefGenerator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not blocking this PR. Should we move this class to a different file? this file is pretty long. also, is there a perf improvement to define it in Cython? It seems to me that most code in this class is pure Python code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can definitely move out of it. I will handle it in the second PR together
Btw, until @pcmoritz gives a green light, I will mark the feature as alpha. I will still proceed merging PRs needed for the proposal (and try to get it in by 2.7), so please review it! https://docs.google.com/document/d/1LzKPL7wxz5tUJJLMMZ7BlfdDlYclAzMdzolTezU06ck/edit#heading=h.ehfmi7oa003b |
else "The keyword 'num_returns' only accepts None, a non-negative integer, or " | ||
'"dynamic" (for generators)', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be updated to mention streaming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want to hide "streaming". It will work without streaming if it is a generator. I will modify the comment when I hard deprecate the dynamic generator.
python/ray/_private/worker.py
Outdated
@@ -121,13 +121,17 @@ | |||
DAGNode = TypeVar("DAGNode") | |||
|
|||
|
|||
class RayWaitable(Generic[R]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we cannot just do type alias RayWaitable = Union[]
python/ray/_private/worker.py
Outdated
__arg0: "Union[T0, ObjectRef[T0]]", | ||
__arg1: "Union[T1, ObjectRef[T1]]", | ||
) -> "ObjectRef[R]": | ||
__arg0: "Union[T0, RayWaitable[T0]]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for the inputs, it should still be T0 or ObjectRef[T0]?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally did that. It doesn't work with mypi because remote methods return RayWaitable, and if we pass that into other method, it cannot be automatically cast to ObjectRef without runtime check (mypi check is done statically)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep updated. I overloaded the remote method so that it can return both generator and object ref. After that it is fixed
python/ray/_private/worker.py
Outdated
@@ -2837,7 +2863,9 @@ def kill(actor: "ray.actor.ActorHandle", *, no_restart: bool = True): | |||
|
|||
@PublicAPI | |||
@client_mode_hook | |||
def cancel(object_ref: "ray.ObjectRef", *, force: bool = False, recursive: bool = True): | |||
def cancel( | |||
ray_waitable: RayWaitable[R], *, force: bool = False, recursive: bool = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's both waitable and cancelable lol
@@ -264,23 +264,31 @@ class ObjectRefGenerator: | |||
|
|||
|
|||
class StreamingObjectRefGenerator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Annotate with @publicapi
Also do we want to start with alpha or beta or stable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the scope of the second PR!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will scope it to beta
We decided to make the API public in Ray 2.8 instead (we will still update the documentation for 2.7 as an alpha feature) |
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.
|
This is a PR to write a streaming generator documentation. Unlike existing generator doc, streaming generator doc is under "advanced" usage, and I will add proper links from each task/actor/pattern pages to this doc. The doc is written with the assumption is is an "alpha" feature. It is important to merge this doc PR before proceeding with #38784 as it will need to refer docs here and there. Some of the contents (such as "API is subject to change") will be removed after we merge these PRs.
Let's get back to merging this PR. Re; typing.
In this PR, I just made sure it doesn't raise an error when using mypy |
python/ray/_private/worker.py
Outdated
__arg0: "Union[T0, ObjectRef[T0]]", | ||
__arg1: "Union[T1, ObjectRef[T1]]", | ||
) -> "ObjectRef[R]": | ||
__arg0: "Union[T0, ObjectRefType[T0]]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bug I found. When I passed object ref, mypy couldn't find the correct type, but it found ray._raylet.ObjectRef
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @pcmoritz
python/ray/types.py
Outdated
|
||
|
||
@PublicAPI(stability="beta") | ||
class StreamingObjectRefGenerator(Generic[T]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: Currently, Cython cannot define generic inherited classes. WE need to upgrade it to support it
default_value=1, | ||
else "Default None. The keyword 'num_returns' only accepts None, " | ||
"a non-negative integer, or " | ||
'"dynamic" (for generators). "dynamic" is deprecated from Ray 2.7.', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'"dynamic" (for generators). "dynamic" is deprecated from Ray 2.7.', | |
'"streaming" (for generators). "dynamic" is deprecated from Ray 2.7.', |
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the default value is not 1 but None, what does None mean here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me update the comment. None basically means default.
Default behavior is 1 for regular task, and streaming for a generator task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the comment. Please take a look and lmk what you think
python/ray/_private/worker.py
Outdated
@@ -59,6 +59,8 @@ | |||
StreamingObjectRefGenerator, | |||
raise_sys_exit_with_custom_error_message, | |||
) | |||
from ray.types import ObjectRef as ObjectRefType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this alias?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we also import ObjectRef. We currently have 2 object ref, one real one (ObjectRef in Cython), and the other just for typing (types.ObjectRef). I think we should fix this, but it is not in a scope of this PR.
When I don't import this type, I found mypy looks for the ObjectRef in Cython, and it fails a pretty basic test when I annotate the object ref type to test_typing_good.
python/ray/_private/worker.py
Outdated
@@ -2682,50 +2685,65 @@ def put( | |||
@PublicAPI | |||
@client_mode_hook | |||
def wait( | |||
object_refs: List["ray.ObjectRef"], | |||
ray_waitables: Union["ObjectRefType[R]", "StreamingObjectRefGeneratorType[R]"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to make StreaingObjectRefGenerator generic since it can return multiple objects with different types?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah you can do StreaingObjectRefGenerator[Union[...]]
in that case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is important decision though. Normally generator has a type annotation Generator[yield type, send type, return type], so I think it makes sense to have it in generator too. But I created a sync meeting with @pcmoritz tomorrow to discuss a bit more details.
def send(self, value): | ||
raise NotImplementedError("`gen.send` is not supported.") | ||
|
||
def throw(self, value): | ||
raise NotImplementedError("`gen.throw` is not supported.") | ||
|
||
def close(self): | ||
raise NotImplementedError("`gen.close` is not supported.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are standard generator APIs that are not supported in Ray
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
""" | ||
TODO(sang): Enable it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need to disable it for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not working now
@@ -4101,6 +4101,7 @@ cdef class CoreWorker: | |||
worker.current_session_and_job) | |||
else: | |||
return ray.actor.ActorHandle(language, actor_id, | |||
{}, # method is_generator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to make these kwargs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, are you talking about
def __init__(self,
method is_generator=None
):
something like this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, use kwargs instead of a long list of optional positionals with comments
@@ -4101,6 +4101,7 @@ cdef class CoreWorker: | |||
worker.current_session_and_job) | |||
else: | |||
return ray.actor.ActorHandle(language, actor_id, | |||
{}, # method is_generator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to make these kwargs
This PR is doing - [x] Introduce throw/send - [x] Bring up RayWaitable type - [x] Modify types for cancel, remote, ray.get, and ray.wait - [x] Update doctoring for ray.wait & ray.cancel - [x] Implement completed, is_next_ready, is_finished - [x] Allow to not specify num_returns=“streaming” There will be total 3 PRs; All sort of implementation work <---- current Rename StreamingObjectRefGenerator and annotate it as a public API (and move it to __init__.py) Typing support
This is the second PR of 3 steps. All sort of implementation work [done] Rename StreamingObjectRefGenerator and annotate it as a public API (and move it to init.py) <---- this PR Typing support [future] This PR renames StreamingObjectRefGenerator - > ObjectRefGenerator. This also means the existing dynamic generator's (num_returns="dynamic") return type will become ObjectRefGenerator -> DynamicObjectRefGenerator, which is a breaking change, but we will anyway deprecate this API. I am open for a different name if we'd like to keep the perfect backward compatibility.
Why are these changes needed?
This PR is doing
- [x] Introduce throw/send
- [x] Bring up RayWaitable type
- [x] Modify types for cancel, remote, ray.get, and ray.wait
- [x] Update doctoring for ray.wait & ray.cancel
- [x] Implement completed, is_next_ready, is_finished
- [x] Allow to not specify num_returns=“streaming”
There will be total 3 PRs;
__init__.py
)Related issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.