Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#6970: Implement to/from_ray_dataset functions #6971

Merged
merged 14 commits into from
Mar 5, 2024
14 changes: 14 additions & 0 deletions docs/ecosystem.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,19 @@ where NumPy can be used and what libraries it powers.

numpy_arr = to_numpy(modin_df)

to_ray_dataset
--------------

Refer to the `Ray Data`_ page for more details on
where Ray Datasets can be used and which libraries they power.

.. code-block:: python

from modin.pandas.io import to_ray_dataset

ray_dataset = to_ray_dataset(modin_df)
dchigarev marked this conversation as resolved.
Show resolved Hide resolved

.. _pandas ecosystem: https://pandas.pydata.org/community/ecosystem.html
.. _NumPy ecosystem: https://numpy.org
.. _Ray Data: https://docs.ray.io/en/latest/data/data.html

10 changes: 10 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@
def from_dataframe(cls, *args, **kwargs):
return cls.get_factory()._from_dataframe(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.BaseFactory._from_ray_dataset)
def from_ray_dataset(cls, ray_obj):
    # No docstring is written here on purpose: it is expected to be provided
    # by the ``_inherit_docstrings`` decorator above.
    factory = cls.get_factory()
    return factory._from_ray_dataset(ray_obj)

Check warning on line 187 in modin/core/execution/dispatching/factories/dispatcher.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dispatching/factories/dispatcher.py#L187

Added line #L187 was not covered by tests

@classmethod
@_inherit_docstrings(factories.BaseFactory._read_parquet)
def read_parquet(cls, **kwargs):
Expand Down Expand Up @@ -350,3 +355,8 @@
@_inherit_docstrings(factories.BaseFactory._to_parquet)
def to_parquet(cls, *args, **kwargs):
return cls.get_factory()._to_parquet(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.BaseFactory._to_ray_dataset)
def to_ray_dataset(cls, modin_obj):
    # No docstring is written here on purpose: it is expected to be provided
    # by the ``_inherit_docstrings`` decorator above.
    factory = cls.get_factory()
    return factory._to_ray_dataset(modin_obj)
31 changes: 31 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@
def _from_dataframe(cls, *args, **kwargs):
return cls.io_cls.from_dataframe(*args, **kwargs)

@classmethod
@doc(
    _doc_io_method_template,
    source="a Ray Dataset",
    params="ray_obj : ray.data.Dataset",
    method="modin.core.execution.ray.implementations.pandas_on_ray.io.PandasOnRayIO.from_ray_dataset",
)
def _from_ray_dataset(cls, ray_obj):
    # The docstring is left to the ``@doc`` template above; the call itself is
    # forwarded to the IO class configured on this factory.
    io_class = cls.io_cls
    return io_class.from_ray_dataset(ray_obj)

Check warning on line 212 in modin/core/execution/dispatching/factories/factories.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dispatching/factories/factories.py#L212

Added line #L212 was not covered by tests

@classmethod
@doc(
_doc_io_method_template,
Expand Down Expand Up @@ -455,6 +465,27 @@
"""
return cls.io_cls.to_parquet(*args, **kwargs)

@classmethod
def _to_ray_dataset(cls, modin_obj):
    """
    Convert a Modin DataFrame/Series to a Ray Dataset via the factory's IO class.

    Parameters
    ----------
    modin_obj : modin.pandas.DataFrame, modin.pandas.Series
        The Modin DataFrame/Series to convert.

    Returns
    -------
    ray.data.Dataset
        Whatever ``cls.io_cls.to_ray_dataset`` returns for `modin_obj`.

    Notes
    -----
    Modin DataFrame/Series can only be converted to a Ray Dataset if Modin uses a Ray engine.
    """
    io_class = cls.io_cls
    return io_class.to_ray_dataset(modin_obj)

# experimental methods that don't exist in pandas
@classmethod
@doc(
Expand Down
48 changes: 48 additions & 0 deletions modin/core/execution/ray/generic/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,51 @@

class RayIO(BaseIO):
    """Common base for I/O classes that operate over Ray."""

    @classmethod
    def from_ray_dataset(cls, ray_obj):
        """
        Build a Modin `query_compiler` out of a Ray Dataset.

        Parameters
        ----------
        ray_obj : ray.data.Dataset
            The Ray Dataset to take the data from.

        Returns
        -------
        BaseQueryCompiler
            QueryCompiler holding the data of the Ray Dataset.

        Notes
        -----
        This is a placeholder: subclasses have to provide the actual
        implementation, otherwise NotImplementedError is raised.
        """
        message = f"Modin dataset can't be created from `ray.data.Dataset` using {cls}."
        raise NotImplementedError(message)

    @classmethod
    def to_ray_dataset(cls, modin_obj):
        """
        Turn a Modin DataFrame/Series into a Ray Dataset.

        Parameters
        ----------
        modin_obj : modin.pandas.DataFrame, modin.pandas.Series
            The Modin DataFrame/Series to convert.

        Returns
        -------
        ray.data.Dataset
            The Ray Dataset built from `modin_obj`.

        Notes
        -----
        This is a placeholder: subclasses have to provide the actual
        implementation, otherwise NotImplementedError is raised.
        """
        message = f"`ray.data.Dataset` can't be created from Modin DataFrame/Series using {cls}."
        raise NotImplementedError(message)
41 changes: 41 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import pandas
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs

from modin.core.execution.ray.common import RayWrapper, SignalActor
from modin.core.execution.ray.generic.io import RayIO
Expand Down Expand Up @@ -258,3 +259,43 @@
RayWrapper.materialize(
[part.list_of_blocks[0] for row in result for part in row]
)

@classmethod
def from_ray_dataset(cls, ray_obj):
    """
    Build a Modin `query_compiler` out of a Ray Dataset.

    Parameters
    ----------
    ray_obj : ray.data.Dataset
        The Ray Dataset to take the data from.

    Returns
    -------
    BaseQueryCompiler
        QueryCompiler holding the data of the Ray Dataset.
    """
    from modin.distributed.dataframe.pandas.partitions import from_partitions

    # The references returned by ``to_pandas_refs`` are handed to
    # ``from_partitions`` with ``axis=0``; only the query compiler of the
    # resulting Modin object is returned.
    modin_frame = from_partitions(ray_obj.to_pandas_refs(), axis=0)
    return modin_frame._query_compiler

Check warning on line 281 in modin/core/execution/ray/implementations/pandas_on_ray/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/io/io.py#L280-L281

Added lines #L280 - L281 were not covered by tests

@classmethod
def to_ray_dataset(cls, modin_obj):
    """
    Turn a Modin DataFrame/Series into a Ray Dataset.

    Parameters
    ----------
    modin_obj : modin.pandas.DataFrame, modin.pandas.Series
        The Modin DataFrame/Series to convert.

    Returns
    -------
    ray.data.Dataset
        The Ray Dataset built from the partitions of `modin_obj`.
    """
    from modin.distributed.dataframe.pandas.partitions import unwrap_partitions

    # Unwrap the partitions along ``axis=0`` and pass them straight to
    # Ray's ``from_pandas_refs``.
    unwrapped = unwrap_partitions(modin_obj, axis=0)
    ray_dataset = from_pandas_refs(unwrapped)
    return ray_dataset

Check warning on line 301 in modin/core/execution/ray/implementations/pandas_on_ray/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/io/io.py#L300-L301

Added lines #L300 - L301 were not covered by tests
48 changes: 48 additions & 0 deletions modin/core/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,30 @@
"""
return cls.query_compiler_cls.from_dataframe(df, cls.frame_cls)

@classmethod
def from_ray_dataset(cls, ray_obj):
    """
    Create a Modin `query_compiler` from a Ray Dataset.

    Parameters
    ----------
    ray_obj : ray.data.Dataset
        The Ray Dataset to convert from.

    Returns
    -------
    BaseQueryCompiler
        QueryCompiler containing data from the Ray Dataset.

    Raises
    ------
    RuntimeError
        Always, in this implementation.

    Notes
    -----
    Ray Dataset can only be converted to a Modin Dataframe if Modin uses a Ray engine.
    If another engine is used, the runtime exception will be raised.
    """
    # The message has to describe the Ray -> Modin direction handled by this
    # method (it previously repeated the Modin -> Ray wording).
    raise RuntimeError(
        "Ray Dataset can only be converted to a Modin Dataframe if Modin uses a Ray engine."
    )

@classmethod
@_inherit_docstrings(pandas.read_parquet, apilink="pandas.read_parquet")
@doc(
Expand Down Expand Up @@ -694,3 +718,27 @@
obj = obj.to_pandas()

return obj.to_parquet(path, **kwargs)

@classmethod
def to_ray_dataset(cls, modin_obj):
    """
    Turn a Modin DataFrame/Series into a Ray Dataset.

    Parameters
    ----------
    modin_obj : modin.pandas.DataFrame, modin.pandas.Series
        The Modin DataFrame/Series to convert.

    Returns
    -------
    ray.data.Dataset
        The Ray Dataset built from `modin_obj`.

    Notes
    -----
    Modin DataFrame/Series can only be converted to a Ray Dataset if Modin uses a Ray engine.
    This implementation unconditionally raises the runtime exception.
    """
    message = "Modin Dataframe can only be converted to a Ray Dataset if Modin uses a Ray engine."
    raise RuntimeError(message)
17 changes: 16 additions & 1 deletion modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from modin.error_message import ErrorMessage
from modin.logging import disable_logging
from modin.pandas import Categorical
from modin.pandas.io import from_non_pandas, from_pandas, to_pandas
from modin.pandas.io import from_non_pandas, from_pandas, to_pandas, to_ray_dataset
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
from modin.utils import (
MODIN_UNNAMED_SERIES_LABEL,
_inherit_docstrings,
Expand Down Expand Up @@ -2229,6 +2229,21 @@
**kwargs,
)

def to_ray_dataset(self):
    """
    Turn this Modin DataFrame into a Ray Dataset.

    Returns
    -------
    ray.data.Dataset
        The Ray Dataset built from this DataFrame.

    Notes
    -----
    Modin Dataframe can only be converted to a Ray Dataset if Modin uses a Ray engine.
    """
    # Inside the method body this name refers to the function imported from
    # ``modin.pandas.io``, not to the method being defined.
    ray_dataset = to_ray_dataset(self)
    return ray_dataset

def to_period(
self, freq=None, axis=0, copy=None
): # pragma: no cover # noqa: PR01, RT01, D200
Expand Down
48 changes: 48 additions & 0 deletions modin/pandas/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,31 @@
return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_dataframe(df))


def from_ray_dataset(ray_obj):
    """
    Build a Modin DataFrame out of a Ray Dataset.

    Parameters
    ----------
    ray_obj : ray.data.Dataset
        The Ray Dataset to take the data from.

    Returns
    -------
    DataFrame
        A new Modin DataFrame object.

    Notes
    -----
    Ray Dataset can only be converted to Modin Dataframe if Modin uses a Ray engine.
    """
    from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

    # The DataFrame class is looked up before the dispatcher is called.
    frame_cls = ModinObjects.DataFrame
    compiler = FactoryDispatcher.from_ray_dataset(ray_obj)
    return frame_cls(query_compiler=compiler)


def to_pandas(modin_obj: SupportsPublicToPandas) -> Any:
"""
Convert a Modin DataFrame/Series to a pandas DataFrame/Series.
Expand Down Expand Up @@ -1075,6 +1100,29 @@
return array


def to_ray_dataset(modin_obj):
    """
    Turn a Modin DataFrame/Series into a Ray Dataset.

    Parameters
    ----------
    modin_obj : modin.pandas.DataFrame, modin.pandas.Series
        The DataFrame/Series to convert.

    Returns
    -------
    ray.data.Dataset
        The Ray Dataset built from `modin_obj`.

    Notes
    -----
    Modin DataFrame/Series can only be converted to a Ray Dataset if Modin uses a Ray engine.
    """
    from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

    ray_dataset = FactoryDispatcher.to_ray_dataset(modin_obj)
    return ray_dataset


__all__ = [
"ExcelFile",
"HDFStore",
Expand Down
17 changes: 16 additions & 1 deletion modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from modin.config import PersistentPickle
from modin.logging import disable_logging
from modin.pandas.io import from_pandas, to_pandas
from modin.pandas.io import from_pandas, to_pandas, to_ray_dataset

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
modin.pandas.io
begins an import cycle.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YarShev
It seems that change still doesn’t work.
It looks like a false positive, CodeQL have the same PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. Then we can revert imports back to the top.

from modin.utils import MODIN_UNNAMED_SERIES_LABEL, _inherit_docstrings

from .accessor import CachedAccessor, SparseAccessor
Expand Down Expand Up @@ -1955,6 +1955,21 @@

tolist = to_list

def to_ray_dataset(self):
    """
    Turn this Modin Series into a Ray Dataset.

    Returns
    -------
    ray.data.Dataset
        The Ray Dataset built from this Series.

    Notes
    -----
    Modin Series can only be converted to a Ray Dataset if Modin uses a Ray engine.
    """
    # Inside the method body this name refers to the function imported from
    # ``modin.pandas.io``, not to the method being defined.
    ray_dataset = to_ray_dataset(self)
    return ray_dataset

# TODO(williamma12): When we implement to_timestamp, have this call the version
# in base.py
def to_period(self, freq=None, copy=None): # noqa: PR01, RT01, D200
Expand Down
4 changes: 2 additions & 2 deletions modin/pandas/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def test_dataframe_api_equality():

ignore_in_pandas = ["timetuple"]
# modin - namespace for using experimental functionality
ignore_in_modin = ["modin", "to_pandas"]
ignore_in_modin = ["modin", "to_pandas", "to_ray_dataset"]
missing_from_modin = set(pandas_dir) - set(modin_dir)
assert not len(
missing_from_modin - set(ignore_in_pandas)
Expand All @@ -164,7 +164,7 @@ def test_dataframe_api_equality():
), "Differences found in API: {}".format(set(modin_dir) - set(pandas_dir))

# These have to be checked manually
allowed_different = ["to_hdf", "hist", "modin", "to_pandas"]
allowed_different = ["to_hdf", "hist", "modin", "to_pandas", "to_ray_dataset"]

assert_parameters_eq((pandas.DataFrame, pd.DataFrame), modin_dir, allowed_different)

Expand Down