Skip to content

Commit

Permalink
[Ray] Support reducers that have inputs which aren't mappers (#3206)
Browse files Browse the repository at this point in the history
* Support reducers that have inputs which aren't mappers

* enable test_setitem_fancy_index_execution

* remove unnecessary isinstance checks

* fix coverage
  • Loading branch information
chaokunyang committed Aug 8, 2022
1 parent 7417139 commit 3cff588
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 34 deletions.
78 changes: 44 additions & 34 deletions mars/services/task/execution/ray/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,27 +151,30 @@ def execute_subtask(
logger.info("Begin to execute subtask: %s", subtask_id)
# optimize chunk graph.
subtask_chunk_graph = _optimize_subtask_graph(subtask_chunk_graph)
start_chunks = _get_start_chunks(subtask_chunk_graph)
if isinstance(start_chunks[0].op, FetchShuffle):
assert len(start_chunks) == 1, start_chunks
fetch_chunks, shuffle_chunk = _get_fetch_chunks(subtask_chunk_graph)
input_data = {}
if shuffle_chunk is not None:
# the subtask is a reducer subtask
n_mappers = len(inputs)
n_mappers = shuffle_chunk.op.n_mappers
# some reducer may have multiple output chunks, see `PSRSshuffle._execute_reduce` and
# https://user-images.githubusercontent.com/12445254/168569524-f09e42a7-653a-4102-bdf0-cc1631b3168d.png
reducer_chunks = subtask_chunk_graph.successors(start_chunks[0])
reducer_chunks = subtask_chunk_graph.successors(shuffle_chunk)
reducer_operands = set(c.op for c in reducer_chunks)
if len(reducer_operands) != 1:
if len(reducer_operands) != 1: # pragma: no cover
raise ValueError(
f"Subtask {subtask_id} has more than 1 reduce operands: {subtask_chunk_graph.to_dot()}"
)
reducer_operand = reducer_chunks[0].op
reducer_index = reducer_operand.reducer_index
# mock input keys, keep this in sync with `MapReducerOperand#_iter_mapper_key_idx_pairs`
input_keys = [(i, reducer_index) for i in range(n_mappers)]
else:
input_keys = [c.key for c in start_chunks if isinstance(c.op, Fetch)]
input_data.update(
{(i, reducer_index): block for i, block in enumerate(inputs[-n_mappers:])}
)
inputs = inputs[:-n_mappers]
input_keys = [start_chunk.key for start_chunk in fetch_chunks]
input_data.update(zip(input_keys, inputs))
context = RayExecutionWorkerContext(
lambda: RayTaskState.get_handle(task_id), zip(input_keys, inputs)
lambda: RayTaskState.get_handle(task_id), input_data
)

for chunk in subtask_chunk_graph.topological_iter():
Expand Down Expand Up @@ -227,8 +230,16 @@ def execute_subtask(
return output_values[0] if len(output_values) == 1 else output_values


def _get_start_chunks(chunk_graph):
    """Return the independent chunks of ``chunk_graph`` ordered by key.

    The chunks yielded by ``chunk_graph.iter_indep()`` are collected into a
    new list and sorted ascending on their ``key`` attribute, so callers get
    a deterministic ordering regardless of the graph's iteration order.
    """
    indep_chunks = list(chunk_graph.iter_indep())
    indep_chunks.sort(key=operator.attrgetter("key"))
    return indep_chunks
def _get_fetch_chunks(chunk_graph):
    """Split the independent chunks of ``chunk_graph`` by fetch kind.

    Walks ``chunk_graph.iter_indep()`` and classifies each chunk by the type
    of its ``op``:

    * ``FetchShuffle`` -- remembered as the single shuffle chunk; finding a
      second one trips the assertion.
    * ``Fetch`` -- collected as an ordinary fetch chunk.
    * anything else -- ignored.

    Returns:
        A ``(fetch_chunks, shuffle_chunk)`` tuple, where ``fetch_chunks`` is
        the list of ``Fetch`` chunks sorted by their ``key`` attribute and
        ``shuffle_chunk`` is the ``FetchShuffle`` chunk, or ``None`` if the
        graph has none.
    """
    shuffle_chunk = None
    plain_fetches = []
    for candidate in chunk_graph.iter_indep():
        # FetchShuffle is tested first, mirroring the original branch order.
        if isinstance(candidate.op, FetchShuffle):
            # at most one shuffle input is expected per subtask graph
            assert shuffle_chunk is None, shuffle_chunk
            shuffle_chunk = candidate
            continue
        if isinstance(candidate.op, Fetch):
            plain_fetches.append(candidate)
    # sort by key so the order of fetch chunks is deterministic
    plain_fetches.sort(key=operator.attrgetter("key"))
    return plain_fetches, shuffle_chunk


def _get_subtask_out_info(
Expand Down Expand Up @@ -660,29 +671,28 @@ async def _load_subtask_inputs(
It updates the context if the input object refs are fetched from
the meta service.
"""
input_object_refs = []
normal_object_refs = []
shuffle_object_refs = []
key_to_get_meta = {}
# for non-shuffle chunks, chunk key will be used for indexing object refs.
# for shuffle chunks, mapper subtasks will have only one mapper chunk, and all outputs for mapper
# subtask will be shuffle blocks, the downstream reducers will receive inputs in the mappers order.
start_chunks = _get_start_chunks(subtask.chunk_graph)
for index, start_chunk in enumerate(start_chunks):
if isinstance(start_chunk.op, Fetch):
chunk_key = start_chunk.key
# pure_depend data is not used, skip it.
if chunk_key in subtask.pure_depend_keys:
input_object_refs.append(None)
elif chunk_key in context:
input_object_refs.append(context[chunk_key])
else:
input_object_refs.append(None)
key_to_get_meta[index] = self._meta_api.get_chunk_meta.delay(
chunk_key, fields=["object_refs"]
)
elif isinstance(start_chunk.op, FetchShuffle):
assert len(start_chunks) == 1, start_chunks
# shuffle meta won't be recorded in meta service, query it from shuffle manager.
return shuffle_manager.get_reducer_input_refs(subtask)
fetch_chunks, shuffle_chunk = _get_fetch_chunks(subtask.chunk_graph)
for index, fetch_chunk in enumerate(fetch_chunks):
chunk_key = fetch_chunk.key
# pure_depend data is not used, skip it.
if chunk_key in subtask.pure_depend_keys:
normal_object_refs.append(None)
elif chunk_key in context:
normal_object_refs.append(context[chunk_key])
else:
normal_object_refs.append(None)
key_to_get_meta[index] = self._meta_api.get_chunk_meta.delay(
chunk_key, fields=["object_refs"]
)
if shuffle_chunk is not None:
# shuffle meta won't be recorded in meta service, query it from shuffle manager.
shuffle_object_refs = list(shuffle_manager.get_reducer_input_refs(subtask))

if key_to_get_meta:
logger.info(
Expand All @@ -695,9 +705,9 @@ async def _load_subtask_inputs(
)
for index, meta in zip(key_to_get_meta.keys(), meta_list):
object_ref = meta["object_refs"][0]
input_object_refs[index] = object_ref
context[start_chunks[index].key] = object_ref
return input_object_refs
normal_object_refs[index] = object_ref
context[fetch_chunks[index].key] = object_ref
return normal_object_refs + shuffle_object_refs

async def _update_progress_and_collect_garbage(
self,
Expand Down
1 change: 1 addition & 0 deletions mars/tensor/indexing/tests/test_indexing_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ def test_mixed_indexing_execution(setup):
np.testing.assert_array_equal(res, expected)


@pytest.mark.ray_dag
def test_setitem_fancy_index_execution(setup):
rs = np.random.RandomState(0)

Expand Down

0 comments on commit 3cff588

Please sign in to comment.