Skip to content

Commit

Permalink
[Ray] Fix pandas schema parsing when reading Ray dataset (#2946)
Browse files Browse the repository at this point in the history
* fix parsing pandas schema and add ut

* fix pandas schema import

* update ray

* Fix test_task_manager

* skip test_cut_execution for ray_dag

* fix coverage

Co-authored-by: zhongchun <zhongchunyu@gmail.com>
  • Loading branch information
chaokunyang and zhongchun committed Jun 6, 2022
1 parent a464573 commit 78628b6
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 8 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ jobs:
rm -fr /tmp/etcd-$ETCD_VER-linux-amd64.tar.gz /tmp/etcd-download-test
fi
if [ -n "$WITH_RAY" ] || [ -n "$WITH_RAY_DAG" ] || [ -n "$WITH_RAY_DEPLOY" ]; then
pip install ray[default]==1.9.2 "protobuf<4"
pip install "xgboost_ray==0.1.5" "xgboost<1.6.0"
pip install "xgboost_ray==0.1.5" "xgboost<1.6.0" "protobuf<4"
# Use standard ray releases when ownership bug is fixed
pip uninstall -y ray
pip install https://s3-us-west-2.amazonaws.com/ray-wheels/master/c03d0432f3bb40f3c597b7fc450870ba5e34ad56/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
# Ray Datasets need pyarrow>=6.0.1
pip install "pyarrow>=6.0.1"
fi
if [ -n "$RUN_DASK" ]; then
pip install dask[complete] mimesis sklearn
Expand Down
1 change: 0 additions & 1 deletion mars/dataframe/base/tests/test_base_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,6 @@ def test_isin_execution(setup):
pd.testing.assert_frame_equal(result, expected)


@pytest.mark.ray_dag
def test_cut_execution(setup):
session = setup

Expand Down
17 changes: 15 additions & 2 deletions mars/dataframe/datasource/read_raydataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,23 @@ def __call__(self, index_value=None, columns_value=None, dtypes=None):
def read_ray_dataset(ds, columns=None, incremental_index=False, **kwargs):
assert isinstance(ds, real_ray_dataset.Dataset)
refs = ds.to_pandas_refs()
dtypes = ds.schema().empty_table().to_pandas().dtypes
schema = ds.schema()

import pyarrow as pa

try:
from ray.data.impl.pandas_block import PandasBlockSchema

if isinstance(schema, PandasBlockSchema):
dtypes = pd.Series(schema.types, index=schema.names)
elif isinstance(schema, pa.Schema):
dtypes = schema.empty_table().to_pandas().dtypes
else:
raise NotImplementedError(f"Unsupported format of schema {schema}")
except ImportError: # pragma: no cover
dtypes = schema.empty_table().to_pandas().dtypes
index_value = parse_index(pd.RangeIndex(-1))
columns_value = parse_index(dtypes.index, store_data=True)

op = DataFrameReadRayDataset(
refs=refs, columns=columns, incremental_index=incremental_index
)
Expand Down
30 changes: 30 additions & 0 deletions mars/dataframe/datasource/tests/test_datasource_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,36 @@ def test_read_raydataset(ray_start_regular, ray_create_mars_cluster):
mdf = md.read_ray_dataset(ds)
assert df.equals(mdf.execute().fetch())

n = 10000
pdf = pd.DataFrame({"a": list(range(n)), "b": list(range(n, 2 * n))})
df = md.DataFrame(pdf)

# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
pd.testing.assert_frame_equal(ds.to_pandas(), df.to_pandas())
ds2 = ds.filter(lambda row: row["a"] % 2 == 0)
assert ds2.take(5) == [{"a": 2 * i, "b": n + 2 * i} for i in range(5)]

# Convert ray dataset to mars dataframe
df2 = md.read_ray_dataset(ds2)
pd.testing.assert_frame_equal(
df2.head(5).to_pandas(),
pd.DataFrame({"a": list(range(0, 10, 2)), "b": list(range(n, n + 10, 2))}),
)

# Test Arrow Dataset
pdf2 = pd.DataFrame({c: range(5) for c in "abc"})
ds3 = ray.data.from_arrow([pa.Table.from_pandas(pdf2) for _ in range(3)])
df3 = md.read_ray_dataset(ds3)
pd.testing.assert_frame_equal(
df3.head(5).to_pandas(),
pdf2,
)

# Test simple datasets
with pytest.raises(NotImplementedError):
ray.data.range(10).to_mars()


@require_ray
def test_read_ray_mldataset(ray_start_regular, ray_create_mars_cluster):
Expand Down
4 changes: 2 additions & 2 deletions mars/services/task/execution/ray/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def get_deploy_band_resources(self) -> List[Dict[str, Resource]]:
return []

def get_subtask_max_retries(self):
return self._ray_execution_config.get("subtask_max_retries")
return self._ray_execution_config["subtask_max_retries"]

def get_n_cpu(self):
return self._ray_execution_config["n_cpu"]
Expand All @@ -49,7 +49,7 @@ def get_n_worker(self):
return self._ray_execution_config["n_worker"]

def get_subtask_cancel_timeout(self):
return self._ray_execution_config.get("subtask_cancel_timeout")
return self._ray_execution_config["subtask_cancel_timeout"]

def create_task_state_actor_as_needed(self):
# Whether create RayTaskState actor as needed.
Expand Down
8 changes: 7 additions & 1 deletion mars/services/task/supervisor/tests/test_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ async def actor_pool():
await MockMutableAPI.create(session_id, pool.external_address)

# create configuration
config = ExecutionConfig.from_params(backend=backend, n_worker=1, n_cpu=2)
config = ExecutionConfig.from_params(
backend=backend,
n_worker=1,
n_cpu=2,
subtask_max_retries=3,
subtask_cancel_timeout=3,
)
await mo.create_actor(
TaskConfigurationActor,
dict(),
Expand Down

0 comments on commit 78628b6

Please sign in to comment.