1 change: 1 addition & 0 deletions pyproject.toml
@@ -24,6 +24,7 @@ dependencies = [
"pyarrow>=16", # remove struct_field_names() and struct_fields() when upgraded to 18+
"Deprecated>=1.2.0",
"wrapt>=1.12.1",
"fsspec>=2025.7.0",

# NOTE: package PINNED at <0.3.0, see https://github.com/astronomy-commons/lsdb/issues/1047
"universal_pathlib>=0.2,<0.3.0",
117 changes: 99 additions & 18 deletions src/nested_pandas/nestedframe/io.py
@@ -3,6 +3,7 @@

from pathlib import Path

import fsspec.parquet
import pandas as pd
import pyarrow as pa
import pyarrow.fs
@@ -14,7 +15,9 @@
from ..series.utils import table_to_struct_array
from .core import NestedFrame

# Use smaller block size for FSSPEC filesystems, it usually helps with parquet reads
# Use smaller block size for these FSSPEC filesystems.
# It usually helps with parquet read speed.
FSSPEC_FILESYSTEMS = ("http", "https")
FSSPEC_BLOCK_SIZE = 32 * 1024

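Reviewer note: a minimal sketch of how this block-size constant is meant to be applied further down in this diff, assuming an HTTPS path (the URL is a placeholder):

```python
from upath import UPath

FSSPEC_FILESYSTEMS = ("http", "https")
FSSPEC_BLOCK_SIZE = 32 * 1024

# Hypothetical remote path; only HTTP/HTTPS protocols get the smaller block size.
upath = UPath("https://example.com/data.parquet")
if upath.protocol in FSSPEC_FILESYSTEMS:
    # Rebuild the UPath so its fsspec filesystem buffers reads in 32 KiB blocks.
    upath = UPath(upath, block_size=FSSPEC_BLOCK_SIZE)
```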

Expand All @@ -25,19 +28,29 @@ def read_parquet(
autocast_list: bool = False,
**kwargs,
) -> NestedFrame:
"""
Load a parquet object from a file path into a NestedFrame.
"""Load a parquet object from a file path into a NestedFrame.

As a deviation from `pandas`, this function loads via
`pyarrow.parquet.read_table`, and then converts to a NestedFrame.
As a specialization of the ``pandas.read_parquet`` function, this
function loads the data via existing ``pyarrow`` or
``fsspec.parquet`` methods, and then converts the data to a
NestedFrame.

Parameters
----------
data: str, list of str, Path, UPath, or file-like object
Path to the data or a file-like object. If a string is passed, it can be a single file name,
directory name, or a remote path (e.g., HTTP/HTTPS or S3). If a file-like object is passed,
it must support the `read` method. You can also pass the `filesystem` argument with
a `pyarrow.fs` object, which will be passed to `pyarrow.parquet.read_table()`.
Path to the data or a file-like object. If a string is passed,
it can be a single file name, directory name, or a remote path
(e.g., HTTP/HTTPS or S3). If a file-like object is passed, it
must support the ``read`` method. You can also pass a
``filesystem`` keyword argument with a ``pyarrow.fs`` object, which will
be passed along to the underlying file-reading method.
A file URL can also be a path to a directory that contains multiple
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
If the path is to a single Parquet file, it will be loaded using
``fsspec.parquet.open_parquet_file``, which has optimized handling
for remote Parquet files.
columns : list, default=None
If not None, only these columns will be read from the file.
reject_nesting: list or str, default=None
Expand All @@ -57,6 +70,11 @@ def read_parquet(

Notes
-----
For paths to single Parquet files, this function uses
``fsspec.parquet.open_parquet_file``, which performs intelligent
precaching. This can significantly improve performance compared
to standard PyArrow reading of remote files.

pyarrow supports partial loading of nested structures from parquet, for
example ```pd.read_parquet("data.parquet", columns=["nested.a"])``` will
load the "a" column of the "nested" column. Standard pandas/pyarrow
@@ -85,6 +103,7 @@

>>> #Load only the "flux" sub-column of the "nested" column
>>> nf = npd.read_parquet("path/to/file.parquet", columns=["a", "nested.flux"]) # doctest: +SKIP

"""

# Type convergence for reject_nesting
@@ -93,14 +112,41 @@
elif isinstance(reject_nesting, str):
reject_nesting = [reject_nesting]

# First load through pyarrow
# If `filesystem` is specified - use it
if kwargs.get("filesystem") is not None:
table = pq.read_table(data, columns=columns, **kwargs)
# Otherwise convert with a special function
# For single Parquet file paths, we want to use
# `fsspec.parquet.open_parquet_file`. But for any other usage
# (which includes file-like objects, directories and lists
# thereof), we want to defer to `pq.read_table`.

# At the end of this block, `table` will contain the data.

# NOTE: the test for _is_local_dir is sufficient, because we're
# preserving a code path to pq.read_table, which can read local
# directories but not remote directories. Remote directories
# cannot be read by either of these methods.
if isinstance(data, str | Path | UPath) and not _is_local_dir(path_to_data := UPath(data)):
storage_options = _get_storage_options(path_to_data)
filesystem = kwargs.get("filesystem")
if not filesystem:
_, filesystem = _transform_read_parquet_data_arg(path_to_data)
with fsspec.parquet.open_parquet_file(
str(path_to_data),
columns=columns,
storage_options=storage_options,
fs=filesystem,
engine="pyarrow",
) as parquet_file:
table = pq.read_table(parquet_file, columns=columns, **kwargs)
else:
data, filesystem = _transform_read_parquet_data_arg(data)
table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)
# All other cases, including file-like objects, directories, and
# even lists of the foregoing.

# If `filesystem` is specified - use it, passing it as part of **kwargs
if kwargs.get("filesystem") is not None:
table = pq.read_table(data, columns=columns, **kwargs)
else:
# Otherwise convert with a special function
data, filesystem = _transform_read_parquet_data_arg(data)
table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)

# Resolve partial loading of nested structures
# Using pyarrow to avoid naming conflicts from partial loading ("flux" vs "lc.flux")
@@ -160,6 +206,41 @@ def read_parquet(
return from_pyarrow(table, reject_nesting=reject_nesting, autocast_list=autocast_list)


def _is_local_dir(path_to_data: UPath):
"""Returns True if the given path refers to a local directory.

It's necessary to have this function, rather than simply checking
``UPath(p).is_dir()``, because ``UPath.is_dir`` can be quite
expensive in the case of a remote file path that isn't a directory.
"""
return path_to_data.protocol in ("", "file") and path_to_data.is_dir()


def _get_storage_options(path_to_data: UPath):
"""Get storage options for fsspec.parquet.open_parquet_file.

Parameters
----------
path_to_data : UPath
The data source

Returns
-------
dict or None
Storage options for remote paths, or None for local paths.
"""
if path_to_data.protocol not in ("", "file"):
# Remote files of all types (s3, http)
storage_options = path_to_data.storage_options or {}
# For some cases, use smaller block size
if path_to_data.protocol in FSSPEC_FILESYSTEMS:
storage_options = {**storage_options, "block_size": FSSPEC_BLOCK_SIZE}
return storage_options

# Local files
return None


def _transform_read_parquet_data_arg(data):
"""Transform `data` argument of read_parquet to pq.read_parquet's `source` and `filesystem`"""
# Check if a list, run the function recursively and check that filesystems are all the same
Expand Down Expand Up @@ -204,8 +285,8 @@ def _transform_read_parquet_data_arg(data):
# If it is a local path, use pyarrow's filesystem
if upath.protocol == "":
return upath.path, None
# If HTTP, change the default UPath object to use a smaller block size
if upath.protocol in ("http", "https"):
# Change the default UPath object to use a smaller block size in some cases
if upath.protocol in FSSPEC_FILESYSTEMS:
upath = UPath(upath, block_size=FSSPEC_BLOCK_SIZE)
return upath.path, upath.fs

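Reviewer note: a minimal end-to-end sketch of the dispatch this file now implements, assuming a public HTTPS file and a local directory (both paths and the column names are placeholders, not part of this PR):

```python
import fsspec.parquet
import pyarrow.parquet as pq

# Remote single Parquet file: fsspec opens it with parquet-aware precaching,
# and pyarrow reads from the resulting file-like object.
with fsspec.parquet.open_parquet_file(
    "https://example.com/data.parquet",          # placeholder URL
    columns=["a", "nested.flux"],
    storage_options={"block_size": 32 * 1024},   # smaller block size, HTTP/HTTPS only
    engine="pyarrow",
) as parquet_file:
    table = pq.read_table(parquet_file, columns=["a", "nested.flux"])

# Local directory of Parquet files: handed straight to pyarrow, which can
# read local (but not remote) directories.
table = pq.read_table("path/to/local_dir", columns=["a"])
```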
62 changes: 61 additions & 1 deletion tests/nested_pandas/nestedframe/test_io.py
@@ -12,7 +12,12 @@
import pytest
from nested_pandas import NestedFrame, read_parquet
from nested_pandas.datasets import generate_data
from nested_pandas.nestedframe.io import _transform_read_parquet_data_arg, from_pyarrow
from nested_pandas.nestedframe.io import (
FSSPEC_BLOCK_SIZE,
_get_storage_options,
_transform_read_parquet_data_arg,
from_pyarrow,
)
from pandas.testing import assert_frame_equal
from upath import UPath

@@ -399,3 +404,58 @@ def test__transform_read_parquet_data_arg():
"https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet",
]
)


def test_read_parquet_with_fsspec_optimization():
"""Test that read_parquet automatically uses fsspec optimization for remote files."""
# Test with local file (should not use fsspec optimization)
local_path = "tests/test_data/nested.parquet"

# Test basic reading - local files should work as before
nf1 = read_parquet(local_path)

# Test with additional kwargs
nf2 = read_parquet(local_path, columns=["a", "nested.flux"], use_threads=True)

assert len(nf2) <= len(nf1) # filtered columns
assert "a" in nf2.columns
assert "nested" in nf2.columns


def test_docstring_includes_fsspec_notes():
"""Test that the docstring mentions the automatic fsspec optimization."""
docstring = read_parquet.__doc__
assert "fsspec" in docstring
assert "remote" in docstring.lower()


def test__get_storage_options():
"""Test _get_storage_options function with various input types."""
local_path = "tests/test_data/nested.parquet"

# Test with UPath objects (local files)
local_upath = UPath(local_path)
storage_opts = _get_storage_options(local_upath)
assert storage_opts is None # Local UPath should have no storage options

# Test with UPath objects (HTTP)
http_url = "http://example.com/data.parquet"
http_upath = UPath(http_url)
storage_opts = _get_storage_options(http_upath)
assert storage_opts is not None
assert storage_opts.get("block_size") == FSSPEC_BLOCK_SIZE

# Test with UPath objects (HTTPS)
https_url = "https://example.com/data.parquet"
https_upath = UPath(https_url)
storage_opts = _get_storage_options(https_upath)
assert storage_opts is not None
assert storage_opts.get("block_size") == FSSPEC_BLOCK_SIZE

# Test with UPath objects (S3)
s3_url = "s3://bucket/path/data.parquet"
s3_upath = UPath(s3_url)
storage_opts = _get_storage_options(s3_upath)
assert storage_opts is not None
# S3 should NOT have the block_size override (only HTTP/HTTPS)
assert storage_opts.get("block_size") != FSSPEC_BLOCK_SIZE
23 changes: 23 additions & 0 deletions tests/nested_pandas/nestedframe/test_nestedframe.py
@@ -1177,6 +1177,29 @@ def make_id(row, prefix_str):
get_max, columns=["packed.c", "packed.d"], output_names=["only_one_name"], row_container="args"
)

# Test output_names as a string (single output)
def get_single_max(row):
return row["packed.c"].max()

result = nf.map_rows(get_single_max, columns=["packed.c"], output_names="max_c")
assert len(result) == len(nf)
assert list(result.columns) == ["max_c"]
for i in range(len(result)):
assert result["max_c"].values[i] == expected_max_c[i]

# Test output_names as a list (multiple outputs)
def get_max_pair(row):
return pd.Series([row["packed.c"].max(), row["packed.d"].max()], index=["max_col1", "max_col2"])

result = nf.map_rows(
get_max_pair, columns=["packed.c", "packed.d"], output_names=["custom_max1", "custom_max2"]
)
assert len(result) == len(nf)
assert list(result.columns) == ["custom_max1", "custom_max2"]
for i in range(len(result)):
assert result["custom_max1"].values[i] == expected_max_c[i]
assert result["custom_max2"].values[i] == expected_max_d[i]

# Verify that append_columns=True works as expected.
# Ensure that even with non-unique indexes, the final result retains
# the original index (nested-pandas#301)
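For quick reference, a condensed sketch of the ``output_names`` behavior the new tests exercise; the frame construction is an assumption based on the nested-pandas API used elsewhere in these tests:

```python
import pandas as pd
import nested_pandas as npd

# Hypothetical toy frame: one base column plus a nested layer named "packed".
base = npd.NestedFrame({"a": [1, 2]}, index=[0, 1])
layer = pd.DataFrame({"c": [1, 3, 2, 4], "d": [5, 6, 7, 8]}, index=[0, 0, 1, 1])
nf = base.add_nested(layer, "packed")

# A single string names the lone output column.
single = nf.map_rows(lambda row: row["packed.c"].max(),
                     columns=["packed.c"], output_names="max_c")
assert list(single.columns) == ["max_c"]

# A list renames multiple outputs positionally.
pair = nf.map_rows(
    lambda row: pd.Series([row["packed.c"].max(), row["packed.d"].max()],
                          index=["max_col1", "max_col2"]),
    columns=["packed.c", "packed.d"],
    output_names=["custom_max1", "custom_max2"],
)
assert list(pair.columns) == ["custom_max1", "custom_max2"]
```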