Description
TaskPoolMapOperator and ActorPoolMapOperator report logical resource usage based on the statically defined ray_remote_args:
ray/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
Lines 140 to 146 in 1f45462
def current_logical_usage(self) -> ExecutionResources:
    num_active_workers = self.num_active_tasks()
    return ExecutionResources(
        cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
        gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
        memory=self._ray_remote_args.get("memory", 0) * num_active_workers,
    )
However, if ray_remote_args_fn is defined, the operator can launch tasks that request more logical resources than the statically defined ray_remote_args specify. As a result, the resource manager undercounts logical resource usage.
ray/python/ray/data/_internal/execution/operators/map_operator.py
Lines 504 to 516 in 1f45462
def _get_dynamic_ray_remote_args(
    self, input_bundle: Optional[RefBundle] = None
) -> Dict[str, Any]:
    ray_remote_args = copy.deepcopy(self._ray_remote_args)
    # max_calls isn't supported in `.options()`, so we remove it when generating dynamic ray_remote_args
    ray_remote_args.pop("max_calls", None)
    # Override parameters from user provided remote args function.
    if self._ray_remote_args_fn:
        new_remote_args = self._ray_remote_args_fn()
        for k, v in new_remote_args.items():
            ray_remote_args[k] = v
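To make the undercount concrete, here is a minimal sketch in plain Python that does not use Ray at all. The names `static_args`, `ray_remote_args_fn`, and `dynamic_args` are hypothetical stand-ins for the operator's state, and the numbers (a static `num_cpus` of 1, an override of 4, three active tasks) are assumptions chosen only for illustration.

```python
# Hypothetical stand-ins for the operator's state; not Ray code.
static_args = {"num_cpus": 1}


def ray_remote_args_fn():
    # Assumed user-supplied override: every task asks for 4 CPUs.
    return {"num_cpus": 4}


def dynamic_args():
    # Same merge rule as the snippet above: overrides replace static values.
    merged = dict(static_args)
    merged.update(ray_remote_args_fn())
    return merged


num_active_tasks = 3

# What a static calculation reports: static num_cpus times active tasks.
reported_cpu = static_args.get("num_cpus", 0) * num_active_tasks

# What the tasks were actually launched with.
actual_cpu = sum(dynamic_args()["num_cpus"] for _ in range(num_active_tasks))

print(reported_cpu, actual_cpu)  # 3 12
```

Under these assumptions the reported usage is a quarter of what the running tasks requested.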
Proposed solution
We should record the actual resources used whenever we launch a task, so that when we call current_logical_usage, we can get the actual logical resource usage.
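Independent of the operator internals, the bookkeeping could look roughly like the sketch below. `LogicalUsageTracker` and its methods are hypothetical names, not part of Ray; the sketch assumes only that each task's resolved remote args are available as a dict using the `num_cpus`, `num_gpus`, and `memory` keys.

```python
from collections import defaultdict
from typing import Any, Callable, Dict

RESOURCE_KEYS = ("num_cpus", "num_gpus", "memory")


class LogicalUsageTracker:
    """Hypothetical helper that sums the resources of in-flight tasks."""

    def __init__(self) -> None:
        self._curr: Dict[str, float] = defaultdict(float)

    def on_task_launched(self, remote_args: Dict[str, Any]) -> Callable[[], None]:
        # Snapshot this task's resources so the release matches the acquire
        # even if later tasks are launched with different args.
        task_resources = {k: remote_args.get(k) or 0 for k in RESOURCE_KEYS}
        for key, value in task_resources.items():
            self._curr[key] += value

        def on_task_done() -> None:
            for key, value in task_resources.items():
                self._curr[key] -= value

        return on_task_done

    def current_usage(self) -> Dict[str, float]:
        return {key: self._curr[key] for key in RESOURCE_KEYS}


tracker = LogicalUsageTracker()
done_a = tracker.on_task_launched({"num_cpus": 4})
done_b = tracker.on_task_launched({"num_cpus": 1, "memory": 1024})
usage_while_running = tracker.current_usage()
done_a()
usage_after_a = tracker.current_usage()
```

Returning the release callback from the launch call keeps each task's snapshot in a closure, so no per-task lookup table is needed.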
ray/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
Lines 110 to 111 in 1f45462
dynamic_ray_remote_args = self._get_dynamic_ray_remote_args(input_bundle=bundle)
dynamic_ray_remote_args["name"] = self.name
def _try_schedule_task(...):
    ...
    dynamic_ray_remote_args = self._get_dynamic_ray_remote_args(input_bundle=bundle)
    # Record the resources this task is actually launched with.
    task_resources = {
        key: dynamic_ray_remote_args.get(key, 0)
        for key in ("num_cpus", "num_gpus", "memory")
    }
    for key, value in task_resources.items():
        self._curr_resources[key] += value
    ...
    # Callback that releases the task's resources once it is done.
    def _on_task_done():
        for key, value in task_resources.items():
            self._curr_resources[key] -= value
    ...
    self._submit_data_task(gen, bundle, task_done_callback=_on_task_done)

def current_logical_usage(self) -> ExecutionResources:
    return ExecutionResources(
        cpu=self._curr_resources.get("num_cpus", 0),
        gpu=self._curr_resources.get("num_gpus", 0),
        memory=self._curr_resources.get("memory", 0),
    )

Use case
No response