diff --git a/pyproject.toml b/pyproject.toml
index c77bf625..a83c1d41 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,6 +24,7 @@
 dependencies = [
     "pyarrow>=16", # remove struct_field_names() and struct_fields() when upgraded to 18+
     "Deprecated>=1.2.0",
     "wrapt>=1.12.1",
+    "fsspec>=2025.7.0",
     # NOTE: package PINNED at <0.3.0, see https://github.com/astronomy-commons/lsdb/issues/1047
     "universal_pathlib>=0.2,<0.3.0",
diff --git a/src/nested_pandas/nestedframe/io.py b/src/nested_pandas/nestedframe/io.py
index 23343101..18da7a0a 100644
--- a/src/nested_pandas/nestedframe/io.py
+++ b/src/nested_pandas/nestedframe/io.py
@@ -3,6 +3,7 @@

 from pathlib import Path

+import fsspec.parquet
 import pandas as pd
 import pyarrow as pa
 import pyarrow.fs
@@ -14,7 +15,9 @@
 from ..series.utils import table_to_struct_array
 from .core import NestedFrame

-# Use smaller block size for FSSPEC filesystems, it usually helps with parquet reads
+# Use smaller block size for these FSSPEC filesystems.
+# It usually helps with parquet read speed.
+FSSPEC_FILESYSTEMS = ("http", "https")
 FSSPEC_BLOCK_SIZE = 32 * 1024


@@ -25,19 +28,29 @@ def read_parquet(
     autocast_list: bool = False,
     **kwargs,
 ) -> NestedFrame:
-    """
-    Load a parquet object from a file path into a NestedFrame.
+    """Load a parquet object from a file path into a NestedFrame.

-    As a deviation from `pandas`, this function loads via
-    `pyarrow.parquet.read_table`, and then converts to a NestedFrame.
+    As a specialization of the ``pandas.read_parquet`` function, this
+    function loads the data via existing ``pyarrow`` or
+    ``fsspec.parquet`` methods, and then converts the data to a
+    NestedFrame.

     Parameters
     ----------
     data: str, list or str, Path, Upath, or file-like object
-        Path to the data or a file-like object. If a string is passed, it can be a single file name,
-        directory name, or a remote path (e.g., HTTP/HTTPS or S3). If a file-like object is passed,
-        it must support the `read` method. You can also pass the `filesystem` argument with
-        a `pyarrow.fs` object, which will be passed to `pyarrow.parquet.read_table()`.
+        Path to the data or a file-like object. If a string is passed,
+        it can be a single file name, directory name, or a remote path
+        (e.g., HTTP/HTTPS or S3). If a file-like object is passed, it
+        must support the ``read`` method. You can also pass a
+        ``filesystem`` keyword argument with a ``pyarrow.fs`` object, which will
+        be passed along to the underlying file-reading method.
+        A file URL can also be a path to a directory that contains multiple
+        partitioned parquet files. Both pyarrow and fastparquet support
+        paths to directories as well as file URLs. A directory path could be:
+        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
+        If the path is to a single Parquet file, it will be loaded using
+        ``fsspec.parquet.open_parquet_file``, which has optimized handling
+        for remote Parquet files.
     columns : list, default=None
         If not None, only these columns will be read from the file.
     reject_nesting: list or str, default=None
@@ -57,6 +70,11 @@

     Notes
     -----
+    For paths to single Parquet files, this function uses
+    fsspec.parquet.open_parquet_file, which performs intelligent
+    precaching. This can significantly improve performance compared
+    to standard PyArrow reading on remote files.
+
     pyarrow supports partial loading of nested structures from parquet, for
     example ```pd.read_parquet("data.parquet", columns=["nested.a"])``` will
     load the "a" column of the "nested" column. Standard pandas/pyarrow
@@ -85,6 +103,7 @@ def read_parquet(

     >>> #Load only the "flux" sub-column of the "nested" column
     >>> nf = npd.read_parquet("path/to/file.parquet", columns=["a", "nested.flux"])  # doctest: +SKIP
+
     """

     # Type convergence for reject_nesting
@@ -93,14 +112,41 @@ def read_parquet(
     elif isinstance(reject_nesting, str):
         reject_nesting = [reject_nesting]

-    # First load through pyarrow
-    # If `filesystem` is specified - use it
-    if kwargs.get("filesystem") is not None:
-        table = pq.read_table(data, columns=columns, **kwargs)
-    # Otherwise convert with a special function
+    # For single Parquet file paths, we want to use
+    # `fsspec.parquet.open_parquet_file`. But for any other usage
+    # (which includes file-like objects, directories and lists
+    # thereof), we want to defer to `pq.read_table`.
+
+    # At the end of this block, `table` will contain the data.
+
+    # NOTE: the test for _is_local_dir is sufficient, because we're
+    # preserving a path to pq.read_table, which can read local
+    # directories, but not remote directories. Remote directories
+    # cannot be read by either of these methods.
+    if isinstance(data, str | Path | UPath) and not _is_local_dir(path_to_data := UPath(data)):
+        storage_options = _get_storage_options(path_to_data)
+        filesystem = kwargs.get("filesystem")
+        if not filesystem:
+            _, filesystem = _transform_read_parquet_data_arg(path_to_data)
+        with fsspec.parquet.open_parquet_file(
+            str(path_to_data),
+            columns=columns,
+            storage_options=storage_options,
+            fs=filesystem,
+            engine="pyarrow",
+        ) as parquet_file:
+            table = pq.read_table(parquet_file, columns=columns, **kwargs)
     else:
-        data, filesystem = _transform_read_parquet_data_arg(data)
-        table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)
+        # All other cases, including file-like objects, directories, and
+        # even lists of the foregoing.
+
+        # If `filesystem` is specified - use it, passing it as part of **kwargs
+        if kwargs.get("filesystem") is not None:
+            table = pq.read_table(data, columns=columns, **kwargs)
+        else:
+            # Otherwise convert with a special function
+            data, filesystem = _transform_read_parquet_data_arg(data)
+            table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)

     # Resolve partial loading of nested structures
     # Using pyarrow to avoid naming conflicts from partial loading ("flux" vs "lc.flux")
@@ -160,6 +206,41 @@ def read_parquet(
     return from_pyarrow(table, reject_nesting=reject_nesting, autocast_list=autocast_list)


+def _is_local_dir(path_to_data: UPath):
+    """Returns True if the given path refers to a local directory.
+
+    It's necessary to have this function, rather than simply checking
+    ``UPath(p).is_dir()``, because ``UPath.is_dir`` can be quite
+    expensive in the case of a remote file path that isn't a directory.
+    """
+    return path_to_data.protocol in ("", "file") and path_to_data.is_dir()
+
+
+def _get_storage_options(path_to_data: UPath):
+    """Get storage options for fsspec.parquet.open_parquet_file.
+
+    Parameters
+    ----------
+    path_to_data : UPath
+        The data source
+
+    Returns
+    -------
+    dict
+        Storage options (or None)
+    """
+    if path_to_data.protocol not in ("", "file"):
+        # Remote files of all types (s3, http)
+        storage_options = path_to_data.storage_options or {}
+        # For some cases, use smaller block size
+        if path_to_data.protocol in FSSPEC_FILESYSTEMS:
+            storage_options = {**storage_options, "block_size": FSSPEC_BLOCK_SIZE}
+        return storage_options
+
+    # Local files
+    return None
+
+
 def _transform_read_parquet_data_arg(data):
     """Transform `data` argument of read_parquet to pq.read_parquet's `source` and `filesystem`"""
     # Check if a list, run the function recursively and check that filesystems are all the same
@@ -204,8 +285,8 @@ def _transform_read_parquet_data_arg(data):
     # If it is a local path, use pyarrow's filesystem
     if upath.protocol == "":
         return upath.path, None
-    # If HTTP, change the default UPath object to use a smaller block size
-    if upath.protocol in ("http", "https"):
+    # Change the default UPath object to use a smaller block size in some cases
+    if upath.protocol in FSSPEC_FILESYSTEMS:
         upath = UPath(upath, block_size=FSSPEC_BLOCK_SIZE)
     return upath.path, upath.fs

diff --git a/tests/nested_pandas/nestedframe/test_io.py b/tests/nested_pandas/nestedframe/test_io.py
index c4e0195f..f11064aa 100644
--- a/tests/nested_pandas/nestedframe/test_io.py
+++ b/tests/nested_pandas/nestedframe/test_io.py
@@ -12,7 +12,12 @@
 import pytest
 from nested_pandas import NestedFrame, read_parquet
 from nested_pandas.datasets import generate_data
-from nested_pandas.nestedframe.io import _transform_read_parquet_data_arg, from_pyarrow
+from nested_pandas.nestedframe.io import (
+    FSSPEC_BLOCK_SIZE,
+    _get_storage_options,
+    _transform_read_parquet_data_arg,
+    from_pyarrow,
+)
 from pandas.testing import assert_frame_equal
 from upath import UPath

@@ -399,3 +404,58 @@ def test__transform_read_parquet_data_arg():
             "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet",
         ]
     )
+
+
+def test_read_parquet_with_fsspec_optimization():
+    """Test that read_parquet automatically uses fsspec optimization for remote files."""
+    # Test with local file (should not use fsspec optimization)
+    local_path = "tests/test_data/nested.parquet"
+
+    # Test basic reading - local files should work as before
+    nf1 = read_parquet(local_path)
+
+    # Test with additional kwargs
+    nf2 = read_parquet(local_path, columns=["a", "nested.flux"], use_threads=True)
+
+    assert len(nf2) <= len(nf1)  # filtered columns
+    assert "a" in nf2.columns
+    assert "nested" in nf2.columns
+
+
+def test_docstring_includes_fsspec_notes():
+    """Test that the docstring mentions the automatic fsspec optimization."""
+    docstring = read_parquet.__doc__
+    assert "fsspec" in docstring
+    assert "remote" in docstring.lower()
+
+
+def test__get_storage_options():
+    """Test _get_storage_options function with various input types."""
+    local_path = "tests/test_data/nested.parquet"
+
+    # Test with UPath objects (local files)
+    local_upath = UPath(local_path)
+    storage_opts = _get_storage_options(local_upath)
+    assert storage_opts is None  # Local UPath should have no storage options
+
+    # Test with UPath objects (HTTP)
+    http_url = "http://example.com/data.parquet"
+    http_upath = UPath(http_url)
+    storage_opts = _get_storage_options(http_upath)
+    assert storage_opts is not None
+    assert storage_opts.get("block_size") == FSSPEC_BLOCK_SIZE
+
+    # Test with UPath objects (HTTPS)
+    https_url = "https://example.com/data.parquet"
+    https_upath = UPath(https_url)
+    storage_opts = _get_storage_options(https_upath)
+    assert storage_opts is not None
+    assert storage_opts.get("block_size") == FSSPEC_BLOCK_SIZE
+
+    # Test with UPath objects (S3)
+    s3_url = "s3://bucket/path/data.parquet"
+    s3_upath = UPath(s3_url)
+    storage_opts = _get_storage_options(s3_upath)
+    assert storage_opts is not None
+    # S3 should NOT have the block_size override (only HTTP/HTTPS)
+    assert storage_opts.get("block_size") != FSSPEC_BLOCK_SIZE
diff --git a/tests/nested_pandas/nestedframe/test_nestedframe.py b/tests/nested_pandas/nestedframe/test_nestedframe.py
index 8092665f..b132ef0d 100644
--- a/tests/nested_pandas/nestedframe/test_nestedframe.py
+++ b/tests/nested_pandas/nestedframe/test_nestedframe.py
@@ -1177,6 +1177,29 @@ def make_id(row, prefix_str):
         get_max, columns=["packed.c", "packed.d"], output_names=["only_one_name"], row_container="args"
     )

+    # Test output_names as a string (single output)
+    def get_single_max(row):
+        return row["packed.c"].max()
+
+    result = nf.map_rows(get_single_max, columns=["packed.c"], output_names="max_c")
+    assert len(result) == len(nf)
+    assert list(result.columns) == ["max_c"]
+    for i in range(len(result)):
+        assert result["max_c"].values[i] == expected_max_c[i]
+
+    # Test output_names as a list (multiple outputs)
+    def get_max_pair(row):
+        return pd.Series([row["packed.c"].max(), row["packed.d"].max()], index=["max_col1", "max_col2"])
+
+    result = nf.map_rows(
+        get_max_pair, columns=["packed.c", "packed.d"], output_names=["custom_max1", "custom_max2"]
+    )
+    assert len(result) == len(nf)
+    assert list(result.columns) == ["custom_max1", "custom_max2"]
+    for i in range(len(result)):
+        assert result["custom_max1"].values[i] == expected_max_c[i]
+        assert result["custom_max2"].values[i] == expected_max_d[i]
+
     # Verify that append_columns=True works as expected.
     # Ensure that even with non-unique indexes, the final result retains
     # the original index (nested-pandas#301)
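
For context, a minimal usage sketch of the read path introduced in io.py above. This is illustrative and not part of the diff: a remote single-file path is routed through fsspec.parquet.open_parquet_file, while a local directory still goes through pyarrow.parquet.read_table. The URL is the one already used in test__transform_read_parquet_data_arg; network access and the `npd` alias from the docstring examples are assumed.

import nested_pandas as npd

# Remote single Parquet file: per the new branch in read_parquet(), this is
# opened with fsspec.parquet.open_parquet_file, and _get_storage_options()
# applies the 32 KiB block size for HTTP/HTTPS filesystems.
nf = npd.read_parquet(
    "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet"
)
print(nf.head())

# Local directory of Parquet files: _is_local_dir() returns True, so the data
# is still read with pyarrow.parquet.read_table as before (path is hypothetical).
# nf_local = npd.read_parquet("path/to/partitioned_dataset/")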