
[Data] Add function to dynamically generate ray_remote_args for Map APIs #45143

Merged: 16 commits into ray-project:master on May 9, 2024

Conversation

scottjlee (Contributor) commented May 4, 2024

Why are these changes needed?

Adds a new parameter `ray_remote_args_fn` to the Map APIs (`map()`, `map_batches()`, `flat_map()`, `filter()`), which lets the user supply a function that returns a dict of Ray remote args to be passed to each actor created by `ActorPoolMapOperator`. This function is called each time a worker is initialized, allowing the user to customize the parameters of every worker (e.g. setting the scheduling strategy at runtime).

Currently, Ray Data only allows passing static Ray remote args, which forces all actors to share the same placement group. This feature allows users to create a different placement group for each actor; for example, it enables using Ray Data with vLLM with tensor parallel size > 1.
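For illustration, a rough sketch of the intended usage. The callable class and the placement-group shape here are assumptions for the example, not code from this PR:

```python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


class MyCallableClass:  # assumed user-defined callable class
    def __call__(self, batch):
        return batch


def generate_ray_remote_args():
    # Called once per new map worker, so each actor can get its own
    # placement group (e.g. one PG per tensor-parallel model replica).
    pg = ray.util.placement_group([{"GPU": 1}] * 2, strategy="STRICT_PACK")
    return {
        "scheduling_strategy": PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_capture_child_tasks=True,
        )
    }


ds = ray.data.range(10).map_batches(
    MyCallableClass,
    concurrency=2,
    ray_remote_args_fn=generate_ray_remote_args,
)
```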

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

scottjlee marked this pull request as ready for review May 6, 2024 19:32
# For each new actor, get new scheduling strategy by
# calling the generation fn.
remote_args["scheduling_strategy"] = self._scheduling_strategy_fn()
self._cls = ray.remote(**remote_args)(_MapWorker)
scottjlee (Contributor Author):
good catch, added it.

@@ -316,6 +318,8 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
concurrency: The number of Ray workers to use concurrently. For a fixed-sized
worker pool of size ``n``, specify ``concurrency=n``. For an autoscaling
worker pool from ``m`` to ``n`` workers, specify ``concurrency=(m, n)``.
scheduling_strategy_fn: A function that returns a ``SchedulingStrategy``
used to initialize the actor. Only valid if ``fn`` is a callable class.
Contributor:
actor -> worker, to be consistent with the rest of the docstring.

@@ -110,6 +110,7 @@ def create(
# TODO(ekl): slim down ComputeStrategy to only specify the compute
# config and not contain implementation code.
compute_strategy: Optional[ComputeStrategy] = None,
scheduling_strategy_fn: Optional[Callable[[], Any]] = None,
Contributor:
nit: Any -> SchedulingStrategyT

@@ -785,6 +796,7 @@ def flat_map(
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
concurrency: Optional[Union[int, Tuple[int, int]]] = None,
scheduling_strategy_fn: Optional[Callable[[], "SchedulingStrategyT"]] = None,
Contributor:
Discussed with @c21 offline: it would be more generalizable to add a `ray_remote_args_fn`, because we may want to dynamically generate other args as well.
The semantics can be that if both `ray_remote_args_fn` and the `ray_remote_args` kwargs are specified, the kwargs take higher priority.

Contributor:
Also, this usage is a bit too advanced; maybe let's mark it as experimental for now.

@@ -205,7 +218,7 @@ def __call__(self, args):
self.i %= len(self.locs)
return args

self._ray_remote_args_factory = RoundRobinAssign(locs)
self._ray_remote_args_factory_actor_locality = RoundRobinAssign(locs)
scottjlee (Contributor Author):
Renamed to avoid confusion with the similarly named `ray_remote_args_fn`.

scottjlee changed the title from "[Data] Add scheduling strategy function param for Map APIs" to "[Data] Add function to dynamically generate ray_remote_args for Map APIs" May 7, 2024
new_remote_args = self._ray_remote_args_fn()

# Override args from user-defined remote args function.
new_and_overriden_remote_args = {}
scottjlee (Contributor Author) commented May 7, 2024:
We always override with the args from `ray_remote_args_fn` because Ray Data inserts default parameters (e.g. `scheduling_strategy`) into `ray_remote_args`. We want the scheduling strategy provided by the user-defined function to always win, and IMO we should do the same for the other parameters if the user provides `ray_remote_args_fn`.
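To make the override order concrete, a minimal sketch of the merge described above (illustrative names, not the exact implementation):

```python
def merge_remote_args(ray_remote_args: dict, ray_remote_args_fn) -> dict:
    # Start from the static args (which include Ray Data's defaults,
    # e.g. scheduling_strategy), then let the values returned by the
    # user-provided function take precedence.
    merged = dict(ray_remote_args)
    if ray_remote_args_fn is not None:
        merged.update(ray_remote_args_fn())
    return merged
```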

passed to each map worker. This function will be called each time prior
to initializing the worker. Args returned from this function will always
override the args in ``ray_remote_args``. Note: this is an advanced,
experimental feature.
Contributor:
nit: maybe add a note that the purpose of this argument is to allow generating dynamic arguments for each actor/task

@@ -204,6 +204,12 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
):
return False

# Only fuse if at most one op specifies a `_ray_remote_args_fn`.
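The body of the check is truncated in this excerpt; a sketch of what the guard might look like, assuming the operators expose the function as a `_ray_remote_args_fn` attribute (the names `down_op` and `up_op` come from the `_can_fuse` signature above):

```python
# Two dynamic generators could return args that are incompatible with
# each other, so only attempt fusion when at most one is present.
if down_op._ray_remote_args_fn is not None and up_op._ray_remote_args_fn is not None:
    return False
```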
Contributor:
I tend to not fuse if either op has the fn, because the fn can return arbitrary args that may be incompatible with the other op.

scottjlee (Contributor Author):
Yes, that's what I was thinking as well. But in the case of Read->Map with `ray_remote_args_fn` on the Map, the two operators would not be fused, which would lead to a performance drop.

Contributor:
Do you have a concrete example of how bad the perf drop is? I guess at least for the vLLM integration it's not an issue, right? Because the actors are GPU actors, they cannot be fused anyway.
Despite the perf drop in some cases, I still think, for correctness, we shouldn't fuse them unless we have a way to tell whether the returned args are compatible with the previous op.

scottjlee (Contributor Author) commented May 8, 2024:
> at least for vLLM integration, it's not an issue, right? because the actors are GPU actors, they cannot be fused anyway.
Good point. I don't have a concrete example of a perf drop; I just remember from the streaming executor discussion that fusing Read->Map cases was described as a must-have. But since that doesn't apply to this vLLM fix, I think it should be OK.

@@ -604,12 +660,18 @@ def _kill_all_running_actors(self):

def _kill_running_actor(self, actor: ray.actor.ActorHandle):
"""Kill the provided actor and remove it from the pool."""
pg = self._actor_to_placement_groups.pop(actor, None)
if pg:
ray.util.remove_placement_group(pg)
Contributor:
I don't think we should remove the PG here, because other bundles in the PG can still be in use. Also, I think it's a bit hacky to add special logic for PGs here.
I have 2 alternative solutions in mind:

  • Let the application handle PG removal after execution finishes. The drawback is that scaling down will be delayed a bit.
  • Instead of exposing `ray_remote_args_fn`, expose something lower level, such as "on_creating_actor" / "on_actor_killed".

I slightly prefer (1). But since vLLM should eventually move away from PGs, adding a temporary internal hook is acceptable as well.
cc @c21

Contributor:
> Let the application handle PG removal after execution finishes. The drawback is that scaling down will be delayed a bit.

Got it, +1 for option 1. The user-defined function can record the created PGs in a list, and the application can remove all PGs at the end of the user program. WDYT?
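A rough sketch of that pattern under option 1, with illustrative names (the generator records every placement group it creates so the application can clean them up after execution):

```python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

created_pgs = []  # the application records every PG the generator creates


def generate_ray_remote_args():
    pg = ray.util.placement_group([{"GPU": 1}], strategy="STRICT_PACK")
    created_pgs.append(pg)
    return {"scheduling_strategy": PlacementGroupSchedulingStrategy(placement_group=pg)}


# ... run the Ray Data pipeline with ray_remote_args_fn=generate_ray_remote_args ...

# After execution finishes, the application removes all the PGs itself.
for pg in created_pgs:
    ray.util.remove_placement_group(pg)
```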

ActorClass,
concurrency=3,
ray_remote_args_fn=_generate_ray_remote_args_with_scheduling_strategy,
).take_all()
Contributor:
nit: just to make this example more complete, let's also remove the PGs

c21 (Contributor) left a comment:
LGTM

c21 merged commit 866e03e into ray-project:master May 9, 2024
4 of 5 checks passed
liuxsh9 pushed a commit to liuxsh9/ray that referenced this pull request May 10, 2024
HenryZJY pushed a commit to HenryZJY/ray that referenced this pull request May 10, 2024
peytondmurray pushed a commit to peytondmurray/ray that referenced this pull request May 13, 2024
ryanaoleary pushed commits to ryanaoleary/ray that referenced this pull request Jun 6 and Jun 7, 2024
GabeChurch pushed a commit to GabeChurch/ray that referenced this pull request Jun 11, 2024