Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Chen <chenh1024@gmail.com>
  • Loading branch information
raulchen committed Feb 17, 2024
1 parent 6e69a89 commit df2e4be
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 49 deletions.
98 changes: 49 additions & 49 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from collections import defaultdict
import copy
import os
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Optional

import ray
Expand Down Expand Up @@ -48,13 +48,9 @@ def __init__(self, topology: "Topology", options: ExecutionOptions):
self._global_limits_last_update_time = 0
self._global_usage = ExecutionResources.zero()
self._op_usages: Dict[PhysicalOperator, ExecutionResources] = {}
self._obj_store_pending_task_outputs: Dict[PhysicalOperator, int] = defaultdict(
int
)
self._obj_store_pending_task_outputs: Dict[PhysicalOperator, int] = defaultdict(int)
self._obj_store_output_buffers: Dict[PhysicalOperator, int] = defaultdict(int)
self._obj_store_next_op_input_buffers: Dict[
PhysicalOperator, int
] = defaultdict(int)
self._obj_store_next_op_input_buffers: Dict[PhysicalOperator, int] = defaultdict(int)
self._debug = os.environ.get("RAY_DATA_DEBUG_RESOURCE_MANAGER", "0") == "1"

self._downstream_fraction: Dict[PhysicalOperator, float] = {}
Expand Down Expand Up @@ -260,43 +256,47 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
op for op in self._resource_manager._topology if isinstance(op, MapOperator)
]
# Per-op reserved resources.
self._op_reserved_tasks: Dict[PhysicalOperator, ExecutionResources] = {}
self._op_reserved_output_buffers: Dict[PhysicalOperator, int] = {}
self._op_reserved: Dict[PhysicalOperator, ExecutionResources] = {}
# Total shared resources.
self._total_shared = ExecutionResources.zero()

self._op_submit_task_budgets: Dict[PhysicalOperator, ExecutionResources] = {}
self._op_fetch_task_outputs_budgets: Dict[PhysicalOperator, int] = {}

# Resource limits for each operator.
self._op_limits: Dict[PhysicalOperator, ExecutionResources] = {}
self._cached_global_limits = ExecutionResources.zero()

# Recompute the per-operator reservations and the shared pool whenever the
# global resource limits change.
# NOTE(review): this listing shows the pre- and post-change lines of the
# commit together (no +/- markers, indentation stripped), so some statements
# below are duplicated or overlap. Confirm against the checked-in file
# before relying on any single line.
def _on_global_limits_updated(self, global_limits: ExecutionResources):
# Nothing to reserve when there are no eligible operators; this also avoids
# dividing by zero in the per-op split below.
if len(self._eligible_ops) == 0:
return

# NOTE(review): `_total_shared` is assigned twice in a row, so the second
# assignment overrides the first. With `scale(1)` the pool starts at the
# full global limits and is reduced by each op's reservation in the loop.
self._total_shared = global_limits.scale(1.0 - self._reservation_ratio)
self._total_shared = global_limits.scale(1)

# Split the reserved fraction of the global limits evenly across the
# eligible operators.
default_reserved = global_limits.scale(
self._reservation_ratio / len(self._eligible_ops)
)
op_reserved_output_buffers = max(default_reserved.object_store_memory * 0.5, 1)
default_reserved.object_store_memory *= 0.5

for op in self._eligible_ops:
# Make sure the reserved resources are at least enough to allow
# one task.
self._op_reserved_output_buffers[op] = op_reserved_output_buffers
# NOTE(review): the call opened on the next line is not closed before the
# `inc = ...` statement, another sign that old and new lines are interleaved.
self._op_reserved_tasks[op] = default_reserved.max(
op.incremental_resource_usage()
inc = op.incremental_resource_usage()
# Doubles the object store part of one task's incremental usage before
# taking the max with the default reservation.
inc.object_store_memory *= 2
self._op_reserved[op] = default_reserved.max(
inc
)
# Whatever is reserved for this op is no longer part of the shared pool.
self._total_shared = self._total_shared.subtract(self._op_reserved[op])
# Presumably clamps each resource of the shared pool at zero after the
# subtractions above (TODO confirm `ExecutionResources.max` is element-wise).
self._total_shared = self._total_shared.max(ExecutionResources.zero())

def can_submit_new_task(self, op: PhysicalOperator) -> bool:
    """Return whether a new task can be submitted to the given operator.

    Operators that are not in ``self._eligible_ops`` are never throttled
    by this method. For eligible operators, one task's incremental
    resource usage is checked against a copy of the operator's current
    limit whose object store memory has been reduced by the unused part
    of the output reservation.

    The output reservation is half of the operator's reserved object
    store memory; the part already taken by the operator's output
    buffers and the next operator's input buffers counts as used.

    Args:
        op: The operator that wants to launch a task.

    Returns:
        True if the task's incremental usage satisfies the adjusted limit.
    """
    if op not in self._eligible_ops:
        return True

    resource_manager = self._resource_manager
    # Portion of the output-buffer reservation that is still free. It is
    # held back from task submission, hence subtracted from the limit.
    unused_output_reservation = max(
        self._op_reserved[op].object_store_memory // 2
        - resource_manager._obj_store_output_buffers[op]
        - resource_manager._obj_store_next_op_input_buffers[op],
        0,
    )

    # Work on a copy so the cached per-op limit is not mutated.
    limit = copy.deepcopy(self._op_limits[op])
    limit.object_store_memory -= unused_output_reservation
    return op.incremental_resource_usage().satisfies_limit(limit)

def max_task_outputs_to_fetch(self, op: PhysicalOperator) -> int:
    """Return the budget for fetching task outputs from the given operator.

    Operators that are not in ``self._eligible_ops`` are unlimited
    (``float("inf")``). For eligible operators the budget is the object
    store memory of the operator's current limit plus the unused part of
    its output reservation.

    The output reservation is half of the operator's reserved object
    store memory; the part already taken by the operator's output
    buffers and the next operator's input buffers counts as used.

    Args:
        op: The operator whose task outputs are about to be fetched.

    Returns:
        The budget, in the same unit as ``object_store_memory``
        (presumably bytes -- confirm against ``ExecutionResources``).
    """
    if op not in self._eligible_ops:
        return float("inf")

    resource_manager = self._resource_manager
    # Portion of the output-buffer reservation that is still free; it is
    # available for fetching outputs on top of the op's current limit.
    unused_output_reservation = max(
        self._op_reserved[op].object_store_memory // 2
        - resource_manager._obj_store_output_buffers[op]
        - resource_manager._obj_store_next_op_input_buffers[op],
        0,
    )
    return self._op_limits[op].object_store_memory + unused_output_reservation

def update_usages(self):
if len(self._eligible_ops) == 0:
Expand All @@ -307,29 +307,18 @@ def update_usages(self):
self._on_global_limits_updated(global_limits)
self._cached_global_limits = global_limits

self._op_limits.clear()
# Remaining of shared resources.
remaining_shared = self._total_shared
for op in self._resource_manager._topology:
op_usage = self._resource_manager.get_op_usage(op)
if op in self._eligible_ops:
self._op_fetch_task_outputs_budgets[op] = max(
self._op_reserved_output_buffers[op]
- self._resource_manager._obj_store_pending_task_outputs[op],
0,
)

task_usage = copy.deepcopy(op_usage)
task_usage.object_store_memory -= (
self._resource_manager._obj_store_pending_task_outputs[op]
)
self._op_submit_task_budgets[op] = (
self._op_reserved_tasks[op]
.subtract(task_usage)
.max(ExecutionResources.zero())
op_reserved = self._op_reserved[op]
# How much of the reserved resources are remaining.
op_reserved_remaining = op_reserved.subtract(op_usage).max(
ExecutionResources.zero()
)

op_reserved = copy.deepcopy(self._op_reserved_tasks[op])
op_reserved.object_store_memory += self._op_reserved_output_buffers[op]
self._op_limits[op] = op_reserved_remaining
# How much of the reserved resources are exceeded.
# If exceeded, we need to subtract from the remaining shared resources.
op_reserved_exceeded = op_usage.subtract(op_reserved).max(
Expand All @@ -345,15 +334,26 @@ def update_usages(self):
1.0 / len(self._eligible_ops)
)
for op in self._eligible_ops:
self._op_submit_task_budgets[op] = self._op_submit_task_budgets[op].add(
shared_divided
)
self._op_fetch_task_outputs_budgets[
op
] += shared_divided.object_store_memory
self._op_limits[op] = self._op_limits[op].add(shared_divided)
# We don't limit GPU resources, as not all operators
# use GPU resources.
self._op_submit_task_budgets[op].gpu = float("inf")
self._op_limits[op].gpu = float("inf")
continue
print(
op.name,
"limit:",
self._op_limits[op],
"reserved:",
self._op_reserved[op],
"usage:",
self._resource_manager.get_op_usage_str(op),
# self._resource_manager.get_op_usage(op),
# "pending task outputs:",
# op.metrics.obj_store_mem_pending_task_outputs,
)

def get_op_limits(self, op: PhysicalOperator) -> ExecutionResources:
    """Return the current resource limit of the given operator.

    Args:
        op: The operator to look up.

    Returns:
        The entry stored in ``self._op_limits`` for ``op``, or
        ``ExecutionResources.inf()`` when no limit has been recorded for
        it.
    """
    # A leftover ``assert False`` used to sit here. It made every call
    # raise and, under ``python -O``, silently disappeared.
    if op in self._op_limits:
        return self._op_limits[op]
    return ExecutionResources.inf()
1 change: 1 addition & 0 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
if DEBUG_TRACE_SCHEDULING:
logger.get_logger().info("Scheduling loop step...")

self._resource_manager.update_usages()
# Note: calling process_completed_tasks() is expensive since it incurs
# ray.wait() overhead, so make sure to allow multiple dispatch per call for
# greater parallelism.
Expand Down

0 comments on commit df2e4be

Please sign in to comment.