Skip to content

Commit

Permalink
[Ray] Optimize ray executor submit subtask (#3271)
Browse files Browse the repository at this point in the history
* Optimize Ray executor submit subtask

* Pin pandas<1.5.0

* Try to fix CI

* Try to fix CI

* Print stderr of asv benchmark

* Fix

* Fix Ray executor track bug

* Fix

* Fix

* Fix

* Remove asv benchmark

* Improve coverage

* Refine comments

* Improve coverage

Co-authored-by: 刘宝 <po.lb@antgroup.com>
  • Loading branch information
fyrestone and 刘宝 committed Oct 11, 2022
1 parent eacf12e commit 6e2f7c9
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 23 deletions.
4 changes: 2 additions & 2 deletions mars/lib/filesystem/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
>>> mdf = md.read_csv("s3://bucket/example.csv", index_col=0, storage_options={
>>> "client_kwargs": {
>>> "endpoint_url": "http://192.168.1.12:9000",
>>> "aws_access_key_id": "RwDeqMoctbLG3yly",
>>> "aws_secret_access_key": "uwinWm1hTAGJ6Wnipa4tbE4SwO3Mx6Ek",
>>> "aws_access_key_id": "<s3 access id>",
>>> "aws_secret_access_key": "<s3 access key>",
>>> }})
>>> # Export environment vars AWS_ENDPOINT_URL / AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY.
>>> mdf = md.read_csv("s3://bucket/example.csv", index_col=0)
Expand Down
6 changes: 5 additions & 1 deletion mars/services/subtask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Subtask(Serializable):
update_meta_chunks: List[ChunkType] = ListField(
"update_meta_chunks", FieldTypes.reference(ChunkData)
)
# An unique and deterministic key for subtask compute logic. See logic_key in operator.py.
# A unique and deterministic key for subtask compute logic. See logic_key in operator.py.
logic_key: str = StringField("logic_key")
# index for subtask with same compute logic.
logic_index: int = Int32Field("logic_index")
Expand All @@ -81,6 +81,8 @@ class Subtask(Serializable):
# subtask can only run in specified bands in `expect_bands`
bands_specified: bool = BoolField("bands_specified")
required_resource: Resource = AnyField("required_resource", Resource)
# The count of result chunks that are the stage's results.
stage_n_outputs: int = Int32Field("stage_n_outputs")

def __init__(
self,
Expand All @@ -102,6 +104,7 @@ def __init__(
logic_parallelism: int = None,
bands_specified: bool = False,
required_resource: Resource = None,
stage_n_outputs: int = 0,
):
super().__init__(
subtask_id=subtask_id,
Expand All @@ -122,6 +125,7 @@ def __init__(
logic_parallelism=logic_parallelism,
bands_specified=bands_specified,
required_resource=required_resource,
stage_n_outputs=stage_n_outputs,
)
self._pure_depend_keys = None
self._repr = None
Expand Down
2 changes: 2 additions & 0 deletions mars/services/task/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def _gen_subtask_info(
if c not in chunk_graph:
chunk_graph.add_node(c)
chunk_graph.add_edge(c, out_chunk)
stage_n_outputs = len(result_chunks)
# add chunks with no successors into result chunks
result_chunks.extend(
c
Expand Down Expand Up @@ -313,6 +314,7 @@ def _gen_subtask_info(
retryable=retryable,
update_meta_chunks=update_meta_chunks,
extra_config=self._extra_config,
stage_n_outputs=stage_n_outputs,
)

is_shuffle_proxy = False
Expand Down
29 changes: 14 additions & 15 deletions mars/services/task/execution/ray/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async def _cancel_ray_task(obj_ref, kill_timeout: int = 3):
def execute_subtask(
subtask_id: str,
subtask_chunk_graph: ChunkGraph,
output_meta_keys: Set[str],
output_meta_n_keys: int,
is_mapper,
*inputs,
):
Expand All @@ -137,8 +137,8 @@ def execute_subtask(
id of subtask
subtask_chunk_graph: ChunkGraph
chunk graph for subtask
output_meta_keys: Set[str]
will be None if subtask is a shuffle mapper.
output_meta_n_keys: int
will be 0 if subtask is a shuffle mapper.
is_mapper: bool
Whether current subtask is a shuffle mapper. Note that shuffle reducers such as `DataFrameDropDuplicates`
can be a mapper at the same time.
Expand Down Expand Up @@ -212,19 +212,18 @@ def execute_subtask(
# So sort keys by reducer_index to ensure mapper outputs consist with reducer_ordinal,
# then downstream can fetch shuffle blocks by reducer_ordinal.
mapper_output = dict(sorted(mapper_output.items(), key=lambda item: item[0][1]))
if output_meta_keys:
if output_meta_n_keys:
output_meta = {}
# for non-shuffle subtask, record meta in supervisor.
for chunk in subtask_chunk_graph.result_chunks:
for chunk in subtask_chunk_graph.result_chunks[:output_meta_n_keys]:
chunk_key = chunk.key
if chunk_key in output_meta_keys and chunk_key not in output_meta:
if isinstance(chunk.op, Fuse):
if chunk_key not in output_meta:
if isinstance(chunk.op, Fuse): # pragma: no cover
# fuse op
chunk = chunk.chunk
data = context[chunk_key]
memory_size = calc_data_size(data)
output_meta[chunk_key] = get_chunk_params(chunk), memory_size
assert len(output_meta_keys) == len(output_meta)
output_values.append(output_meta)
output_values.extend(normal_output.values())
output_values.extend(mapper_output.values())
Expand Down Expand Up @@ -505,12 +504,11 @@ def _on_execute_aiotask_done(_):
output_keys, out_count = _get_subtask_out_info(
subtask_chunk_graph, is_mapper, n_reducers
)
subtask_output_meta_keys = result_meta_keys & output_keys
if is_mapper:
# shuffle meta won't be recorded in meta service.
output_count = out_count
else:
output_count = out_count + bool(subtask_output_meta_keys)
output_count = out_count + bool(subtask.stage_n_outputs)
subtask_max_retries = subtask_max_retries if subtask.retryable else 0
output_object_refs = self._ray_executor.options(
num_cpus=subtask_num_cpus,
Expand All @@ -519,7 +517,7 @@ def _on_execute_aiotask_done(_):
).remote(
subtask.subtask_id,
serialize(subtask_chunk_graph, context={"serializer": "ray"}),
subtask_output_meta_keys,
subtask.stage_n_outputs,
is_mapper,
*input_object_refs,
)
Expand All @@ -530,7 +528,7 @@ def _on_execute_aiotask_done(_):
self._cur_stage_first_output_object_ref_to_subtask[
output_object_refs[0]
] = subtask
if subtask_output_meta_keys:
if subtask.stage_n_outputs:
meta_object_ref, *output_object_refs = output_object_refs
# TODO(fyrestone): Fetch(not get) meta object here.
output_meta_object_refs.append(meta_object_ref)
Expand Down Expand Up @@ -614,9 +612,10 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
memory_size=chunk_meta.memory_size,
)
)
update_lifecycles.append(
self._lifecycle_api.track.delay(tileable.key, chunk_keys)
)
update_lifecycles.append(
self._lifecycle_api.track.delay(tileable.key, chunk_keys)
)
assert len(update_lifecycles) == len(tileable_keys)
await self._meta_api.set_chunk_meta.batch(*update_metas)
await self._lifecycle_api.track.batch(*update_lifecycles)
await self._lifecycle_api.incref_tileables(tileable_keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

from collections import Counter

from ...... import dataframe as md
from ...... import tensor as mt
from ......config import Config
from ......core import TileContext
from ......core.context import get_context
from ......core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
from ......core.operand import ShuffleFetchType
from ......lib.aio.isolation import new_isolation, stop_isolation
from ......resource import Resource
from ......serialization import serialize
Expand Down Expand Up @@ -61,7 +63,14 @@ def _gen_subtask_graph(t):
bands = [(f"address_{i}", "numa-0") for i in range(4)]
band_resource = dict((band, Resource(num_cpus=1)) for band in bands)
task = Task("mock_task", "mock_session", tileable_graph)
analyzer = GraphAnalyzer(chunk_graph, band_resource, task, Config(), dict())
analyzer = GraphAnalyzer(
chunk_graph,
band_resource,
task,
Config(),
dict(),
shuffle_fetch_type=ShuffleFetchType.FETCH_BY_INDEX,
)
subtask_graph = analyzer.gen_subtask_graph()
return chunk_graph, subtask_graph

Expand Down Expand Up @@ -163,9 +172,7 @@ def test_ray_execute_subtask_basic():
r = execute_subtask(subtask_id, serialize(subtask_chunk_graph), set(), False)
np.testing.assert_array_equal(r, raw_expect)
test_get_meta_chunk = subtask_chunk_graph.result_chunks[0]
r = execute_subtask(
subtask_id, serialize(subtask_chunk_graph), {test_get_meta_chunk.key}, False
)
r = execute_subtask(subtask_id, serialize(subtask_chunk_graph), 1, False)
assert len(r) == 2
meta_dict, r = r
assert len(meta_dict) == 1
Expand Down Expand Up @@ -438,3 +445,60 @@ def pop(self, k, d=None):
)
assert chunk_keys1 == set(popped_seq[0:4])
assert chunk_keys2 == set(popped_seq[4:])


@require_ray
@pytest.mark.asyncio
async def test_execute_shuffle(ray_start_regular_shared2):
chunk_size, n_rows = 10, 50
df = md.DataFrame(
pd.DataFrame(np.random.rand(n_rows, 3), columns=list("abc")),
chunk_size=chunk_size,
)
df2 = df.groupby(["a"]).apply(lambda x: x)
chunk_graph, subtask_graph = _gen_subtask_graph(df2)
task = Task("mock_task", "mock_session", fuse_enabled=True)

class MockRayExecutor:
@staticmethod
def options(**kwargs):
num_returns = kwargs["num_returns"]

class _Wrapper:
@staticmethod
def remote(*args):
args = [
ray.get(a) if isinstance(a, ray.ObjectRef) else a for a in args
]
r = execute_subtask(*args)
assert len(r) == num_returns
return [ray.put(i) for i in r]

return _Wrapper

mock_config = RayExecutionConfig.from_execution_config(
{
"backend": "ray",
"ray": {
"subtask_monitor_interval": 0,
"subtask_max_retries": 0,
"n_cpu": 1,
"n_worker": 1,
"subtask_cancel_timeout": 1,
},
}
)
tile_context = MockTileContext()
executor = MockRayTaskExecutor(
config=mock_config,
task=task,
tile_context=tile_context,
task_context={},
task_chunks_meta={},
lifecycle_api=None,
meta_api=None,
)
executor._ray_executor = MockRayExecutor
await executor.execute_subtask_graph(
"mock_stage", subtask_graph, chunk_graph, tile_context
)
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ include_package_data = True
packages = find:
install_requires =
numpy>=1.14.0
pandas>=1.1.0,<=1.4.4
pandas>=1.0.0,<1.5.0
scipy>=1.0.0
scikit-learn>=0.20
numexpr>=2.6.4
Expand Down

0 comments on commit 6e2f7c9

Please sign in to comment.