Skip to content

Commit

Permalink
FEAT-#6831: Implement read_parquet_glob and to_parquet_glob (#6854)
Browse files Browse the repository at this point in the history
Co-authored-by: Iaroslav Igoshev <Poolliver868@mail.ru>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev and YarShev committed Jan 13, 2024
1 parent 45ad9de commit f6b31d6
Show file tree
Hide file tree
Showing 19 changed files with 297 additions and 47 deletions.
2 changes: 2 additions & 0 deletions docs/flow/modin/experimental/pandas.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ Experimental API Reference
.. autofunction:: read_csv_glob
.. autofunction:: read_custom_text
.. autofunction:: read_pickle_distributed
.. autofunction:: read_parquet_glob
.. automethod:: modin.pandas.DataFrame.modin::to_pickle_distributed
.. automethod:: modin.pandas.DataFrame.modin::to_parquet_glob
2 changes: 2 additions & 0 deletions docs/supported_apis/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ default to pandas.
| | | | ``path`` parameter specifies a directory where one |
| | | | file is written per row partition of the Modin |
| | | | dataframe. |
| | | | Experimental implementation: |
| | | | DataFrame.modin.to_parquet_glob |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``to_period`` | `to_period`_ | D | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
Expand Down
1 change: 1 addition & 0 deletions docs/supported_apis/io_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ default to pandas.
| | | passed via ``**kwargs`` are not supported. |
| | | ``use_nullable_dtypes`` == True is not supported. |
| | | |
| | | Experimental implementation: read_parquet_glob |
+-------------------+---------------------------------+--------------------------------------------------------+
| `read_json`_ | P | Implemented for ``lines=True`` |
+-------------------+---------------------------------+--------------------------------------------------------+
Expand Down
6 changes: 4 additions & 2 deletions docs/usage_guide/advanced_usage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ Modin also supports these experimental APIs on top of pandas that are under acti
- :py:func:`~modin.experimental.pandas.read_csv_glob` -- read multiple files in a directory
- :py:func:`~modin.experimental.pandas.read_sql` -- add optional parameters for the database connection
- :py:func:`~modin.experimental.pandas.read_custom_text` -- read custom text data from file
- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple files in a directory
- :py:meth:`~modin.pandas.DataFrame.modin.to_pickle_distributed` -- write to multiple files in a directory
- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple pickle files in a directory
- :py:func:`~modin.experimental.pandas.read_parquet_glob` -- read multiple parquet files in a directory
- :py:meth:`~modin.pandas.DataFrame.modin.to_pickle_distributed` -- write to multiple pickle files in a directory
- :py:meth:`~modin.pandas.DataFrame.modin.to_parquet_glob` -- write to multiple parquet files in a directory

DataFrame partitioning API
--------------------------
Expand Down
17 changes: 14 additions & 3 deletions modin/core/execution/dask/implementations/pandas_on_dask/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -89,10 +90,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": BaseIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": BaseIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
Expand Down
10 changes: 10 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ def to_pickle(cls, *args, **kwargs):
def to_pickle_distributed(cls, *args, **kwargs):
return cls.get_factory()._to_pickle_distributed(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.PandasOnRayFactory._read_parquet_glob)
def read_parquet_glob(cls, *args, **kwargs):
return cls.get_factory()._read_parquet_glob(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.PandasOnRayFactory._to_parquet_glob)
def to_parquet_glob(cls, *args, **kwargs):
return cls.get_factory()._to_parquet_glob(*args, **kwargs)

@classmethod
@_inherit_docstrings(factories.PandasOnRayFactory._read_custom_text)
def read_custom_text(cls, **kwargs):
Expand Down
33 changes: 33 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,39 @@ def _to_pickle_distributed(cls, *args, **kwargs):
)
return cls.io_cls.to_pickle_distributed(*args, **kwargs)

@classmethod
@doc(
_doc_io_method_raw_template,
source="Parquet files",
params=_doc_io_method_kwargs_params,
)
def _read_parquet_glob(cls, **kwargs):
current_execution = get_current_execution()
if current_execution not in supported_executions:
raise NotImplementedError(
f"`_read_parquet_glob()` is not implemented for {current_execution} execution."
)
return cls.io_cls.read_parquet_glob(**kwargs)

@classmethod
def _to_parquet_glob(cls, *args, **kwargs):
"""
Write query compiler content to several parquet files.
Parameters
----------
*args : args
Arguments to pass to the writer method.
**kwargs : kwargs
Arguments to pass to the writer method.
"""
current_execution = get_current_execution()
if current_execution not in supported_executions:
raise NotImplementedError(
f"`_to_parquet_glob()` is not implemented for {current_execution} execution."
)
return cls.io_cls.to_parquet_glob(*args, **kwargs)


@doc(_doc_factory_class, execution_name="PandasOnRay")
class PandasOnRayFactory(BaseFactory):
Expand Down
17 changes: 14 additions & 3 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": RayIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": RayIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@
from modin.experimental.core.io import (
ExperimentalCSVGlobDispatcher,
ExperimentalCustomTextDispatcher,
ExperimentalPickleDispatcher,
ExperimentalGlobDispatcher,
ExperimentalSQLDispatcher,
)
from modin.experimental.core.storage_formats.pandas.parsers import (
ExperimentalCustomTextParser,
ExperimentalPandasCSVGlobParser,
ExperimentalPandasParquetParser,
ExperimentalPandasPickleParser,
)

Expand Down Expand Up @@ -91,10 +92,20 @@ def __make_write(*classes, build_args=build_args):
read_csv_glob = __make_read(
ExperimentalPandasCSVGlobParser, ExperimentalCSVGlobDispatcher
)
read_parquet_glob = __make_read(
ExperimentalPandasParquetParser, ExperimentalGlobDispatcher
)
to_parquet_glob = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": UnidistIO.to_parquet},
)
read_pickle_distributed = __make_read(
ExperimentalPandasPickleParser, ExperimentalPickleDispatcher
ExperimentalPandasPickleParser, ExperimentalGlobDispatcher
)
to_pickle_distributed = __make_write(
ExperimentalGlobDispatcher,
build_args={**build_args, "base_write": UnidistIO.to_pickle},
)
to_pickle_distributed = __make_write(ExperimentalPickleDispatcher)
read_custom_text = __make_read(
ExperimentalCustomTextParser, ExperimentalCustomTextDispatcher
)
Expand Down
4 changes: 2 additions & 2 deletions modin/core/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ def to_csv(cls, obj, **kwargs): # noqa: PR01
@_inherit_docstrings(
pandas.DataFrame.to_parquet, apilink="pandas.DataFrame.to_parquet"
)
def to_parquet(cls, obj, **kwargs): # noqa: PR01
def to_parquet(cls, obj, path, **kwargs): # noqa: PR01
"""
Write object to the binary parquet format using pandas.
Expand All @@ -661,4 +661,4 @@ def to_parquet(cls, obj, **kwargs): # noqa: PR01
if isinstance(obj, BaseQueryCompiler):
obj = obj.to_pandas()

return obj.to_parquet(**kwargs)
return obj.to_parquet(path, **kwargs)
4 changes: 2 additions & 2 deletions modin/experimental/core/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

"""Experimental IO functions implementations."""

from .pickle.pickle_dispatcher import ExperimentalPickleDispatcher
from .glob.glob_dispatcher import ExperimentalGlobDispatcher
from .sql.sql_dispatcher import ExperimentalSQLDispatcher
from .text.csv_glob_dispatcher import ExperimentalCSVGlobDispatcher
from .text.custom_text_dispatcher import ExperimentalCustomTextDispatcher

__all__ = [
"ExperimentalCSVGlobDispatcher",
"ExperimentalSQLDispatcher",
"ExperimentalPickleDispatcher",
"ExperimentalGlobDispatcher",
"ExperimentalCustomTextDispatcher",
]
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Experimental Pickle format type IO functions implementations."""
"""Experimental module that allows to work with various formats using glob syntax."""
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Module houses ``ExperimentalPickleDispatcher`` class that is used for reading `.pkl` files."""
"""Module houses ``ExperimentalGlobDispatcher`` class that is used to read/write files of different formats in parallel."""

import glob
import warnings
Expand All @@ -24,20 +24,20 @@
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler


class ExperimentalPickleDispatcher(FileDispatcher):
"""Class handles utils for reading pickle files."""
class ExperimentalGlobDispatcher(FileDispatcher):
"""Class implements reading/writing different formats, parallelizing by the number of files."""

@classmethod
def _read(cls, filepath_or_buffer, **kwargs):
def _read(cls, **kwargs):
"""
Read data from `filepath_or_buffer` according to `kwargs` parameters.
Parameters
----------
filepath_or_buffer : str, path object or file-like object
`filepath_or_buffer` parameter of `read_pickle` function.
`filepath_or_buffer` parameter of `read_*` function.
**kwargs : dict
Parameters of `read_pickle` function.
Parameters of `read_*` function.
Returns
-------
Expand All @@ -46,10 +46,10 @@ def _read(cls, filepath_or_buffer, **kwargs):
Notes
-----
In experimental mode, we can use `*` in the filename.
The number of partitions is equal to the number of input files.
"""
path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path"
filepath_or_buffer = kwargs.pop(path_key)
filepath_or_buffer = stringify_path(filepath_or_buffer)
if not (isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer):
return cls.single_worker_read(
Expand Down Expand Up @@ -104,37 +104,33 @@ def write(cls, qc, **kwargs):
- if `*` is in the filename, then it will be replaced by the ascending sequence 0, 1, 2, …
- if `*` is not in the filename, then the default implementation will be used.
Example: 4 partitions and input filename="partition*.pkl.gz", then filenames will be:
`partition0.pkl.gz`, `partition1.pkl.gz`, `partition2.pkl.gz`, `partition3.pkl.gz`.
Parameters
----------
qc : BaseQueryCompiler
The query compiler of the Modin dataframe that we want
to run ``to_pickle_distributed`` on.
to run ``to_<format>_glob`` on.
**kwargs : dict
Parameters for ``pandas.to_pickle(**kwargs)``.
Parameters for ``pandas.to_<format>(**kwargs)``.
"""
kwargs["filepath_or_buffer"] = stringify_path(kwargs["filepath_or_buffer"])
path_key = "filepath_or_buffer" if "filepath_or_buffer" in kwargs else "path"
filepath_or_buffer = kwargs.pop(path_key)
filepath_or_buffer = stringify_path(filepath_or_buffer)
if not (
isinstance(kwargs["filepath_or_buffer"], str)
and "*" in kwargs["filepath_or_buffer"]
isinstance(filepath_or_buffer, str) and "*" in filepath_or_buffer
) or not isinstance(qc, PandasQueryCompiler):
warnings.warn("Defaulting to Modin core implementation")
cls.base_io.to_pickle(qc, **kwargs)
cls.base_write(qc, filepath_or_buffer, **kwargs)
return

# Be careful, this is a kind of limitation, but at the time of the first implementation,
# getting a name in this way is quite convenient.
# We can use this attribute because the names of the BaseIO's methods match pandas API.
write_func_name = cls.base_write.__name__

def func(df, **kw): # pragma: no cover
idx = str(kw["partition_idx"])
# dask doesn't make a copy of kwargs on serialization;
# so take a copy ourselves, otherwise the error is:
# kwargs["path"] = kwargs.pop("filepath_or_buffer").replace("*", idx)
# KeyError: 'filepath_or_buffer'
dask_kwargs = dict(kwargs)
dask_kwargs["path"] = dask_kwargs.pop("filepath_or_buffer").replace(
"*", idx
)
df.to_pickle(**dask_kwargs)
path = filepath_or_buffer.replace("*", idx)
getattr(df, write_func_name)(path, **kwargs)
return pandas.DataFrame()

result = qc._modin_frame.apply_full_axis(
Expand Down
18 changes: 18 additions & 0 deletions modin/experimental/core/storage_formats/pandas/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ def parse(fname, **kwargs):
return _split_result_for_readers(1, num_splits, df) + [length, width]


@doc(_doc_pandas_parser_class, data_type="parquet files")
class ExperimentalPandasParquetParser(PandasParser):
@staticmethod
@doc(_doc_parse_func, parameters=_doc_parse_parameters_common)
def parse(fname, **kwargs):
warnings.filterwarnings("ignore")
num_splits = 1
single_worker_read = kwargs.pop("single_worker_read", None)
df = pandas.read_parquet(fname, **kwargs)
if single_worker_read:
return df

length = len(df)
width = len(df.columns)

return _split_result_for_readers(1, num_splits, df) + [length, width]


@doc(_doc_pandas_parser_class, data_type="custom text")
class ExperimentalCustomTextParser(PandasParser):
@staticmethod
Expand Down
1 change: 1 addition & 0 deletions modin/experimental/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from .io import ( # noqa F401
read_csv_glob,
read_custom_text,
read_parquet_glob,
read_pickle_distributed,
read_sql,
to_pickle_distributed,
Expand Down

0 comments on commit f6b31d6

Please sign in to comment.