[core] add ray.util.concurrent.futures.RayExecutor#44922
[core] add ray.util.concurrent.futures.RayExecutor#44922GuillaumeDesforges wants to merge 2 commits intoray-project:masterfrom
Conversation
aslonnie
left a comment
There was a problem hiding this comment.
asking core team to review first.
|
This would be phenomenally useful! Is this likely to get reviewed soon? Anyway I can do to help? |
|
@GuillaumeDesforges I made a few fixes in #46249. Feel free to pull them into this PR if you like and then I will close the other one. |
|
The changes you propose make sense to me, I'll pull them into this PR asap and try to fix tests 👍 |
c278a51 to
7683980
Compare
|
Sorry it took so much time. I've rebased on #46249 to include your changes @judahrand 🙏 I'll try to fix failed tests if I can. |
|
@GuillaumeDesforges any progress on this? |
|
@GuillaumeDesforges any update here? |
|
I have been short on time, but I do plan to make these tests pass. 👍 EDIT 2024-10-04: still on my TODO 😄 |
RayExecutor is a drop-in replacement for ProcessPoolExecutor and ThreadPoolExecutor from concurrent.futures but distributes and executes the specified tasks over a pool of dedicated actors belonging to a Ray cluster instead of multiple processes or threads, respectively. Co-authored-by: Mohamed Nidabdella <mohamed.nidabdella@tweag.io>, Johann Eicher <johann.eicher@tweag.io>, Judah Rand <17158624+judahrand@users.noreply.github.com> Signed-off-by: Guillaume Desforges <guillaume.desforges.pro@gmail.com> Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
7683980 to
609359b
Compare
Signed-off-by: Guillaume Desforges <guillaume.desforges.pro@gmail.com>
609359b to
4bde5a6
Compare
|
I have looked into the doc error. I have no clue how to solve them. For the Python tests, we seem to have relied on an API that is not available in Python 3.9. |
|
Hey folks! Any chance this PR is likely to get merged at some point? Would be very useful, and probably better than my (extremely hacky) prototype: class RayExecutor(concurrent.futures.ThreadPoolExecutor):
def __init__(self, threadpool_kwargs: Dict[str, Any], ray_kwargs: Dict[str, Any]):
super(RayExecutor, self).__init__(**threadpool_kwargs)
self.ray_kwargs = ray_kwargs
def init(self):
# TODO - synchronize number of workers
ray.init(**self.ray_kwargs)
def submit(self, __fn, *args, **kwargs):
return ray.remote(__fn).remote(*args, **kwargs).future()
``` |
|
@elijahbenizzy happy to see interest in our work 😄 I won't have time anytime soon to try to satisfy the CI unfortunately. Anyone is free to pick this up. 🙂 EDIT: just saw someone was assigned to it. Nice! |
|
Not sure about the docs / linter, but for the actual core tests I think could use something like this to make the diff --git a/python/ray/util/concurrent/futures/ray_executor.py b/python/ray/util/concurrent/futures/ray_executor.py
index 237a9f17cd..62e0a4cfee 100644
--- a/python/ray/util/concurrent/futures/ray_executor.py
+++ b/python/ray/util/concurrent/futures/ray_executor.py
@@ -2,7 +2,21 @@ from abc import ABC, abstractmethod
import time
from functools import partial
from concurrent.futures import Executor, Future
-from concurrent.futures._base import _result_or_cancel # type: ignore
+
+try:
+ from concurrent.futures._base import _result_or_cancel # type: ignore
+except ImportError:
+ # Backport private Python 3.12 function to Python 3.9
+ # https://github.com/python/cpython/tree/main/Lib/concurrent/futures/_base.py#L306
+ def _result_or_cancel(fut, timeout=None):
+ try:
+ try:
+ return fut.result(timeout)
+ finally:
+ fut.cancel()
+ finally:
+ del fut
+
from typing import (
Any,
ParamSpec, |
|
@jjyao @GuillaumeDesforges Not sure if we want to get this merged or not, but #51933 makes CI all green for this PR. Changes in the description. |
|
So this pattern of accessing results: Will hang for In fact you quite nicely have the I can poke at this if we want to change it, but wanted to ask first since I imagine you thought through this given you wrote the test for it. Cool PR by the way. I have done serious damage with the |
|
I made a change to address the above comment - moving discussion to #51933 |
|
Closing this PR in favor of #51933 as it fixes the CI. |
RayExecutor is a drop-in replacement for ProcessPoolExecutor and ThreadPoolExecutor from concurrent.futures but distributes and executes the specified tasks over a pool of dedicated actors belonging to a Ray cluster instead of multiple processes or threads, respectively.
Why are these changes needed?
This change provides an easy way to use Ray for futures, allowing to scale an existing codebase that relies on them.
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.