
FEAT-#6831: Implement read_parquet_glob and to_parquet_glob #6854

Merged · 7 commits · Jan 13, 2024
2 changes: 2 additions & 0 deletions docs/flow/modin/experimental/pandas.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ Experimental API Reference
.. autofunction:: read_csv_glob
.. autofunction:: read_custom_text
.. autofunction:: read_pickle_distributed
.. autofunction:: read_parquet_glob
.. automethod:: modin.pandas.DataFrame.modin::to_pickle_distributed
.. automethod:: modin.pandas.DataFrame.modin::to_parquet_glob
2 changes: 2 additions & 0 deletions docs/supported_apis/dataframe_supported.rst
Expand Up @@ -421,6 +421,8 @@ default to pandas.
| | | | ``path`` parameter specifies a directory where one |
| | | | file is written per row partition of the Modin |
| | | | dataframe. |
| | | | Experimental implementation: |
| | | | DataFrame.modin.to_parquet_glob |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``to_period`` | `to_period`_ | D | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
1 change: 1 addition & 0 deletions docs/supported_apis/io_supported.rst
Expand Up @@ -46,6 +46,7 @@ default to pandas.
| | | passed via ``**kwargs`` are not supported. |
| | | ``use_nullable_dtypes`` == True is not supported. |
| | | |
| | | Experimental implementation: read_parquet_glob |
+-------------------+---------------------------------+--------------------------------------------------------+
| `read_json`_ | P | Implemented for ``lines=True`` |
+-------------------+---------------------------------+--------------------------------------------------------+
6 changes: 4 additions & 2 deletions docs/usage_guide/advanced_usage/index.rst
Expand Up @@ -30,8 +30,10 @@ Modin also supports these experimental APIs on top of pandas that are under acti
- :py:func:`~modin.experimental.pandas.read_csv_glob` -- read multiple files in a directory
- :py:func:`~modin.experimental.pandas.read_sql` -- add optional parameters for the database connection
- :py:func:`~modin.experimental.pandas.read_custom_text` -- read custom text data from file
- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple files in a directory
- :py:meth:`~modin.pandas.DataFrame.modin.to_pickle_distributed` -- write to multiple files in a directory
- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple pickle files in a directory
- :py:func:`~modin.experimental.pandas.read_parquet_glob` -- read multiple parquet files in a directory
- :py:meth:`~modin.pandas.DataFrame.modin.to_pickle_distributed` -- write to multiple pickle files in a directory
- :py:meth:`~modin.pandas.DataFrame.modin.to_parquet_glob` -- write to multiple parquet files in a directory
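The glob-style APIs listed above hinge on a `*` placeholder in the path: on write, each row partition substitutes its own index for the wildcard. A minimal sketch of that naming convention (hypothetical helper, not Modin's actual code):

```python
# Hypothetical helper illustrating the "*"-to-partition-index naming
# convention used by the glob readers/writers; not Modin's real code.
def expand_glob_path(path: str, num_partitions: int) -> list:
    if "*" not in path:
        # Without a wildcard the writers fall back to a single file.
        return [path]
    return [path.replace("*", str(i)) for i in range(num_partitions)]

print(expand_glob_path("partition*.parquet", 4))
# ['partition0.parquet', 'partition1.parquet', 'partition2.parquet', 'partition3.parquet']
```

On read, the same wildcard is expanded with `glob` and each matching file becomes one partition.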

DataFrame partitioning API
--------------------------
Expand Up @@ -43,12 +43,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -89,10 +90,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": BaseIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": BaseIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)

Review thread on to_pickle_distributed:
- Collaborator: Should we change read_pickle_distributed and to_pickle_distributed to read_pickle_glob and to_pickle_glob (separate issue)?
- Collaborator (Author): I'd like it for consistency.
- Collaborator: File an issue for that please.
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
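The `__make_read`/`__make_write` helpers used throughout this hunk compose a parser with a dispatcher into a one-off IO class and expose its entry point. A simplified, self-contained sketch of the idea (names and signatures are illustrative; the real helpers also thread through `build_args`):

```python
# Simplified sketch of the __make_read pattern: mix a parser into a
# dispatcher via a dynamically built class, then return the dispatcher's
# classmethod entry point. Illustrative only, not Modin's real helpers.
class DemoParser:
    @staticmethod
    def parse(source: str) -> str:
        return f"parsed:{source}"

class DemoDispatcher:
    parser = None  # filled in by make_read

    @classmethod
    def _read(cls, source: str, **kwargs) -> str:
        return cls.parser.parse(source)

def make_read(parser, dispatcher):
    combined = type("DemoIO", (dispatcher,), {"parser": parser})
    return combined._read

read_demo = make_read(DemoParser, DemoDispatcher)
print(read_demo("data0.parquet"))  # parsed:data0.parquet
```

This is why swapping `ExperimentalPickleDispatcher` for the generic `ExperimentalGlobDispatcher` is a one-line change per engine: the dispatcher is just one ingredient of the composed class.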
10 changes: 10 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
Expand Up @@ -296,6 +296,16 @@ def to_pickle(cls, *args, **kwargs):
def to_pickle_distributed(cls, *args, **kwargs):
return cls.get_factory()._to_pickle_distributed(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob)
def read_parquet_glob(cls, *args, **kwargs):
return cls.get_factory()._read_parquet_glob(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob)
def to_parquet_glob(cls, *args, **kwargs):
return cls.get_factory()._to_parquet_glob(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.PandasOnRayFactory._read_custom_text)
def read_custom_text(cls, **kwargs):
33 changes: 33 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
Expand Up @@ -516,6 +516,39 @@ def _to_pickle_distributed(cls, *args, **kwargs):
)
return cls.io_cls.to_pickle_distributed(*args, **kwargs)

@classmethod
@doc(
_doc_io_method_raw_template,
source="Parquet files",
params=_doc_io_method_kwargs_params,
)
def _read_parquet_glob(cls, **kwargs):
current_execution = get_current_execution()
if current_execution not in supported_executions:
raise NotImplementedError(
f"`_read_parquet_glob()` is not implemented for {current_execution} execution."
)
return cls.io_cls.read_parquet_glob(**kwargs)

@classmethod
def _to_parquet_glob(cls, *args, **kwargs):
"""
Write query compiler content to several parquet files.

Parameters
----------
*args : args
Arguments to pass to the writer method.
**kwargs : kwargs
Arguments to pass to the writer method.
"""
current_execution = get_current_execution()
if current_execution not in supported_executions:
raise NotImplementedError(
f"`_to_parquet_glob()` is not implemented for {current_execution} execution."
)
return cls.io_cls.to_parquet_glob(*args, **kwargs)
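Both `_read_parquet_glob` and `_to_parquet_glob` gate on the current execution before delegating to the IO class. The guard boils down to the following sketch (the supported set here is an assumption inferred from the engines touched in this PR, not a verbatim copy of `supported_executions`):

```python
# Sketch of the execution guard shared by _read_parquet_glob and
# _to_parquet_glob. SUPPORTED_EXECUTIONS is an assumption based on the
# Ray/Unidist/Dask engines modified in this PR.
SUPPORTED_EXECUTIONS = {"PandasOnRay", "PandasOnUnidist", "PandasOnDask"}

def check_execution(current_execution: str, method_name: str) -> None:
    if current_execution not in SUPPORTED_EXECUTIONS:
        raise NotImplementedError(
            f"`{method_name}()` is not implemented for {current_execution} execution."
        )

check_execution("PandasOnRay", "_read_parquet_glob")  # passes silently
```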


@doc(_doc_factory_class, execution_name="PandasOnRay")
class PandasOnRayFactory(BaseFactory):
17 changes: 14 additions & 3 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Expand Up @@ -42,12 +42,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": RayIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": RayIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
Expand Up @@ -42,12 +42,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": UnidistIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": UnidistIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
4 changes: 2 additions & 2 deletions modin/core/io/io.py
Expand Up @@ -652,7 +652,7 @@ def to_csv(cls, obj, **kwargs): # noqa: PR01
@_inherit_docstrings(
pandas.DataFrame.to_parquet, apilink="pandas.DataFrame.to_parquet"
)
def to_parquet(cls, obj, **kwargs): # noqa: PR01
def to_parquet(cls, obj, path, **kwargs): # noqa: PR01
"""
Write object to the binary parquet format using pandas.

Expand All @@ -662,4 +662,4 @@ def to_parquet(cls, obj, **kwargs): # noqa: PR01
if isinstance(obj, BaseQueryCompiler):
obj = obj.to_pandas()

return obj.to_parquet(**kwargs)
return obj.to_parquet(path, **kwargs)
5 changes: 3 additions & 2 deletions modin/experimental/core/io/__init__.py
Expand Up @@ -13,14 +13,15 @@

"""Experimental IO functions implementations."""

from .pickle.pickle_dispatcher import ExperimentalPickleDispatcher
from .glob.glob_dispatcher import ExperimentalGlobDispatcher
from .sql.sql_dispatcher import ExperimentalSQLDispatcher
from .text.csv_glob_dispatcher import ExperimentalCSVGlobDispatcher
from .text.custom_text_dispatcher import ExperimentalCustomTextDispatcher

__all__ = [
"ExperimentalCSVGlobDispatcher",
"ExperimentalSQLDispatcher",
"ExperimentalPickleDispatcher",
"ExperimentalGlobDispatcher",
"ExperimentalCustomTextDispatcher",
]
Expand Up @@ -11,4 +11,4 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Experimental Pickle format type IO functions implementations."""
"""Experimental Glob format type IO functions implementations."""
Expand Up @@ -11,7 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Module houses ``ExperimentalPickleDispatcher`` class that is used for reading `.pkl` files."""
"""Module houses ``ExperimentalGlobDispatcher`` class that is used to read files of different formats in parallel."""

import glob
import warnings
Expand All @@ -24,20 +24,20 @@
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler


class ExperimentalPickleDispatcher(FileDispatcher):
"""Class handles utils for reading pickle files."""
class ExperimentalGlobDispatcher(FileDispatcher):
"""Class implements reading different formats, parallelizing by the number of files."""
Show resolved Hide resolved

@classmethod
def _read(cls, filepath_or_buffer, **kwargs):
def _read(cls, **kwargs):
"""
Read data from `filepath_or_buffer` according to `kwargs` parameters.

Parameters
----------
filepath_or_buffer : str, path object or file-like object
`filepath_or_buffer` parameter of `read_pickle` function.
`filepath_or_buffer` parameter of `read_*` function.
**kwargs : dict
Parameters of `read_pickle` function.
Parameters of `read_*` function.

Returns
-------
Expand All @@ -46,10 +46,10 @@ def _read(cls, filepath_or_buffer, **kwargs):

Notes
-----
In experimental mode, we can use `*` in the filename.

The number of partitions is equal to the number of input files.
"""
path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path"
filepath_or_buffer = kwargs.pop(path_key)
filepath_or_buffer = stringify_path(filepath_or_buffer)
if not (isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer):
return cls.single_worker_read(
Expand Down Expand Up @@ -104,37 +104,33 @@ def write(cls, qc, **kwargs):
- if `*` is in the filename, then it will be replaced by the ascending sequence 0, 1, 2, …
- if `*` is not in the filename, then the default implementation will be used.

Example: 4 partitions and input filename="partition*.pkl.gz", then filenames will be:
`partition0.pkl.gz`, `partition1.pkl.gz`, `partition2.pkl.gz`, `partition3.pkl.gz`.

Parameters
----------
qc : BaseQueryCompiler
The query compiler of the Modin dataframe that we want
to run ``to_pickle_distributed`` on.
to run ``to_<format>_glob`` on.
**kwargs : dict
Parameters for ``pandas.to_pickle(**kwargs)``.
Parameters for ``pandas.to_<format>(**kwargs)``.
"""
kwargs["filepath_or_buffer"] = stringify_path(kwargs["filepath_or_buffer"])
path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path"
filepath_or_buffer = kwargs.pop(path_key)
filepath_or_buffer = stringify_path(filepath_or_buffer)
if not (
isinstance(kwargs["filepath_or_buffer"], str)
and "*" in kwargs["filepath_or_buffer"]
isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer
) or not isinstance(qc, PandasQueryCompiler):
warnings.warn("Defaulting to Modin core implementation")
cls.base_io.to_pickle(qc, **kwargs)
cls.base_write(qc, filepath_or_buffer, **kwargs)
return

# Be careful, this is a kind of limitation, but at the time of the first implementation,
# getting a name in this way is quite convenient.
# We can use this attribute because the names of the BaseIO's methods match pandas API.
write_func_name = cls.base_write.__name__

def func(df, **kw): # pragma: no cover
idx = str(kw["partition_idx"])
# dask doesn't make a copy of kwargs on serialization;
Show resolved Hide resolved
# so take a copy ourselves, otherwise the error is:
# kwargs["path"] = kwargs.pop("filepath_or_buffer").replace("*", idx)
# KeyError: 'filepath_or_buffer'
dask_kwargs = dict(kwargs)
dask_kwargs["path"] = dask_kwargs.pop("filepath_or_buffer").replace(
"*", idx
)
df.to_pickle(**dask_kwargs)
path = filepath_or_buffer.replace("*", idx)
getattr(df, write_func_name)(path, **kwargs)
return pandas.DataFrame()

result = qc._modin_frame.apply_full_axis(
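The `write_func_name = cls.base_write.__name__` trick in the hunk above works because `BaseIO` write-method names mirror the pandas API, so each worker can look the writer up directly on its partition's DataFrame. A standalone sketch of that per-partition dispatch (helper name is hypothetical):

```python
import io

import pandas as pd

# Sketch of the per-partition write dispatch: because BaseIO write-method
# names match the pandas API (e.g. "to_parquet", "to_pickle"), getattr on
# the partition DataFrame finds the right writer.
def write_partition(df: pd.DataFrame, write_func_name: str, path, **kwargs) -> None:
    getattr(df, write_func_name)(path, **kwargs)

buf = io.StringIO()
write_partition(pd.DataFrame({"a": [1, 2]}), "to_csv", buf, index=False)
print(buf.getvalue())
```

Compared with the old pickle-only code path, no `to_pickle`-specific call remains in the worker function, which is what lets one dispatcher serve every glob format.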
18 changes: 18 additions & 0 deletions modin/experimental/core/storage_formats/pandas/parsers.py
Expand Up @@ -114,6 +114,24 @@ def parse(fname, **kwargs):
return _split_result_for_readers(1, num_splits, df) + [length, width]


@doc(_doc_pandas_parser_class, data_type="parquet files")
class ExperimentalPandasParquetParser(PandasParser):
@staticmethod
@doc(_doc_parse_func, parameters=_doc_parse_parameters_common)
def parse(fname, **kwargs):
warnings.filterwarnings("ignore")
num_splits = 1
Review thread on num_splits = 1:
- Collaborator: Why 1?
- Collaborator (Author): Each file is equal to one partition.
- Collaborator: We should probably change this in a separate PR.
- Collaborator (Author): This is an opportunity for further optimization, so if necessary, yes. However it's more important to add support for different formats for now.
- Collaborator: File an issue for further optimization please.

single_worker_read = kwargs.pop("single_worker_read", None)
df = pandas.read_parquet(fname, **kwargs)
if single_worker_read:
return df

length = len(df)
width = len(df.columns)

return _split_result_for_readers(1, num_splits, df) + [length, width]
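`ExperimentalPandasParquetParser.parse` follows the one-file-one-partition contract: each worker reads a single file and returns the frame along with its length and width so the partitions can be reassembled. Stripped to its essentials (illustrative; `read_func` stands in for `pandas.read_parquet`):

```python
import pandas as pd

# Illustrative skeleton of the per-file parse contract: one file becomes one
# partition, and length/width metadata rides along so the query compiler can
# rebuild the global index and columns.
def parse_one_file(read_func, fname: str, **kwargs):
    df = read_func(fname, **kwargs)
    return [df, len(df), len(df.columns)]
```

With `num_splits` fixed at 1, the degree of parallelism equals the number of matched files, which is why finer intra-file splitting is left as a follow-up optimization.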


@doc(_doc_pandas_parser_class, data_type="custom text")
class ExperimentalCustomTextParser(PandasParser):
@staticmethod
1 change: 1 addition & 0 deletions modin/experimental/pandas/__init__.py
Expand Up @@ -40,6 +40,7 @@
from .io import ( # noqa F401
read_csv_glob,
read_custom_text,
read_parquet_glob,
read_pickle_distributed,
read_sql,
to_pickle_distributed,