Skip to content

Commit

Permalink
[Shuffle] Add n_mappers and n_reducers to ShuffleProxy (#3160)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang committed Jun 24, 2022
1 parent cba1b43 commit 4bcfd7f
Show file tree
Hide file tree
Showing 15 changed files with 36 additions and 33 deletions.
17 changes: 16 additions & 1 deletion mars/core/operand/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@

class ShuffleProxy(VirtualOperand):
    """Virtual operand sitting between the mapper and reducer chunks of a shuffle.

    It carries bookkeeping counts for the shuffle:

    - ``n_mappers``: number of mapper chunks feeding this proxy, assigned in
      ``_new_chunks`` below from the number of input chunks.
    - ``n_reducers``: number of reducer chunks consuming this proxy.
    """

    _op_type_ = opcodes.SHUFFLE_PROXY
    # Count of mapper chunks; defaults to 0 until `_new_chunks` sets it.
    n_mappers = Int32Field("n_mappers", default=0)
    # `n_reducers` will be updated in `MapReduceOperand._new_chunks`
    n_reducers = Int32Field("n_reducers", default=0)

    def _new_chunks(self, inputs, kws=None, **kw):
        # Every input chunk of the proxy is a mapper output, so the mapper
        # count is simply the number of inputs.
        self.n_mappers = len(inputs)
        return super()._new_chunks(inputs, kws, **kw)


class MapReduceOperand(Operand):
Expand All @@ -35,6 +42,7 @@ class MapReduceOperand(Operand):
n_reducers = Int32Field("n_reducers")
# The reducer ordinal in all reducers. It's different from reducer_index,
# which might be a tuple.
# `reducer_ordinal` will be set in `_new_chunks`.
reducer_ordinal = Int32Field("reducer_ordinal")
reducer_phase = StringField("reducer_phase", default=None)

Expand All @@ -44,7 +52,14 @@ def _new_chunks(self, inputs, kws=None, **kw):
if kws:
index = kws[0].get("index")
self.reducer_index = index or kw.get("index")

if self.stage == OperandStage.reduce:
# Operands such as `TensorIndexSetValue` will have multiple inputs, some won't be ProxyChunk
proxy_operands = [c.op for c in inputs if isinstance(c.op, ShuffleProxy)]
if proxy_operands:
# When creating reduce chunks with `FetchShuffle`, `proxy_operands` will be empty.
proxy = proxy_operands[0]
self.reducer_ordinal = proxy.n_reducers
proxy.n_reducers += 1
return super()._new_chunks(inputs, kws, **kw)

def get_dependent_data_keys(self):
Expand Down
5 changes: 1 addition & 4 deletions mars/dataframe/align.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,6 @@ def _gen_series_chunks(splits, out_shape, left_or_right, series):
reduce_op = DataFrameIndexAlign(
stage=OperandStage.reduce,
n_reducers=out_shape[0],
reducer_ordinal=out_idx,
i=out_idx,
sparse=proxy_chunk.issparse(),
output_types=[OutputType.series],
Expand Down Expand Up @@ -824,7 +823,6 @@ def _gen_dataframe_chunks(splits, out_shape, left_or_right, df):
reduce_op = DataFrameIndexAlign(
stage=OperandStage.reduce,
n_reducers=out_shape[shuffle_axis],
reducer_ordinal=j,
i=j,
sparse=proxy_chunk.issparse(),
output_types=[OutputType.dataframe],
Expand Down Expand Up @@ -859,11 +857,10 @@ def _gen_dataframe_chunks(splits, out_shape, left_or_right, df):

# gen reduce chunks
out_indices = list(itertools.product(*(range(s) for s in out_shape)))
for ordinal, out_idx in enumerate(out_indices):
for out_idx in out_indices:
reduce_op = DataFrameIndexAlign(
stage=OperandStage.reduce,
n_reducers=len(out_indices),
reducer_ordinal=ordinal,
i=out_idx,
sparse=proxy_chunk.issparse(),
output_types=[OutputType.dataframe],
Expand Down
4 changes: 1 addition & 3 deletions mars/dataframe/groupby/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ def partition_merge_data(
stage=OperandStage.reduce,
reducer_index=(i, 0),
n_reducers=len(partition_chunks),
reducer_ordinal=i,
output_types=output_types,
**properties,
)
Expand Down Expand Up @@ -439,11 +438,10 @@ def _gen_shuffle_chunks(cls, op, chunks):
# generate reduce chunks
reduce_chunks = []
out_indices = list(itertools.product(*(range(s) for s in chunk_shape)))
for ordinal, out_idx in enumerate(out_indices):
for out_idx in out_indices:
reduce_op = DataFrameGroupByOperand(
stage=OperandStage.reduce,
output_types=[OutputType.dataframe_groupby],
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
)
reduce_chunks.append(
Expand Down
6 changes: 2 additions & 4 deletions mars/dataframe/merge/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,9 @@ def _gen_shuffle_chunks(
# gen reduce chunks
reduce_chunks = []
out_indices = list(itertools.product(*(range(s) for s in out_shape)))
for ordinal, out_idx in enumerate(out_indices):
for out_idx in out_indices:
reduce_op = DataFrameMergeAlign(
stage=OperandStage.reduce,
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
sparse=proxy_chunk.issparse(),
output_types=[OutputType.dataframe],
Expand Down Expand Up @@ -316,11 +315,10 @@ def _gen_both_shuffle_chunks(
left_reduce_chunks = []
right_reduce_chunks = []
out_indices = list(itertools.product(*(range(s) for s in out_shape)))
for ordinal, out_idx in enumerate(out_indices):
for out_idx in out_indices:
reduce_op = DataFrameMergeAlign(
stage=OperandStage.reduce,
sparse=proxy_chunk.issparse(),
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
)
left_param = {
Expand Down
1 change: 0 additions & 1 deletion mars/dataframe/sort/psrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def partition_merge_data(
stage=OperandStage.reduce,
kind=kind,
reducer_index=(i,),
reducer_ordinal=i,
n_reducers=len(partition_chunks),
output_types=op.output_types,
**cls._collect_op_properties(op)
Expand Down
3 changes: 1 addition & 2 deletions mars/learn/utils/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def tile(cls, op):
ax for j, ax in enumerate(inp_axes) if reduce_sizes[j] > 1
)
reduce_sizes_ = tuple(rs for rs in reduce_sizes if rs > 1)
for ordinal, c in enumerate(map_chunks):
for c in map_chunks:
chunk_op = LearnShuffle(
stage=OperandStage.reduce,
output_types=output_types,
Expand All @@ -318,7 +318,6 @@ def tile(cls, op):
if reduce_sizes[j] > 1
),
reduce_sizes=reduce_sizes_,
reducer_ordinal=ordinal,
n_reducers=len(map_chunks),
)
params = cls._calc_chunk_params(
Expand Down
10 changes: 10 additions & 0 deletions mars/services/task/supervisor/tests/task_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,14 @@ def analyze(
n_reducers,
)
assert len(set(n_reducers_list)) == 1, n_reducers_list
mapper_chunks = chunk_graph.predecessors(proxy_chunk)
assert proxy_chunk.op.n_mappers == len(mapper_chunks), (
proxy_chunk.op.n_mappers,
mapper_chunks,
)
# If some reducer data is not used downstream, it won't be included in the chunk graph.
assert proxy_chunk.op.n_reducers >= n_reducers, (
proxy_chunk.op.n_reducers,
n_reducers,
)
return subtask_graph
2 changes: 0 additions & 2 deletions mars/tensor/base/psrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ def partition_merge_data(
kind=kind,
reducer_index=(i,),
n_reducers=len(partition_chunks),
reducer_ordinal=i,
dtype=partition_chunk.dtype,
gpu=partition_chunk.op.gpu,
need_align=need_align,
Expand Down Expand Up @@ -377,7 +376,6 @@ def align_partitions_data(
stage=OperandStage.reduce,
axis=op.axis,
reducer_index=(i,),
reducer_ordinal=i,
n_reducers=len(align_map_chunks),
dtype=align_map_chunk.dtype,
gpu=align_map_chunk.op.gpu,
Expand Down
2 changes: 0 additions & 2 deletions mars/tensor/base/unique.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ def _tile_via_shuffle(cls, op):
axis=op.axis,
reducer_index=(i,),
reducer_phase="agg",
reducer_ordinal=i,
n_reducers=aggregate_size,
)
kws = cls._gen_kws(op, inp, chunk=True, chunk_index=i)
Expand All @@ -256,7 +255,6 @@ def _tile_via_shuffle(cls, op):
for j, cs in enumerate(unique_on_chunk_sizes):
chunk_op = TensorUnique(
stage=OperandStage.reduce,
reducer_ordinal=j,
n_reducers=len(unique_on_chunk_sizes),
dtype=map_inverse_chunks[0].dtype,
reducer_index=(j,),
Expand Down
4 changes: 1 addition & 3 deletions mars/tensor/indexing/index_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,12 +781,11 @@ def _shuffle_fancy_indexes(
out_indices = list(
itertools.product(*(range(tileable.chunk_shape[ax]) for ax in axes))
)
for ordinal, chunk_index in enumerate(out_indices):
for chunk_index in out_indices:
reduce_op = FancyIndexingDistribute(
stage=OperandStage.reduce,
axes=axes,
dtype=proxy_chunk.dtype,
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
)
# chunks of fancy indexes on each axis
Expand Down Expand Up @@ -933,7 +932,6 @@ def postprocess(self, index_info: IndexInfo, context: IndexHandlerContext) -> No
dtype=proxy_chunk.dtype,
sparse=to_shuffle_chunks[0].issparse(),
reducer_index=(next(it),),
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
)
reduce_chunk_shape = (
Expand Down
3 changes: 1 addition & 2 deletions mars/tensor/indexing/setitem.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,14 @@ def _tile_fancy_index(cls, op: "TensorIndexSetValue"):

reducer_chunks = []
offsets_on_axis = [np.cumsum([0] + list(split)) for split in input_nsplits]
for ordinal, input_chunk in enumerate(inp.chunks):
for input_chunk in inp.chunks:
chunk_offsets = tuple(
offsets_on_axis[axis][input_chunk.index[axis]]
for axis in range(len(inp.shape))
)
reducer_op = TensorIndexSetValue(
stage=OperandStage.reduce,
n_reducers=len(inp.chunks),
reducer_ordinal=ordinal,
dtype=input_chunk.dtype,
shuffle_axes=shuffle_axes,
chunk_offsets=chunk_offsets,
Expand Down
3 changes: 1 addition & 2 deletions mars/tensor/random/permutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,10 @@ def tile(cls, op):
dtype=out_tensor.dtype, _tensor_keys=[in_tensor.key]
).new_chunk(map_chunks, shape=())

for ordinal, c in enumerate(map_chunks):
for c in map_chunks:
chunk_op = TensorPermutation(
stage=OperandStage.reduce,
n_reducers=len(map_chunks),
reducer_ordinal=ordinal,
seed=reduce_seeds[c.index[op.axis]],
axis=op.axis,
)
Expand Down
3 changes: 1 addition & 2 deletions mars/tensor/reshape/reshape.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,10 @@ def _tile_as_shuffle(op):
out_indices = list(
zip(itertools.product(*out_nsplits), itertools.product(*chunk_size_idxes))
)
for ordinal, (chunk_shape, chunk_idx) in enumerate(out_indices):
for chunk_shape, chunk_idx in out_indices:
chunk_op = TensorReshape(
stage=OperandStage.reduce,
dtype=tensor.dtype,
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
)
shuffle_outputs.append(
Expand Down
1 change: 0 additions & 1 deletion mars/tensor/spatial/distance/pdist.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ def _tile_chunks(cls, op, in_tensor, w, v, vi):
reduce_chunk_op = TensorPdist(
stage=OperandStage.reduce,
dtype=out_tensor.dtype,
reducer_ordinal=p,
n_reducers=aggregate_size,
)
reduce_chunk = reduce_chunk_op.new_chunk(
Expand Down
5 changes: 1 addition & 4 deletions mars/tensor/spatial/distance/squareform.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,10 @@ def _tile_chunks(cls, op, chunk_size):
reduce_chunks = []
out_shape_iter = itertools.product(*chunk_size)
out_indices = list(itertools.product(*(range(len(cs)) for cs in chunk_size)))
for ordinal, (out_idx, out_shape) in enumerate(
zip(out_indices, out_shape_iter)
):
for out_idx, out_shape in zip(out_indices, out_shape_iter):
reduce_chunk_op = TensorSquareform(
stage=OperandStage.reduce,
dtype=out.dtype,
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
)
reduce_chunk = reduce_chunk_op.new_chunk(
Expand Down

0 comments on commit 4bcfd7f

Please sign in to comment.