Skip to content

Commit

Permalink
[Ray] Enable CI of mars/dataframe for Ray DAG (#3250)
Browse files Browse the repository at this point in the history
* Enable CI in mars/dataframe for Ray DAG

* Fix

* Fix

* Fix

* Fix

* Pin pandas<1.4 for Ray CI

* Fix

Co-authored-by: 刘宝 <po.lb@antgroup.com>
  • Loading branch information
fyrestone and 刘宝 committed Sep 15, 2022
1 parent 0b20242 commit e7fd974
Show file tree
Hide file tree
Showing 12 changed files with 26 additions and 55 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ jobs:
pip install https://s3-us-west-2.amazonaws.com/ray-wheels/master/c03d0432f3bb40f3c597b7fc450870ba5e34ad56/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
# Ray Datasets need pyarrow>=6.0.1
pip install "pyarrow>=6.0.1"
# Pin pandas<1.4 for the Ray CI: pandas 1.4 triggers compatibility issues (possibly pandas 1.4 bugs), see:
# https://github.com/mars-project/mars/issues/3251
# https://github.com/mars-project/mars/issues/3252
pip install "pandas<1.4"
fi
if [ -n "$RUN_DASK" ]; then
pip install dask[complete] mimesis sklearn
Expand Down Expand Up @@ -173,6 +177,8 @@ jobs:
fi
if [ -n "$WITH_RAY_DAG" ]; then
export MARS_CI_BACKEND=ray
export RAY_idle_worker_killing_time_threshold_ms=60000
pytest $PYTEST_CONFIG --durations=0 --timeout=500 mars/dataframe -v -s -m "not skip_ray_dag"
pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s -m ray_dag
mv .coverage build/.coverage.ray_dag.file
pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s mars/deploy/oscar/tests/test_ray_dag.py
Expand Down
2 changes: 2 additions & 0 deletions mars/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def _ray_start_regular(request): # pragma: no cover
param = getattr(request, "param", {})
if not param.get("enable", True):
yield
elif ray and ray.is_initialized():
yield
else:
num_cpus = param.get("num_cpus", 64)
total_memory_mb = num_cpus * 2 * 1024**2
Expand Down
17 changes: 0 additions & 17 deletions mars/dataframe/base/tests/test_base_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ def test_to_cpu_execution(setup_gpu):
pd.testing.assert_series_equal(res, pseries)


@pytest.mark.ray_dag
def test_rechunk_execution(setup):
data = pd.DataFrame(np.random.rand(8, 10))
df = from_pandas_df(pd.DataFrame(data), chunk_size=3)
Expand Down Expand Up @@ -226,7 +225,6 @@ def f(x: int):
pd.testing.assert_index_equal(result, expected)


@pytest.mark.ray_dag
def test_describe_execution(setup):
s_raw = pd.Series(np.random.rand(10))

Expand Down Expand Up @@ -718,7 +716,6 @@ def test_datetime_method_execution(setup):
pd.testing.assert_series_equal(result, expected)


@pytest.mark.ray_dag
def test_isin_execution(setup):
# one chunk in multiple chunks
a = pd.Series([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
Expand Down Expand Up @@ -811,7 +808,6 @@ def test_isin_execution(setup):
pd.testing.assert_frame_equal(result, expected)


@pytest.mark.ray_dag
def test_cut_execution(setup):
session = setup

Expand Down Expand Up @@ -931,7 +927,6 @@ def test_cut_execution(setup):
cut(s3, 3).execute()


@pytest.mark.ray_dag
def test_transpose_execution(setup):
raw = pd.DataFrame(
{"a": ["1", "2", "3"], "b": ["5", "-6", "7"], "c": ["1", "2", "3"]}
Expand Down Expand Up @@ -1117,7 +1112,6 @@ def test_to_numeric_execution(setup):
np.testing.assert_array_equal(r.execute().fetch(), pd.to_numeric(l))


@pytest.mark.ray_dag
def test_q_cut_execution(setup):
rs = np.random.RandomState(0)
raw = rs.random(15) * 1000
Expand Down Expand Up @@ -1159,7 +1153,6 @@ def test_q_cut_execution(setup):
pd.testing.assert_series_equal(pd.Series(result), pd.Series(expected))


@pytest.mark.ray_dag
def test_shift_execution(setup):
# test dataframe
rs = np.random.RandomState(0)
Expand Down Expand Up @@ -1254,7 +1247,6 @@ def test_shift_execution(setup):
) from e


@pytest.mark.ray_dag
def test_diff_execution(setup):
rs = np.random.RandomState(0)
raw = pd.DataFrame(
Expand Down Expand Up @@ -1292,7 +1284,6 @@ def test_diff_execution(setup):
pd.testing.assert_series_equal(r.execute().fetch(), s1.diff(1))


@pytest.mark.ray_dag
def test_value_counts_execution(setup):
rs = np.random.RandomState(0)
s = pd.Series(rs.randint(5, size=100), name="s")
Expand Down Expand Up @@ -1478,7 +1469,6 @@ def test_astype(setup):
pd.testing.assert_series_equal(expected, result)


@pytest.mark.ray_dag
def test_drop(setup):
# test dataframe drop
rs = np.random.RandomState(0)
Expand Down Expand Up @@ -1549,7 +1539,6 @@ def test_melt(setup):
)


@pytest.mark.ray_dag
def test_drop_duplicates(setup):
# test dataframe drop
rs = np.random.RandomState(0)
Expand Down Expand Up @@ -1643,7 +1632,6 @@ def test_drop_duplicates(setup):
pd.testing.assert_series_equal(result, expected)


@pytest.mark.ray_dag
def test_duplicated(setup):
# test dataframe drop
rs = np.random.RandomState(0)
Expand Down Expand Up @@ -1885,7 +1873,6 @@ def f5(pdf, chunk_index):
pd.testing.assert_series_equal(result, expected)


@pytest.mark.ray_dag
def test_cartesian_chunk_execution(setup):
rs = np.random.RandomState(0)
raw1 = pd.DataFrame({"a": rs.randint(3, size=10), "b": rs.rand(10)})
Expand Down Expand Up @@ -1973,7 +1960,6 @@ def f4(c1, c2):
)


@pytest.mark.ray_dag
def test_rebalance_execution(setup):
raw = pd.DataFrame(np.random.rand(10, 3), columns=list("abc"))
df = from_pandas_df(raw)
Expand Down Expand Up @@ -2003,7 +1989,6 @@ def _tile_rebalance(op):
pd.testing.assert_frame_equal(result, raw)


@pytest.mark.ray_dag
def test_stack_execution(setup):
raw = pd.DataFrame(
np.random.rand(10, 3), columns=list("abc"), index=[f"s{i}" for i in range(10)]
Expand Down Expand Up @@ -2148,7 +2133,6 @@ def test_check_monotonic_execution(setup):
assert ser_mixed.is_monotonic_decreasing.execute().fetch() is False


@pytest.mark.ray_dag
def test_pct_change_execution(setup):
# test dataframe
rs = np.random.RandomState(0)
Expand Down Expand Up @@ -2177,7 +2161,6 @@ def test_pct_change_execution(setup):
pd.testing.assert_frame_equal(expected, result)


@pytest.mark.ray_dag
def test_bloom_filter(setup):
rs = np.random.RandomState(0)
raw1 = pd.DataFrame(
Expand Down
2 changes: 2 additions & 0 deletions mars/dataframe/contrib/raydataset/tests/test_mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def test_dataset_related_classes(ray_start_regular_shared):
@require_ray
@pytest.mark.asyncio
@pytest.mark.parametrize("test_option", [[5, 5], [5, 4], [None, None]])
@pytest.mark.skip_ray_dag # mldataset is not compatible with Ray DAG
async def test_convert_to_ray_mldataset(
ray_start_regular_shared, create_cluster, test_option
):
Expand All @@ -91,6 +92,7 @@ async def test_convert_to_ray_mldataset(
@require_ray
@pytest.mark.asyncio
@pytest.mark.skipif(xgboost_ray is None, reason="xgboost_ray not installed")
@pytest.mark.skip_ray_dag # mldataset is not compatible with Ray DAG
async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
from xgboost_ray import RayDMatrix, RayParams, train, predict
from sklearn.datasets import load_breast_cancer
Expand Down
4 changes: 4 additions & 0 deletions mars/dataframe/contrib/raydataset/tests/test_raydataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async def create_cluster(request):
@require_ray
@pytest.mark.asyncio
@pytest.mark.parametrize("test_option", [[3, 3], [3, 2], [None, None]])
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
async def test_convert_to_ray_dataset(
ray_start_regular_shared, create_cluster, test_option
):
Expand All @@ -74,6 +75,7 @@ async def test_convert_to_ray_dataset(
@require_ray
@pytest.mark.asyncio
@pytest.mark.skipif(xgboost_ray is None, reason="xgboost_ray not installed")
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer
Expand Down Expand Up @@ -116,6 +118,7 @@ async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
@pytest.mark.asyncio
@pytest.mark.skipif(sklearn is None, reason="sklearn not installed")
@pytest.mark.skipif(xgboost_ray is None, reason="xgboost_ray not installed")
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
async def test_mars_with_xgboost_sklearn_clf(ray_start_regular_shared, create_cluster):
from xgboost_ray import RayDMatrix, RayParams, RayXGBClassifier
from sklearn.datasets import load_breast_cancer
Expand Down Expand Up @@ -158,6 +161,7 @@ async def test_mars_with_xgboost_sklearn_clf(ray_start_regular_shared, create_cl
@pytest.mark.asyncio
@pytest.mark.skipif(sklearn is None, reason="sklearn not installed")
@pytest.mark.skipif(xgboost_ray is None, reason="xgboost_ray not installed")
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cluster):
from xgboost_ray import RayDMatrix, RayParams, RayXGBRegressor
from sklearn.datasets import make_regression
Expand Down
2 changes: 2 additions & 0 deletions mars/dataframe/datasource/tests/test_datasource_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ def test_read_parquet_fast_parquet(setup):


@require_ray
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
def test_read_raydataset(ray_start_regular, ray_create_mars_cluster):
test_df1 = pd.DataFrame(
{
Expand Down Expand Up @@ -1210,6 +1211,7 @@ def test_read_raydataset(ray_start_regular, ray_create_mars_cluster):


@require_ray
@pytest.mark.skip_ray_dag # mldataset is not compatible with Ray DAG
def test_read_ray_mldataset(ray_start_regular, ray_create_mars_cluster):
test_dfs = [
pd.DataFrame(
Expand Down
17 changes: 1 addition & 16 deletions mars/dataframe/groupby/tests/test_groupby_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def post(self, v1, v2):
return v1 + v2


@pytest.mark.ray_dag
def test_groupby(setup):
rs = np.random.RandomState(0)
data_size = 100
Expand Down Expand Up @@ -117,7 +116,6 @@ def test_groupby(setup):
)


@pytest.mark.ray_dag
def test_groupby_getitem(setup):
rs = np.random.RandomState(0)
data_size = 100
Expand Down Expand Up @@ -326,7 +324,6 @@ def test_groupby_getitem(setup):
)


@pytest.mark.ray_dag
def test_dataframe_groupby_agg(setup):
agg_funs = [
"std",
Expand Down Expand Up @@ -494,7 +491,6 @@ def test_dataframe_groupby_agg(setup):
)


@pytest.mark.ray_dag
def test_dataframe_groupby_agg_sort(setup):
agg_funs = [
"std",
Expand Down Expand Up @@ -589,7 +585,6 @@ def test_dataframe_groupby_agg_sort(setup):
assert r.op.groupby_params["as_index"] is True


@pytest.mark.ray_dag
def test_series_groupby_agg(setup):
rs = np.random.RandomState(0)
series1 = pd.Series(rs.rand(10))
Expand Down Expand Up @@ -665,7 +660,6 @@ def test_series_groupby_agg(setup):
)


@pytest.mark.ray_dag
def test_groupby_agg_auto_method(setup):
rs = np.random.RandomState(0)
raw = pd.DataFrame(
Expand Down Expand Up @@ -735,6 +729,7 @@ def _disallow_combine_and_agg(ctx, op):
pd.testing.assert_frame_equal(result.sort_index(), raw.groupby("c1").agg("sum"))


@pytest.mark.skip_ray_dag # _fetch_infos() is not supported by ray backend.
def test_distributed_groupby_agg(setup_cluster):
rs = np.random.RandomState(0)
raw = pd.DataFrame(rs.rand(50000, 10))
Expand Down Expand Up @@ -778,7 +773,6 @@ def test_distributed_groupby_agg(setup_cluster):
assert len(r._fetch_infos()["memory_size"]) == 3


@pytest.mark.ray_dag
def test_groupby_agg_str_cat(setup):
agg_fun = lambda x: x.str.cat(sep="_", na_rep="NA")

Expand Down Expand Up @@ -849,7 +843,6 @@ def test_gpu_groupby_agg(setup_gpu):
)


@pytest.mark.ray_dag
def test_groupby_apply(setup):
df1 = pd.DataFrame(
{
Expand Down Expand Up @@ -969,7 +962,6 @@ def apply_closure_series(s):
)


@pytest.mark.ray_dag
def test_groupby_transform(setup):
df1 = pd.DataFrame(
{
Expand Down Expand Up @@ -1046,7 +1038,6 @@ def transform_series(s, truncate=True):
)


@pytest.mark.ray_dag
def test_groupby_cum(setup):
df1 = pd.DataFrame(
{
Expand Down Expand Up @@ -1086,7 +1077,6 @@ def test_groupby_cum(setup):
)


@pytest.mark.ray_dag
def test_groupby_fill(setup):
df1 = pd.DataFrame(
[
Expand Down Expand Up @@ -1148,7 +1138,6 @@ def test_groupby_fill(setup):
)


@pytest.mark.ray_dag
def test_groupby_head(setup):
df1 = pd.DataFrame(
{
Expand Down Expand Up @@ -1233,7 +1222,6 @@ def test_groupby_head(setup):
)


@pytest.mark.ray_dag
def test_groupby_sample(setup):
rs = np.random.RandomState(0)
sample_count = 10
Expand Down Expand Up @@ -1342,7 +1330,6 @@ def test_groupby_sample(setup):
r1.execute().fetch()


@pytest.mark.ray_dag
@pytest.mark.skipif(pa is None, reason="pyarrow not installed")
def test_groupby_agg_with_arrow_dtype(setup):
df1 = pd.DataFrame({"a": [1, 2, 1], "b": ["a", "b", "a"]})
Expand Down Expand Up @@ -1380,7 +1367,6 @@ def test_groupby_agg_with_arrow_dtype(setup):
pd.testing.assert_series_equal(result, expected)


@pytest.mark.ray_dag
@pytest.mark.skipif(pa is None, reason="pyarrow not installed")
def test_groupby_apply_with_arrow_dtype(setup):
df1 = pd.DataFrame({"a": [1, 2, 1], "b": ["a", "b", "a"]})
Expand All @@ -1403,7 +1389,6 @@ def test_groupby_apply_with_arrow_dtype(setup):
pd.testing.assert_series_equal(arrow_array_to_objects(result), expected)


@pytest.mark.ray_dag
def test_groupby_nunique(setup):
rs = np.random.RandomState(0)
data_size = 100
Expand Down
2 changes: 1 addition & 1 deletion mars/dataframe/indexing/tests/test_indexing_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ def _execute_data_source(ctx, op): # pragma: no cover
return _execute_data_source


@pytest.mark.skip_ray_dag # operand_executors is not supported by ray backend.
@pytest.mark.pd_compat
def test_optimization(setup):
import sqlalchemy as sa
Expand Down Expand Up @@ -1656,7 +1657,6 @@ def test_add_prefix_suffix(setup):
pd.testing.assert_series_equal(r.execute().fetch(), raw.add_suffix("_item"))


@pytest.mark.ray_dag
@pytest.mark.parametrize("join", ["outer", "left"])
def test_align_execution(setup, join):
rs = np.random.RandomState(0)
Expand Down

0 comments on commit e7fd974

Please sign in to comment.