[Ray] Supports task-based shuffle for ray (#3040)
chaokunyang committed Jun 27, 2022
1 parent 4bcfd7f commit 31bd6cc
Showing 28 changed files with 759 additions and 132 deletions.
6 changes: 3 additions & 3 deletions benchmarks/asv_bench/benchmarks/serialize.py
@@ -47,7 +47,6 @@
from mars.services.task import new_task_id
from mars.utils import tokenize


# do warmup
serialize(None)

@@ -193,7 +192,6 @@ def setup(self):
from mars.core import OutputType
from mars.core.operand import OperandStage
from mars.dataframe.operands import DataFrameShuffleProxy
from mars.utils import build_fetch

source_chunks = []
for i in range(1000):
@@ -208,7 +206,9 @@ def setup(self):
output_types=[OutputType.dataframe]
).new_chunk(source_chunks)

fetch_chunk = build_fetch(shuffle_chunk)
from mars.utils import build_fetch_shuffle

fetch_chunk = build_fetch_shuffle(shuffle_chunk, n_reducers=10)

self.test_fetch_chunks = []
for i in range(1000):
2 changes: 1 addition & 1 deletion mars/core/operand/__init__.py
@@ -21,7 +21,7 @@
LogicKeyGenerator,
)
from .core import TileableOperandMixin, execute, estimate_size
from .fetch import Fetch, FetchMixin, FetchShuffle
from .fetch import Fetch, FetchMixin, FetchShuffle, ShuffleFetchType
from .fuse import Fuse, FuseChunkMixin
from .objects import (
ObjectOperand,
17 changes: 16 additions & 1 deletion mars/core/operand/fetch.py
@@ -11,9 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum

from ... import opcodes
from ...serialization.serializables import FieldTypes, StringField, ListField
from ...serialization.serializables import (
FieldTypes,
StringField,
ListField,
Int32Field,
ReferenceField,
)
from .base import Operand
from .core import TileableOperandMixin

@@ -47,3 +54,11 @@ class FetchShuffle(Operand):
source_keys = ListField("source_keys", FieldTypes.string)
source_idxes = ListField("source_idxes", FieldTypes.tuple(FieldTypes.uint64))
source_mappers = ListField("source_mappers", FieldTypes.uint16)
n_mappers = Int32Field("n_mappers")
n_reducers = Int32Field("n_reducers")
shuffle_fetch_type = ReferenceField("shuffle_fetch_type")


class ShuffleFetchType(enum.Enum):
FETCH_BY_KEY = 0
FETCH_BY_INDEX = 1
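
A minimal sketch of how the two fetch modes differ, assuming the `FetchShuffle` fields declared above can be passed as keyword arguments at construction time (the values below are illustrative, not taken from the patch). With FETCH_BY_KEY a reducer resolves its shuffle blocks through the stored mapper chunk keys; with FETCH_BY_INDEX no keys are stored and only n_mappers/n_reducers are carried, so blocks are addressed by position.

from mars.core.operand import FetchShuffle, ShuffleFetchType

# Key-based fetch: the reducer locates its blocks via the mapper chunk keys.
fetch_by_key = FetchShuffle(
    source_keys=["mapper-key-0", "mapper-key-1"],  # hypothetical chunk keys
    source_idxes=[(0,), (1,)],
    n_mappers=2,
    n_reducers=10,
    shuffle_fetch_type=ShuffleFetchType.FETCH_BY_KEY,
)

# Index-based fetch (the Ray task-based shuffle path): no keys are stored;
# a shuffle block is addressed by its mapper/reducer position alone.
fetch_by_index = FetchShuffle(
    n_mappers=2,
    n_reducers=10,
    shuffle_fetch_type=ShuffleFetchType.FETCH_BY_INDEX,
)
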
24 changes: 24 additions & 0 deletions mars/core/operand/shuffle.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from . import ShuffleFetchType, FetchShuffle
from ... import opcodes
from ...serialization.serializables import (
Int32Field,
@@ -34,6 +35,11 @@ def _new_chunks(self, inputs, kws=None, **kw):


class MapReduceOperand(Operand):
"""
An operand for shuffle execution that partitions data by the value of each record's
partition key and sends the partitioned data from all mappers to all reducers.
"""

# for mapper
mapper_id = Int32Field("mapper_id", default=0)
# for reducer
@@ -74,13 +80,16 @@ def get_dependent_data_keys(self):
[(chunk.key, self.reducer_index) for chunk in inp.inputs or ()]
)
elif isinstance(inp.op, FetchShuffle):
# Fetch shuffle by index doesn't store data keys, so it never takes this branch.
assert inp.op.shuffle_fetch_type == ShuffleFetchType.FETCH_BY_KEY
deps.extend([(k, self.reducer_index) for k in inp.op.source_keys])
else:
deps.append(inp.key)
return deps
return super().get_dependent_data_keys()

def _iter_mapper_key_idx_pairs(self, input_id=0, mapper_id=None):
# key is the mapper chunk key, index is the mapper chunk index.
input_chunk = self.inputs[input_id]
if isinstance(input_chunk.op, ShuffleProxy):
keys = [inp.key for inp in input_chunk.inputs]
@@ -91,6 +100,13 @@ def _iter_mapper_key_idx_pairs(self, input_id=0, mapper_id=None):
else None
)
else:
assert isinstance(input_chunk.op, FetchShuffle), input_chunk.op
if input_chunk.op.shuffle_fetch_type == ShuffleFetchType.FETCH_BY_INDEX:
# For fetch shuffle by index, all shuffle blocks of the same reducer are
# identified by their index; chunk keys and chunk indexes are no longer needed,
# so we just mock the index here.
# Keep this in sync with the ray executor `execute_subtask`.
return ((i, i) for i in range(input_chunk.op.n_mappers))
keys = input_chunk.op.source_keys
idxes = input_chunk.op.source_idxes
mappers = input_chunk.op.source_mappers
@@ -127,3 +143,11 @@ def iter_mapper_data(
ctx, input_id, mapper_id, pop=pop, skip_none=skip_none
):
yield data

def execute(self, ctx, op):
"""The mapper stage must ensure all mapper blocks are inserted into ctx and no blocks
for some reducers are missing. This is needed by shuffle fetch by index,
which shuffle block are identified by the index instead of data keys.
For operands implementation simplicity, we can sort the `ctx` by key which are (chunk key, reducer index) tuple
and relax the insert order requirements.
"""
6 changes: 3 additions & 3 deletions mars/core/operand/tests/test_core.py
@@ -144,7 +144,7 @@ def test_shuffle(setup):
chunk_graph = df.groupby(["a"]).apply(lambda x: x).build_graph(tile=True)
[proxy_chunk] = [c for c in chunk_graph if isinstance(c.op, ShuffleProxy)]
successors = chunk_graph.successors(proxy_chunk)
n_reducer = successors[0].op.n_reducers
assert n_reducer == len(successors), (n_reducer, len(successors))
n_reducers = successors[0].op.n_reducers
assert n_reducers == len(successors), (n_reducers, len(successors))
assert len(set(c.op.n_reducers for c in successors)) == 1
assert sorted([c.op.reducer_ordinal for c in successors]) == list(range(n_reducer))
assert sorted([c.op.reducer_ordinal for c in successors]) == list(range(n_reducers))
2 changes: 1 addition & 1 deletion mars/dataframe/base/shift.py
@@ -376,7 +376,7 @@ def execute(cls, ctx, op):
slc[axis] = slice(out.shape[axis])

result = result.iloc[tuple(slc)]
assert result.shape == out.shape
assert result.shape == out.shape, (result.shape, out.shape)

ctx[out.key] = result

15 changes: 15 additions & 0 deletions mars/dataframe/base/tests/test_base_execution.py
@@ -88,6 +88,7 @@ def test_to_cpu_execution(setup_gpu):
pd.testing.assert_series_equal(res, pseries)


@pytest.mark.ray_dag
def test_rechunk_execution(setup):
data = pd.DataFrame(np.random.rand(8, 10))
df = from_pandas_df(pd.DataFrame(data), chunk_size=3)
@@ -225,6 +226,7 @@ def f(x: int):
pd.testing.assert_index_equal(result, expected)


@pytest.mark.ray_dag
def test_describe_execution(setup):
s_raw = pd.Series(np.random.rand(10))

@@ -775,6 +777,7 @@ def test_isin_execution(setup):
pd.testing.assert_frame_equal(result, expected)


@pytest.mark.ray_dag
def test_cut_execution(setup):
session = setup

@@ -894,6 +897,7 @@ def test_cut_execution(setup):
cut(s3, 3).execute()


@pytest.mark.ray_dag
def test_transpose_execution(setup):
raw = pd.DataFrame(
{"a": ["1", "2", "3"], "b": ["5", "-6", "7"], "c": ["1", "2", "3"]}
@@ -1079,6 +1083,7 @@ def test_to_numeric_execution(setup):
np.testing.assert_array_equal(r.execute().fetch(), pd.to_numeric(l))


@pytest.mark.ray_dag
def test_q_cut_execution(setup):
rs = np.random.RandomState(0)
raw = rs.random(15) * 1000
@@ -1120,6 +1125,7 @@ def test_q_cut_execution(setup):
pd.testing.assert_series_equal(pd.Series(result), pd.Series(expected))


@pytest.mark.ray_dag
def test_shift_execution(setup):
# test dataframe
rs = np.random.RandomState(0)
@@ -1214,6 +1220,7 @@ def test_shift_execution(setup):
) from e


@pytest.mark.ray_dag
def test_diff_execution(setup):
rs = np.random.RandomState(0)
raw = pd.DataFrame(
@@ -1251,6 +1258,7 @@ def test_diff_execution(setup):
pd.testing.assert_series_equal(r.execute().fetch(), s1.diff(1))


@pytest.mark.ray_dag
def test_value_counts_execution(setup):
rs = np.random.RandomState(0)
s = pd.Series(rs.randint(5, size=100), name="s")
@@ -1436,6 +1444,7 @@ def test_astype(setup):
pd.testing.assert_series_equal(expected, result)


@pytest.mark.ray_dag
def test_drop(setup):
# test dataframe drop
rs = np.random.RandomState(0)
@@ -1506,6 +1515,7 @@ def test_melt(setup):
)


@pytest.mark.ray_dag
def test_drop_duplicates(setup):
# test dataframe drop
rs = np.random.RandomState(0)
@@ -1599,6 +1609,7 @@ def test_drop_duplicates(setup):
pd.testing.assert_series_equal(result, expected)


@pytest.mark.ray_dag
def test_duplicated(setup):
# test dataframe drop
rs = np.random.RandomState(0)
@@ -1840,6 +1851,7 @@ def f5(pdf, chunk_index):
pd.testing.assert_series_equal(result, expected)


@pytest.mark.ray_dag
def test_cartesian_chunk_execution(setup):
rs = np.random.RandomState(0)
raw1 = pd.DataFrame({"a": rs.randint(3, size=10), "b": rs.rand(10)})
@@ -1957,6 +1969,7 @@ def _tile_rebalance(op):
pd.testing.assert_frame_equal(result, raw)


@pytest.mark.ray_dag
def test_stack_execution(setup):
raw = pd.DataFrame(
np.random.rand(10, 3), columns=list("abc"), index=[f"s{i}" for i in range(10)]
@@ -2101,6 +2114,7 @@ def test_check_monotonic_execution(setup):
assert ser_mixed.is_monotonic_decreasing.execute().fetch() is False


@pytest.mark.ray_dag
def test_pct_change_execution(setup):
# test dataframe
rs = np.random.RandomState(0)
@@ -2129,6 +2143,7 @@ def test_pct_change_execution(setup):
pd.testing.assert_frame_equal(expected, result)


@pytest.mark.ray_dag
def test_bloom_filter(setup):
ns = np.random.RandomState(0)
raw1 = pd.DataFrame(
16 changes: 16 additions & 0 deletions mars/dataframe/groupby/tests/test_groupby_execution.py
@@ -51,6 +51,7 @@ def post(self, v1, v2):
return v1 + v2


@pytest.mark.ray_dag
def test_groupby(setup):
rs = np.random.RandomState(0)
data_size = 100
@@ -116,6 +117,7 @@ def test_groupby(setup):
)


@pytest.mark.ray_dag
def test_groupby_getitem(setup):
rs = np.random.RandomState(0)
data_size = 100
@@ -324,6 +326,7 @@ def test_groupby_getitem(setup):
)


@pytest.mark.ray_dag
def test_dataframe_groupby_agg(setup):
agg_funs = [
"std",
@@ -491,6 +494,7 @@ def test_dataframe_groupby_agg(setup):
)


@pytest.mark.ray_dag
def test_dataframe_groupby_agg_sort(setup):
agg_funs = [
"std",
@@ -585,6 +589,7 @@ def test_dataframe_groupby_agg_sort(setup):
assert r.op.groupby_params["as_index"] is True


@pytest.mark.ray_dag
def test_series_groupby_agg(setup):
rs = np.random.RandomState(0)
series1 = pd.Series(rs.rand(10))
@@ -660,6 +665,7 @@ def test_series_groupby_agg(setup):
)


@pytest.mark.ray_dag
def test_groupby_agg_auto_method(setup):
rs = np.random.RandomState(0)
raw = pd.DataFrame(
@@ -772,6 +778,7 @@ def test_distributed_groupby_agg(setup_cluster):
assert len(r._fetch_infos()["memory_size"]) == 3


@pytest.mark.ray_dag
def test_groupby_agg_str_cat(setup):
agg_fun = lambda x: x.str.cat(sep="_", na_rep="NA")

@@ -842,6 +849,7 @@ def test_gpu_groupby_agg(setup_gpu):
)


@pytest.mark.ray_dag
def test_groupby_apply(setup):
df1 = pd.DataFrame(
{
@@ -924,6 +932,7 @@ def apply_series(s, truncate=True):
)


@pytest.mark.ray_dag
def test_groupby_transform(setup):
df1 = pd.DataFrame(
{
@@ -1000,6 +1009,7 @@ def transform_series(s, truncate=True):
)


@pytest.mark.ray_dag
def test_groupby_cum(setup):
df1 = pd.DataFrame(
{
@@ -1039,6 +1049,7 @@ def test_groupby_cum(setup):
)


@pytest.mark.ray_dag
def test_groupby_fill(setup):
df1 = pd.DataFrame(
[
@@ -1100,6 +1111,7 @@ def test_groupby_fill(setup):
)


@pytest.mark.ray_dag
def test_groupby_head(setup):
df1 = pd.DataFrame(
{
@@ -1184,6 +1196,7 @@ def test_groupby_head(setup):
)


@pytest.mark.ray_dag
def test_groupby_sample(setup):
rs = np.random.RandomState(0)
sample_count = 10
@@ -1292,6 +1305,7 @@ def test_groupby_sample(setup):
r1.execute().fetch()


@pytest.mark.ray_dag
@pytest.mark.skipif(pa is None, reason="pyarrow not installed")
def test_groupby_agg_with_arrow_dtype(setup):
df1 = pd.DataFrame({"a": [1, 2, 1], "b": ["a", "b", "a"]})
@@ -1329,6 +1343,7 @@ def test_groupby_agg_with_arrow_dtype(setup):
pd.testing.assert_series_equal(result, expected)


@pytest.mark.ray_dag
@pytest.mark.skipif(pa is None, reason="pyarrow not installed")
def test_groupby_apply_with_arrow_dtype(setup):
df1 = pd.DataFrame({"a": [1, 2, 1], "b": ["a", "b", "a"]})
@@ -1351,6 +1366,7 @@ def test_groupby_apply_with_arrow_dtype(setup):
pd.testing.assert_series_equal(arrow_array_to_objects(result), expected)


@pytest.mark.ray_dag
def test_groupby_nunique(setup):
rs = np.random.RandomState(0)
data_size = 100
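
The `@pytest.mark.ray_dag` marker added throughout these test files lets a Ray DAG test run select exactly these cases. As a sketch only (the project's actual marker registration and CI invocation may differ), such a custom marker is typically registered once and then filtered with `-m`:

# conftest.py - hypothetical registration so pytest does not warn about the marker.
def pytest_configure(config):
    config.addinivalue_line(
        "markers", "ray_dag: cases exercised against the Ray DAG execution backend"
    )

# Select only the marked tests, e.g.:
#   pytest -m ray_dag mars/dataframe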
