[ray] Support scheduling ray tasks in Ray oscar deploy backend (#3165)
* support ray task mode on oscar

* fix mars on ray serialization

* fix ray dag serialization for local mode

* refine clean serializers

* fix register ray_serializers

* fix tests

* revert load_config

* support mars supervisor for ray task mode

* fix use_ray_serialization

* fix register and clean ray_serializers

* fix test_sync_execute creating session duplicately

* fix tests

* lint

* add test_ray_dag_oscar to ci

* validate backend parameter
chaokunyang committed Sep 7, 2022
1 parent b2d658e commit 8ee68e2
Showing 7 changed files with 109 additions and 13 deletions.
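
The main user-facing change is a new backend parameter on new_cluster in mars/deploy/oscar/ray.py. The sketch below is modeled on the create_cluster fixture added in test_ray_dag_oscar.py; it is illustrative only and assumes an initialized Ray runtime and an async caller.

# Minimal sketch, assuming Ray is initialized and this coroutine is awaited
# from an event loop; mirrors the create_cluster fixture added in this commit.
from mars.deploy.oscar.ray import new_cluster


async def start_ray_dag_cluster():
    client = await new_cluster(
        supervisor_mem=1 * 1024**3,
        worker_num=2,
        worker_cpu=2,
        worker_mem=1 * 1024**3,
        backend="ray",  # new parameter; "mars" keeps the previous Oscar-worker behaviour
    )
    async with client:
        # Work submitted through the default session now runs on the Ray DAG executor.
        ...
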
2 changes: 2 additions & 0 deletions .github/workflows/platform-ci.yml
@@ -179,6 +179,8 @@ jobs:
mv .coverage build/.coverage.test_ray_dag.file
pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s mars/deploy/oscar/tests/test_ray_dag_failover.py
mv .coverage build/.coverage.test_ray_dag_failover.file
pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s mars/deploy/oscar/tests/test_ray_dag_oscar.py -m ray
mv .coverage build/.coverage.test_ray_dag_oscar.file
coverage combine build/ && coverage report
fi
56 changes: 47 additions & 9 deletions mars/deploy/oscar/ray.py
@@ -37,6 +37,7 @@
AbstractClusterBackend,
)
from ...services import NodeRole
from ...services.task.execution.api import ExecutionConfig
from ...utils import lazy_import, retry_callable
from ..utils import (
load_config,
@@ -310,6 +311,7 @@ async def new_cluster(
worker_num: int = 1,
worker_cpu: int = 2,
worker_mem: int = 2 * 1024**3,
backend: str = None,
config: Union[str, Dict] = None,
**kwargs,
):
@@ -330,6 +332,7 @@ async def new_cluster(
worker_num,
worker_cpu,
worker_mem,
backend,
config,
n_supervisor_process=n_supervisor_process,
)
@@ -402,6 +405,7 @@ def __init__(
worker_num: int = 1,
worker_cpu: int = 2,
worker_mem: int = 4 * 1024**3,
backend: str = None,
config: Union[str, Dict] = None,
n_supervisor_process: int = DEFAULT_SUPERVISOR_SUB_POOL_NUM,
):
@@ -413,6 +417,7 @@ def __init__(
self._worker_num = worker_num
self._worker_cpu = worker_cpu
self._worker_mem = worker_mem
self.backend = backend
# load config file to dict.
self._config = load_config(config, default_config_file=DEFAULT_CONFIG_FILE)
self.supervisor_address = None
@@ -434,6 +439,37 @@ async def start(self):
logging.basicConfig(
format=ray.ray_constants.LOGGER_FORMAT, level=logging.INFO
)
execution_config = ExecutionConfig.from_config(
self._config, backend=self.backend
)
self.backend = execution_config.backend
if self.backend == "mars":
await self.start_oscar(
self._n_supervisor_process,
self._supervisor_mem,
self._worker_num,
self._worker_cpu,
self._worker_mem,
)
elif self.backend == "ray":
execution_config.merge_from(
ExecutionConfig.from_params(
backend=self.backend,
n_worker=self._worker_num,
n_cpu=self._worker_num * self._worker_cpu,
mem_bytes=self._worker_mem,
)
)
assert self._n_supervisor_process == 0, self._n_supervisor_process
await self.start_oscar(
self._n_supervisor_process, self._supervisor_mem, 0, 0, 0
)
else:
raise ValueError(f"Unsupported backend type: {self.backend}.")

async def start_oscar(
self, n_supervisor_process, supervisor_mem, worker_num, worker_cpu, worker_mem
):
logger.info("Start cluster with config %s", self._config)
# init metrics to guarantee metrics use in driver
metric_configs = self._config.get("metrics", {})
@@ -450,7 +486,7 @@ async def start(self):
self._config.get("cluster", {})
.get("ray", {})
.get("supervisor", {})
.get("sub_pool_num", self._n_supervisor_process)
.get("sub_pool_num", n_supervisor_process)
)
from ...storage.ray import support_specify_owner

@@ -466,11 +502,11 @@ async def start(self):
self._config["cluster"]["lookup_address"] = self.supervisor_address
address_to_resources[node_placement_to_address(self._cluster_name, 0)] = {
"CPU": 1,
# "memory": self._supervisor_mem,
# "memory": supervisor_mem,
}
worker_addresses = []
if supervisor_standalone:
for worker_index in range(1, self._worker_num + 1):
for worker_index in range(1, worker_num + 1):
worker_address = process_placement_to_address(
self._cluster_name, worker_index, 0
)
@@ -479,11 +515,11 @@ async def start(self):
self._cluster_name, worker_index
)
address_to_resources[worker_node_address] = {
"CPU": self._worker_cpu,
"CPU": worker_cpu,
# "memory": self._worker_mem,
}
else:
for worker_index in range(self._worker_num):
for worker_index in range(worker_num):
worker_process_index = (
supervisor_sub_pool_num + 1 if worker_index == 0 else 0
)
@@ -495,7 +531,7 @@ async def start(self):
self._cluster_name, worker_index
)
address_to_resources[worker_node_address] = {
"CPU": self._worker_cpu,
"CPU": worker_cpu,
# "memory": self._worker_mem,
}
mo.setup_cluster(address_to_resources)
@@ -525,7 +561,7 @@ async def start(self):
addr,
{
"numa-0": Resource(
num_cpus=self._worker_cpu, mem_bytes=self._worker_mem
num_cpus=worker_cpu, mem_bytes=self._worker_mem
)
},
modules=get_third_party_modules_from_config(
@@ -543,7 +579,7 @@ async def start(self):
)
cluster_state_ref = self._cluster_backend.get_cluster_state_ref()
await self._cluster_backend.get_cluster_state_ref().set_config(
self._worker_cpu, self._worker_mem, self._config
worker_cpu, self._worker_mem, self._config
)
# start service
await start_supervisor(self.supervisor_address, config=self._config)
@@ -596,7 +632,9 @@ def __init__(self, cluster: RayCluster, session: AbstractSession):

@classmethod
async def create(cls, cluster: RayCluster) -> "RayClient":
session = await _new_session(cluster.supervisor_address, default=True)
session = await _new_session(
cluster.supervisor_address, default=True, backend=cluster.backend
)
client = RayClient(cluster, session)
AbstractSession.default._ray_client = client
return client
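
The rewritten RayCluster.start resolves the effective backend first and then bootstraps Oscar accordingly: the "mars" backend keeps the existing behaviour and starts the Oscar worker pools, while the "ray" backend starts only the supervisor (zero workers, with n_supervisor_process asserted to be 0) because subtasks are scheduled as Ray tasks by the execution backend. A condensed sketch of the resolution step, under the assumption that ExecutionConfig.from_config falls back to the backend named in the config when backend is None:

# Sketch only; mirrors the ExecutionConfig.from_config call added to RayCluster.start.
from mars.services.task.execution.api import ExecutionConfig


def resolve_backend(config: dict, backend: str = None) -> str:
    # Assumption: with backend=None the backend configured in the loaded config
    # wins, so existing callers keep the "mars" behaviour unchanged.
    execution_config = ExecutionConfig.from_config(config, backend=backend)
    return execution_config.backend
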
54 changes: 54 additions & 0 deletions mars/deploy/oscar/tests/test_ray_dag_oscar.py
@@ -0,0 +1,54 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest

from ....tests.core import require_ray
from ....utils import lazy_import
from ..ray import new_cluster, _load_config
from ..tests import test_local

ray = lazy_import("ray")
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "local_test_with_ray_config.yml")


@pytest.fixture
async def create_cluster(request):
param = getattr(request, "param", {})
ray_config = _load_config(CONFIG_FILE)
ray_config.update(param.get("config", {}))
client = await new_cluster(
supervisor_mem=1 * 1024**3,
worker_num=2,
worker_cpu=2,
worker_mem=1 * 1024**3,
backend="ray",
config=ray_config,
)
async with client:
yield client, param


@require_ray
@pytest.mark.asyncio
async def test_iterative_tiling(ray_start_regular_shared2, create_cluster):
await test_local.test_iterative_tiling(create_cluster)


@pytest.mark.asyncio
@require_ray
async def test_execute_describe(ray_start_regular_shared2, create_cluster):
await test_local.test_execute_describe(create_cluster)
2 changes: 1 addition & 1 deletion mars/deploy/oscar/tests/test_ray_scheduling.py
@@ -235,7 +235,7 @@ async def test_auto_scale_in(ray_large_cluster):
assert await autoscaler_ref.get_dynamic_worker_nums() == 2


@pytest.mark.timeout(timeout=150)
@pytest.mark.timeout(timeout=500)
@pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 4}], indirect=True)
@require_ray
@pytest.mark.asyncio
4 changes: 2 additions & 2 deletions mars/services/task/execution/ray/executor.py
@@ -356,7 +356,6 @@ def get_execution_config(self):

# noinspection DuplicatedCode
def destroy(self):
self._config = None
self._task = None
self._tile_context = None
self._task_context = {}
@@ -377,6 +376,7 @@ def destroy(self):
self._cur_stage_first_output_object_ref_to_subtask = dict()
self._execute_subtask_graph_aiotask = None
self._cancelled = None
self._config = None

@classmethod
@alru_cache(cache_exceptions=False)
@@ -518,7 +518,7 @@ def _on_execute_aiotask_done(_):
max_retries=subtask_max_retries,
).remote(
subtask.subtask_id,
serialize(subtask_chunk_graph),
serialize(subtask_chunk_graph, context={"serializer": "ray"}),
subtask_output_meta_keys,
is_mapper,
*input_object_refs,
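
Besides reordering the destroy() cleanup, the executor now passes a serialization context when shipping the subtask chunk graph to the Ray remote function, so the Ray-specific serializers registered by this commit are used. A minimal sketch of the call pattern, assuming the worker-side deserialization honours the same context:

# Sketch only; the context key mirrors the serialize(...) call in the diff above.
from mars.serialization import serialize


def serialize_subtask_chunk_graph(subtask_chunk_graph):
    # "serializer": "ray" selects the Ray serializers when sending the graph
    # to a Ray task; the receiving side is assumed to use the same setting.
    return serialize(subtask_chunk_graph, context={"serializer": "ray"})
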
@@ -124,6 +124,7 @@ async def test_ray_executor_create(
assert mock_task_state_actor_create.call_count == 1


@require_ray
@pytest.mark.asyncio
async def test_ray_executor_destroy():
task = Task("mock_task", "mock_session")
@@ -150,6 +151,7 @@ async def test_ray_executor_destroy():
assert await executor.get_progress() == 1.0


@require_ray
def test_ray_execute_subtask_basic():
raw = np.ones((10, 10))
raw_expect = raw + 1
2 changes: 1 addition & 1 deletion mars/services/task/execution/utils.py
@@ -24,7 +24,7 @@ def get_band_resources_from_config(
n_worker: int = config["n_worker"]
n_cpu: int = config["n_cpu"]
mem_bytes: int = config["mem_bytes"]
cuda_devices: List[List[int]] = config["cuda_devices"]
cuda_devices: List[List[int]] = config.get("cuda_devices")

bands_to_resource = []
worker_cpus = n_cpu // n_worker
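
get_band_resources_from_config now reads cuda_devices with dict.get because the execution config assembled for the Ray backend (via ExecutionConfig.from_params in RayCluster.start) is assumed not to always carry a cuda_devices entry; plain indexing would raise KeyError in that case. Illustrative only:

# Hypothetical config without GPUs, shaped like the keys read above.
config_without_gpu = {"n_worker": 2, "n_cpu": 4, "mem_bytes": 2 * 1024**3}

cuda_devices = config_without_gpu.get("cuda_devices")  # None instead of a KeyError
assert cuda_devices is None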
