Skip to content

Commit

Permalink
Remove skip_ray_dag mark for raydataset tests (#3255)
Browse files Browse the repository at this point in the history
* Fix converting to MLDataset

* Temporarily fix pandas

* Fix tests

* Temporarily fix CI

* Fix checks
  • Loading branch information
vcfgv committed Oct 14, 2022
1 parent 98b5ac2 commit 27d8d34
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 24 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ jobs:
if [ -n "$WITH_RAY_DAG" ]; then
export MARS_CI_BACKEND=ray
export RAY_idle_worker_killing_time_threshold_ms=60000
pytest $PYTEST_CONFIG --durations=0 --timeout=500 mars/dataframe -v -s -m "not skip_ray_dag"
pytest $PYTEST_CONFIG --durations=0 --timeout=500 mars/dataframe -v -s -m "not skip_ray_dag" --ignore=mars/dataframe/contrib/raydataset
pytest $PYTEST_CONFIG --durations=0 --timeout=500 mars/dataframe/contrib/raydataset -v -s -m "not skip_ray_dag"
pytest $PYTEST_CONFIG --durations=0 --timeout=500 mars/tensor -v -s -m "not skip_ray_dag"
pytest $PYTEST_CONFIG --durations=0 --timeout=500 mars/learn --ignore mars/learn/contrib --ignore mars/learn/utils/tests/test_collect_ports.py -v -s -m "not skip_ray_dag"
pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s -m ray_dag
Expand Down
11 changes: 6 additions & 5 deletions mars/dataframe/contrib/raydataset/mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,13 @@ def to_ray_mldataset(df, num_shards: int = None):
# chunk2 & chunk3 for addr2,
# chunk4 for addr1
fetched_infos: Dict[str, List] = df.fetch_infos(fields=["bands", "object_refs"])
chunk_addr_refs: List[Tuple[Tuple, "ray.ObjectRef"]] = [
(bands[0], object_refs[0])
for bands, object_refs in zip(
fetched_infos["bands"], fetched_infos["object_refs"]
chunk_addr_refs: List[Tuple[Tuple, "ray.ObjectRef"]] = []
for bands, object_refs in zip(fetched_infos["bands"], fetched_infos["object_refs"]):
chunk_addr_ref = (
(bands[0], object_refs[0]) if bands else ("ray_dag_0", object_refs[0])
)
]
chunk_addr_refs.append(chunk_addr_ref)

group_to_obj_refs: Dict[str, List[ray.ObjectRef]] = _group_chunk_refs(
chunk_addr_refs, num_shards
)
Expand Down
15 changes: 7 additions & 8 deletions mars/dataframe/contrib/raydataset/tests/test_mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import pytest

from ..... import dataframe as md
from .....conftest import MARS_CI_BACKEND
from .....deploy.oscar.ray import new_cluster
from .....deploy.oscar.session import new_session
from .....tests.core import require_ray
from .....utils import lazy_import
from ....contrib import raydataset as mdd


ray = lazy_import("ray")
ml_dataset = lazy_import("ray.util.data", rename="ml_dataset")

Expand All @@ -45,6 +45,7 @@ async def create_cluster(request):
worker_num=2,
worker_cpu=1,
worker_mem=256 * 1024**2,
backend=MARS_CI_BACKEND,
)
async with client:
yield client
Expand Down Expand Up @@ -72,16 +73,15 @@ async def test_dataset_related_classes(ray_start_regular_shared):

@require_ray
@pytest.mark.asyncio
@pytest.mark.parametrize("test_option", [[5, 5], [5, 4], [None, None]])
@pytest.mark.skip_ray_dag # mldataset is not compatible with Ray DAG
@pytest.mark.parametrize("chunk_size_and_num_shards", [[5, 5], [5, 4], [None, None]])
async def test_convert_to_ray_mldataset(
ray_start_regular_shared, create_cluster, test_option
ray_start_regular_shared, create_cluster, chunk_size_and_num_shards
):
assert create_cluster.session
session = new_session(address=create_cluster.address, default=True)
session = new_session(address=create_cluster.address, backend="ray")
with session:
value = np.random.rand(10, 10)
chunk_size, num_shards = test_option
chunk_size, num_shards = chunk_size_and_num_shards
df: md.DataFrame = md.DataFrame(value, chunk_size=chunk_size)
df.execute()

Expand All @@ -92,13 +92,12 @@ async def test_convert_to_ray_mldataset(
@require_ray
@pytest.mark.asyncio
@pytest.mark.skipif(xgboost_ray is None, reason="xgboost_ray not installed")
@pytest.mark.skip_ray_dag # mldataset is not compatible with Ray DAG
async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
from xgboost_ray import RayDMatrix, RayParams, train, predict
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, default=True)
session = new_session(address=create_cluster.address, backend="ray")
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
df: md.DataFrame = md.concat(
Expand Down
18 changes: 8 additions & 10 deletions mars/dataframe/contrib/raydataset/tests/test_raydataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pytest

from ..... import dataframe as md
from .....conftest import MARS_CI_BACKEND
from .....deploy.oscar.ray import new_cluster
from .....deploy.oscar.session import new_session
from .....tests.core import require_ray
Expand All @@ -43,23 +44,23 @@ async def create_cluster(request):
worker_num=2,
worker_cpu=1,
worker_mem=256 * 1024**2,
backend=MARS_CI_BACKEND,
)
async with client:
yield client


@require_ray
@pytest.mark.asyncio
@pytest.mark.parametrize("test_option", [[3, 3], [3, 2], [None, None]])
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
@pytest.mark.parametrize("chunk_size_and_num_shards", [[3, 3], [3, 2], [None, None]])
async def test_convert_to_ray_dataset(
ray_start_regular_shared, create_cluster, test_option
ray_start_regular_shared, create_cluster, chunk_size_and_num_shards
):
assert create_cluster.session
session = new_session(address=create_cluster.address, default=True)
with session:
value = np.random.rand(10, 10)
chunk_size, num_shards = test_option
chunk_size, num_shards = chunk_size_and_num_shards
# ray dataset needs str columns
df: md.DataFrame = md.DataFrame(
value,
Expand All @@ -75,13 +76,12 @@ async def test_convert_to_ray_dataset(
@require_ray
@pytest.mark.asyncio
@pytest.mark.skipif(xgboost_ray is None, reason="xgboost_ray not installed")
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, default=True)
session = new_session(address=create_cluster.address, backend="ray")
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
pd_df = pd.concat([train_x, train_y], axis=1)
Expand Down Expand Up @@ -118,13 +118,12 @@ async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
@pytest.mark.asyncio
@pytest.mark.skipif(sklearn is None, reason="sklearn not installed")
@pytest.mark.skipif(xgboost_ray is None, reason="xgboost_ray not installed")
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
async def test_mars_with_xgboost_sklearn_clf(ray_start_regular_shared, create_cluster):
from xgboost_ray import RayDMatrix, RayParams, RayXGBClassifier
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, default=True)
session = new_session(address=create_cluster.address, backend="ray")
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
df: md.DataFrame = md.concat(
Expand Down Expand Up @@ -161,13 +160,12 @@ async def test_mars_with_xgboost_sklearn_clf(ray_start_regular_shared, create_cl
@pytest.mark.asyncio
@pytest.mark.skipif(sklearn is None, reason="sklearn not installed")
@pytest.mark.skipif(xgboost_ray is None, reason="xgboost_ray not installed")
@pytest.mark.skip_ray_dag # raydataset is not compatible with Ray DAG
async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cluster):
from xgboost_ray import RayDMatrix, RayParams, RayXGBRegressor
from sklearn.datasets import make_regression

assert create_cluster.session
session = new_session(address=create_cluster.address, default=True)
session = new_session(address=create_cluster.address, backend="ray")
with session:
np_X, np_y = make_regression(n_samples=1_0000, n_features=10)
columns = [f"c{i}" for i in range(np_X.shape[1])]
Expand Down

0 comments on commit 27d8d34

Please sign in to comment.