[Dask-on-Ray] Add support for Dask annotations. #22057
Conversation
# NOTE: We disable graph optimization since it can break annotations,
# see this issue: https://github.com/dask/dask/issues/7036.
result = sum_.compute(
    resources={"another_custom_resource": 0.01},
Is resources something from Dask? It's a bit confusing to understand how this interacts with ray remote args, such as num_cpus.
Dask Distributed has a similar resources concept, which we try to accommodate by mapping e.g. resources={"CPU": 1, "GPU": 1} to num_cpus=1, num_gpus=1.

It should be noted that the dask.compute() API allows arbitrary, scheduler-specific kwargs, so we can ask the user to specify whatever Ray-specific things they want here and they will be transparently passed on to the Dask-on-Ray scheduler. The same goes for dask.annotate(): it allows arbitrary annotations, which the scheduler can interpret however it wants.
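To illustrate the pass-through behavior described above, here is a toy sketch in plain Python. The function names (toy_compute, toy_ray_dask_get) are illustrative stand-ins, not actual Dask or Dask-on-Ray internals.

```python
# Toy model of dask.compute()'s kwarg pass-through; these functions are
# illustrative stand-ins, not actual Dask or Dask-on-Ray code.

def toy_ray_dask_get(dsk, keys, ray_remote_args=None, **other_annotations):
    # A scheduler receives any extra compute() kwargs and is free to
    # interpret them however it wants, e.g. as Ray task options.
    return {"keys": keys, "ray_remote_args": ray_remote_args}

def toy_compute(dsk, keys, scheduler, **kwargs):
    # Mimics dask.compute(..., scheduler=...): scheduler-specific kwargs
    # are forwarded to the scheduler untouched.
    return scheduler(dsk, keys, **kwargs)

result = toy_compute({}, ["x"], toy_ray_dask_get,
                     ray_remote_args={"num_cpus": 1})
```

The point is only that nothing in the compute path has to understand Ray-specific kwargs; they reach the scheduler intact.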
python/ray/util/dask/scheduler.py
("CPU", "num_cpus"),
("GPU", "num_gpus"),
("memory", "memory"),
("object_store_memory", "object_store_memory"),
Dask knows about object store?
Dask does not know about the object store; this map can be thought of as "resources to pluck out of resources and give as top-level Ray task options under a new name", i.e. these are resources that we do not allow in the ray.remote(resources={...}) argument and that have to be given as top-level args. Dask Distributed users are going to be used to giving their CPU and GPU resource requests as part of the resources={...} dict, either to .compute() or as an .annotate() annotation, so this mapping makes the transition a bit easier.

The other option is to let Ray throw an error when these resources are given in the ray.remote(resources={...}) argument and make the user give num_cpus, num_gpus, etc., as top-level .compute() and .annotate() arguments. I'd like to give the user the option to keep all resource requests in the resources={...} dict since I think that's a nice, clean option, but I'm happy to keep this more in line with our ray.remote() API.
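The "pluck out and rename" mapping described here can be sketched in plain Python. This is an illustrative sketch, not the actual scheduler.py code, and split_resources is a hypothetical helper name.

```python
# Illustrative sketch of splitting a Dask-style resources dict into Ray
# task options; not the actual scheduler.py implementation.

# Dask-style resource names that must become top-level Ray task options.
TOP_LEVEL_RESOURCES = {
    "CPU": "num_cpus",
    "GPU": "num_gpus",
    "memory": "memory",
    "object_store_memory": "object_store_memory",
}

def split_resources(resources):
    """Split a Dask-style resources dict into Ray task options.

    Known names are promoted to top-level options (CPU -> num_cpus, etc.);
    everything else is left as a custom Ray resource.
    """
    remote_args = {}
    custom = {}
    for name, amount in resources.items():
        if name in TOP_LEVEL_RESOURCES:
            remote_args[TOP_LEVEL_RESOURCES[name]] = amount
        else:
            custom[name] = amount
    if custom:
        remote_args["resources"] = custom
    return remote_args

print(split_resources({"CPU": 1, "GPU": 1, "foo": 0.5}))
# -> {'num_cpus': 1, 'num_gpus': 1, 'resources': {'foo': 0.5}}
```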
The other option is to let Ray throw an error when these resources are given in the ray.remote(resources={...}) argument.

Upon further thought, I like this option. Can we do this, and also update the docs to make it clear that resources is for Dask compatibility only? Users can just use ray_remote_args if they don't need compat.

Also, object_store_memory isn't actually a requestable resource...
I actually agree with your initial point. I'm thinking that it'd be better to either completely accommodate the top-level resources={...} API (as currently exists in the PR), or to only accept ray_remote_args. Only supporting the resources={...} API for custom (non-system) resources seems like bad split-API UX.
# (1) Current PR
with dask.annotate(resources={"CPU": 1, "foo": 1}):
col = ...
# (2) Custom resources + remote args
with dask.annotate(resources={"foo": 1}, ray_remote_args={"num_cpus": 1}):
col = ...
# (3) Remote args only
with dask.annotate(ray_remote_args={"num_cpus": 1, "resources": {"foo": 1}}):
col = ...
Hmm, then maybe we should go for (3), which optimizes for clarity over perfect compatibility. In that case, we can raise a warning/error if "resources" is found in the kwargs, telling the user to use ray_remote_args.
Agreed! I'll make that change.
The change is made. Compared to allowing a top-level resources dict along with num_cpus, num_gpus, etc., I feel as if this is slightly worse UX; see this example: e2a8a17#diff-cc2090d8130a6fee7ad03b26c646dc1640107533f4e05f6cca02eb7d8933553e

If this UX hit seems tenable to you, then I'm cool with it.
This PR adds support for Dask annotations, allowing users to specify Ray-level resource requests (or any other Ray task options) both for individual Dask operations, using the dask.annotate API, and for the Dask workload as a whole, by passing .compute(resources={...}, ray_remote_args={...}) at compute time.

See #21536 for more details on this feature.
Related issue number
Closes #21536
Checks
I've run scripts/format.sh to lint the changes in this PR.