Add n_reducers and reducer_ordinal to shuffle operands (#3055)
chaokunyang committed Jun 1, 2022
1 parent b685973 commit 1aa6936
Showing 22 changed files with 147 additions and 38 deletions.
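
Every file below follows the same pattern: wherever reduce chunks are generated for a shuffle, each reduce operand is told the total number of reducers (n_reducers) and its own flat position among them (reducer_ordinal). A minimal, self-contained sketch of that pattern and of the invariant it establishes, with plain dicts standing in for the actual operand objects:

import itertools

out_shape = (2, 3)  # output chunk grid of a hypothetical shuffle
out_indices = list(itertools.product(*(range(s) for s in out_shape)))
n_reducers = len(out_indices)  # 6 reducers in total
reduce_ops = [
    # each reducer records its grid index, its flat ordinal and the total count
    {"reducer_index": out_idx, "reducer_ordinal": ordinal, "n_reducers": n_reducers}
    for ordinal, out_idx in enumerate(out_indices)
]
# invariant relied on by the new test and by the task preprocessor checks below:
# all reducers agree on n_reducers and their ordinals cover range(n_reducers)
assert sorted(op["reducer_ordinal"] for op in reduce_ops) == list(range(n_reducers))
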
5 changes: 5 additions & 0 deletions mars/core/operand/shuffle.py
@@ -31,6 +31,11 @@ class MapReduceOperand(Operand):
mapper_id = Int32Field("mapper_id", default=0)
# for reducer
reducer_index = TupleField("reducer_index", FieldTypes.uint64)
# Total number of reducers, which is also the number of shuffle blocks for a single mapper.
n_reducers = Int32Field("n_reducers")
# The ordinal of this reducer among all reducers. It differs from reducer_index,
# which may be a tuple.
reducer_ordinal = Int32Field("reducer_ordinal")
reducer_phase = StringField("reducer_phase", default=None)

def _new_chunks(self, inputs, kws=None, **kw):
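In other words, reducer_index carries a (possibly multi-dimensional) grid coordinate, while reducer_ordinal is a single flat position. In the itertools.product-based generators below, the ordinal is simply the enumeration position of the output index, i.e. its row-major flattening: on a (2, 3) output grid, index (1, 2) corresponds to ordinal 1 * 3 + 2 = 5.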
20 changes: 19 additions & 1 deletion mars/core/operand/tests/test_core.py
@@ -13,10 +13,11 @@
# limitations under the License.

import numpy as np
import pandas as pd
import pytest

from ... import OutputType
from .. import Operand, TileableOperandMixin, execute, estimate_size
from .. import Operand, TileableOperandMixin, execute, estimate_size, ShuffleProxy


class MyOperand(Operand, TileableOperandMixin):
@@ -130,3 +131,20 @@ def execute_normally(ctx, op):
assert (
t2.execute(extra_config={"operand_executors": operand_executors}).fetch() == 2
)


def test_shuffle(setup):
from ....dataframe import DataFrame

chunk_size, n_rows = 10, 100
df = DataFrame(
pd.DataFrame(np.random.rand(n_rows, 3), columns=list("abc")),
chunk_size=chunk_size,
)
chunk_graph = df.groupby(["a"]).apply(lambda x: x).build_graph(tile=True)
[proxy_chunk] = [c for c in chunk_graph if isinstance(c.op, ShuffleProxy)]
successors = chunk_graph.successors(proxy_chunk)
n_reducer = successors[0].op.n_reducers
assert n_reducer == len(successors), (n_reducer, len(successors))
assert len(set(c.op.n_reducers for c in successors)) == 1
assert sorted([c.op.reducer_ordinal for c in successors]) == list(range(n_reducer))
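If needed, the new test can be run on its own with something like pytest mars/core/operand/tests/test_core.py -k test_shuffle, assuming the repository's conftest provides the setup fixture that the test depends on.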
9 changes: 8 additions & 1 deletion mars/dataframe/align.py
@@ -683,6 +683,8 @@ def _gen_series_chunks(splits, out_shape, left_or_right, series):
for out_idx in range(out_shape[0]):
reduce_op = DataFrameIndexAlign(
stage=OperandStage.reduce,
n_reducers=out_shape[0],
reducer_ordinal=out_idx,
i=out_idx,
sparse=proxy_chunk.issparse(),
output_types=[OutputType.series],
@@ -820,6 +822,8 @@ def _gen_dataframe_chunks(splits, out_shape, left_or_right, df):
)
reduce_op = DataFrameIndexAlign(
stage=OperandStage.reduce,
n_reducers=out_shape[shuffle_axis],
reducer_ordinal=j,
i=j,
sparse=proxy_chunk.issparse(),
output_types=[OutputType.dataframe],
@@ -853,9 +857,12 @@ def _gen_dataframe_chunks(splits, out_shape, left_or_right, df):
).new_chunk(map_chunks, shape=())

# gen reduce chunks
for out_idx in itertools.product(*(range(s) for s in out_shape)):
out_indices = list(itertools.product(*(range(s) for s in out_shape)))
for ordinal, out_idx in enumerate(out_indices):
reduce_op = DataFrameIndexAlign(
stage=OperandStage.reduce,
n_reducers=len(out_indices),
reducer_ordinal=ordinal,
i=out_idx,
sparse=proxy_chunk.issparse(),
output_types=[OutputType.dataframe],
4 changes: 4 additions & 0 deletions mars/dataframe/base/_duplicate.py
@@ -232,6 +232,8 @@ def _tile_shuffle(cls, op: "DuplicateOperand", inp):
reduce_op._method = "shuffle"
reduce_op.stage = OperandStage.reduce
reduce_op.reducer_phase = "drop_duplicates"
reduce_op.n_reducers = len(map_chunks)
reduce_op.reducer_ordinal = i
reduce_op._shuffle_size = inp.chunk_shape[0]
reduce_op._output_types = op.output_types
reduce_chunk_params = map_chunks[0].params
@@ -250,6 +252,8 @@ def _tile_shuffle(cls, op: "DuplicateOperand", inp):
put_back_op.stage = OperandStage.reduce
put_back_op.reducer_phase = "put_back"
put_back_op.reducer_index = (i,)
put_back_op.n_reducers = len(map_chunks)
put_back_op.reducer_ordinal = i
if out.ndim == 2:
put_back_chunk_params = map_chunks[i].params
else:
10 changes: 8 additions & 2 deletions mars/dataframe/groupby/aggregation.py
@@ -319,6 +319,8 @@ def partition_merge_data(
partition_shuffle_reduce = DataFrameGroupbySortShuffle(
stage=OperandStage.reduce,
reducer_index=(i, 0),
n_reducers=len(partition_chunks),
reducer_ordinal=i,
output_types=output_types,
**properties,
)
@@ -436,9 +438,13 @@ def _gen_shuffle_chunks(cls, op, chunks):

# generate reduce chunks
reduce_chunks = []
for out_idx in itertools.product(*(range(s) for s in chunk_shape)):
out_indices = list(itertools.product(*(range(s) for s in chunk_shape)))
for ordinal, out_idx in enumerate(out_indices):
reduce_op = DataFrameGroupByOperand(
stage=OperandStage.reduce, output_types=[OutputType.dataframe_groupby]
stage=OperandStage.reduce,
output_types=[OutputType.dataframe_groupby],
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
)
reduce_chunks.append(
reduce_op.new_chunk(
5 changes: 4 additions & 1 deletion mars/dataframe/groupby/core.py
@@ -294,11 +294,14 @@ def tile(cls, op):

# generate reduce chunks
reduce_chunks = []
for out_idx in itertools.product(*(range(s) for s in chunk_shape)):
out_indices = list(itertools.product(*(range(s) for s in chunk_shape)))
for ordinal, out_idx in enumerate(out_indices):
reduce_op = op.copy().reset_key()
reduce_op._by = None
reduce_op._output_types = [output_type]
reduce_op.stage = OperandStage.reduce
reduce_op.reducer_ordinal = ordinal
reduce_op.n_reducers = len(out_indices)
reduce_chunks.append(
reduce_op.new_chunk(
[proxy_chunk], shape=(np.nan, np.nan), index=out_idx
4 changes: 3 additions & 1 deletion mars/dataframe/groupby/sample.py
@@ -496,12 +496,14 @@ def _tile_distributed(cls, op: "GroupBySample", in_df, weights):
)

reduce_chunks = []
for src_chunk in in_df.chunks:
for ordinal, src_chunk in enumerate(in_df.chunks):
new_op = op.copy().reset_key()
new_op._weights = None
new_op._output_types = [OutputType.tensor]
new_op.stage = OperandStage.reduce
new_op.reducer_index = (src_chunk.index[0],)
new_op.reducer_ordinal = ordinal
new_op.n_reducers = len(in_df.chunks)
new_op._input_nsplits = np.array(in_df.nsplits[0])

reduce_chunks.append(
13 changes: 10 additions & 3 deletions mars/dataframe/merge/merge.py
@@ -259,9 +259,12 @@ def _gen_shuffle_chunks(

# gen reduce chunks
reduce_chunks = []
for out_idx in itertools.product(*(range(s) for s in out_shape)):
out_indices = list(itertools.product(*(range(s) for s in out_shape)))
for ordinal, out_idx in enumerate(out_indices):
reduce_op = DataFrameMergeAlign(
stage=OperandStage.reduce,
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
sparse=proxy_chunk.issparse(),
output_types=[OutputType.dataframe],
)
@@ -312,9 +315,13 @@ def _gen_both_shuffle_chunks(
# gen reduce chunks
left_reduce_chunks = []
right_reduce_chunks = []
for out_idx in itertools.product(*(range(s) for s in out_shape)):
out_indices = list(itertools.product(*(range(s) for s in out_shape)))
for ordinal, out_idx in enumerate(out_indices):
reduce_op = DataFrameMergeAlign(
stage=OperandStage.reduce, sparse=proxy_chunk.issparse()
stage=OperandStage.reduce,
sparse=proxy_chunk.issparse(),
reducer_ordinal=ordinal,
n_reducers=len(out_indices),
)
left_param = {
"shape": (np.nan, np.nan),
2 changes: 2 additions & 0 deletions mars/dataframe/sort/psrs.py
@@ -238,6 +238,8 @@ def partition_merge_data(
stage=OperandStage.reduce,
kind=kind,
reducer_index=(i,),
reducer_ordinal=i,
n_reducers=len(partition_chunks),
output_types=op.output_types,
**cls._collect_op_properties(op)
)
5 changes: 3 additions & 2 deletions mars/learn/ensemble/_bagging.py
@@ -133,7 +133,6 @@ class BaggingSample(LearnShuffle, LearnOperandMixin):
feature_random_state = RandomStateField("feature_random_state")

reducer_ratio: float = Float32Field("reducer_ratio")
n_reducers: int = Int64Field("n_reducers", default=None)
column_offset: int = Int64Field("column_offset", default=None)

chunk_shape: Tuple[int] = TupleField("chunk_shape", FieldTypes.int64)
@@ -295,7 +294,7 @@ def tile(cls, op: "BaggingSample"):

n_reducers = (
op.n_reducers
if op.n_reducers is not None
if getattr(op, "n_reducers", None)
else max(1, int(in_sample.chunk_shape[0] * op.reducer_ratio))
)

@@ -357,6 +356,8 @@ def tile(cls, op: "BaggingSample"):
new_op = op.copy().reset_key()
new_op.random_state = None
new_op.stage = OperandStage.reduce
new_op.reducer_ordinal = idx
new_op.n_reducers = n_reducers
new_op.chunk_shape = in_sample.chunk_shape
new_op.n_estimators = op.n_estimators // n_reducers
if remain_reducers:
4 changes: 3 additions & 1 deletion mars/learn/utils/shuffle.py
@@ -307,7 +307,7 @@ def tile(cls, op):
ax for j, ax in enumerate(inp_axes) if reduce_sizes[j] > 1
)
reduce_sizes_ = tuple(rs for rs in reduce_sizes if rs > 1)
for c in map_chunks:
for ordinal, c in enumerate(map_chunks):
chunk_op = LearnShuffle(
stage=OperandStage.reduce,
output_types=output_types,
@@ -318,6 +318,8 @@
if reduce_sizes[j] > 1
),
reduce_sizes=reduce_sizes_,
reducer_ordinal=ordinal,
n_reducers=len(map_chunks),
)
params = cls._calc_chunk_params(
c, inp_axes, inp.chunk_shape, oup, output_type, chunk_op, False
10 changes: 5 additions & 5 deletions mars/services/task/execution/mars/executor.py
@@ -49,7 +49,7 @@
logger = logging.getLogger(__name__)


def _get_n_reducer(subtask: Subtask) -> int:
def _get_n_reducers(subtask: Subtask) -> int:
return len(
[
r
@@ -377,9 +377,9 @@ async def _incref_stage(self, stage_processor: "TaskStageProcessor"):
for pre_graph in subtask_graph.iter_predecessors(subtask):
for chk in pre_graph.chunk_graph.results:
if isinstance(chk.op, ShuffleProxy):
n_reducer = _get_n_reducer(subtask)
n_reducers = _get_n_reducers(subtask)
for map_chunk in chk.inputs:
incref_chunk_key_to_counts[map_chunk.key] += n_reducer
incref_chunk_key_to_counts[map_chunk.key] += n_reducers
result_chunks = stage_processor.chunk_graph.result_chunks
for c in result_chunks:
incref_chunk_key_to_counts[c.key] += 1
@@ -461,9 +461,9 @@ async def _decref_input_subtasks(
for result_chunk in in_subtask.chunk_graph.results:
# for reducer chunk, decref mapper chunks
if isinstance(result_chunk.op, ShuffleProxy):
n_reducer = _get_n_reducer(subtask)
n_reducers = _get_n_reducers(subtask)
for inp in result_chunk.inputs:
decref_chunk_key_to_counts[inp.key] += n_reducer
decref_chunk_key_to_counts[inp.key] += n_reducers
decref_chunk_key_to_counts[result_chunk.key] += 1
logger.debug(
"Decref chunks %s when subtask %s finish",
18 changes: 17 additions & 1 deletion mars/services/task/supervisor/tests/task_preprocessor.py
@@ -26,7 +26,7 @@
register,
unregister,
)
from .....core.operand import Fetch
from .....core.operand import Fetch, ShuffleProxy
from .....resource import Resource
from .....tests.core import _check_args, ObjectCheckMixin
from .....typing import BandType, ChunkType
@@ -177,4 +177,20 @@ def analyze(
subtask.extra_config["check_keys"] = [
c.key for c in subtask.chunk_graph.results if c in results
]
proxy_chunks = [
c for c in subtask.chunk_graph if isinstance(c.op, ShuffleProxy)
]
if proxy_chunks:
assert len(proxy_chunks) == 1, proxy_chunks
proxy_chunk_key = proxy_chunks[0].key
proxy_chunk = next(c for c in chunk_graph if c.key == proxy_chunk_key)
reducer_chunks = chunk_graph.successors(proxy_chunk)
n_reducers_list = [c.op.n_reducers for c in reducer_chunks]
n_reducers = n_reducers_list[0]
reducer_ordinals = [c.op.reducer_ordinal for c in reducer_chunks]
assert set(reducer_ordinals).issubset(list(range(n_reducers))), (
reducer_ordinals,
n_reducers,
)
assert len(set(n_reducers_list)) == 1, n_reducers_list
return subtask_graph
4 changes: 4 additions & 0 deletions mars/tensor/base/psrs.py
@@ -277,6 +277,8 @@ def partition_merge_data(
order=op.order,
kind=kind,
reducer_index=(i,),
n_reducers=len(partition_chunks),
reducer_ordinal=i,
dtype=partition_chunk.dtype,
gpu=partition_chunk.op.gpu,
need_align=need_align,
@@ -375,6 +377,8 @@ def align_partitions_data(
stage=OperandStage.reduce,
axis=op.axis,
reducer_index=(i,),
reducer_ordinal=i,
n_reducers=len(align_map_chunks),
dtype=align_map_chunk.dtype,
gpu=align_map_chunk.op.gpu,
)
10 changes: 7 additions & 3 deletions mars/tensor/base/unique.py
@@ -236,6 +236,8 @@ def _tile_via_shuffle(cls, op):
axis=op.axis,
reducer_index=(i,),
reducer_phase="agg",
reducer_ordinal=i,
n_reducers=aggregate_size,
)
kws = cls._gen_kws(op, inp, chunk=True, chunk_index=i)
chunks = reduce_op.new_chunks(
@@ -254,6 +256,8 @@
for j, cs in enumerate(unique_on_chunk_sizes):
chunk_op = TensorUnique(
stage=OperandStage.reduce,
reducer_ordinal=j,
n_reducers=len(unique_on_chunk_sizes),
dtype=map_inverse_chunks[0].dtype,
reducer_index=(j,),
reducer_phase="inverse",
@@ -295,7 +299,7 @@ def _execute_map(cls, ctx, op: "TensorUnique"):
(ar,), device_id, xp = as_same_device(
[ctx[c.key] for c in op.inputs], device=op.device, ret_extra=True
)
n_reducer = op.aggregate_size
n_reducers = op.aggregate_size

with device(device_id):
results = xp.unique(
@@ -323,13 +327,13 @@ def _execute_map(cls, ctx, op: "TensorUnique"):
)
if unique_ar.size > 0:
unique_reducers = dense_xp.asarray(
hash_on_axis(unique_ar, op.axis, n_reducer)
hash_on_axis(unique_ar, op.axis, n_reducers)
)
else:
unique_reducers = dense_xp.empty_like(unique_ar)
ind_ar = dense_xp.arange(ar.shape[op.axis])

for reducer in range(n_reducer):
for reducer in range(n_reducers):
res = []
cond = unique_reducers == reducer
# unique