benchmarks/benchmarks.py (24 additions, 0 deletions)
@@ -254,3 +254,27 @@ def time_run(self):
    def peakmem_run(self):
        """Benchmark the memory usage of read_parquet(self.path, columns=self.columns)"""
        self.run()


class ReadFewColumnsHTTPSWithOptimization:
    """Benchmark read_parquet("https://", columns=[...])

    Note: fsspec optimization is now automatic for remote URLs,
    so this benchmark is equivalent to ReadFewColumnsHTTPS.
    Kept for historical comparison purposes.
    """

    path = "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet"
    columns = ["_healpix_29", "ra", "astrometric_primary_flag"]

    def run(self):
        """Run the benchmark (fsspec optimization is automatic for remote URLs)."""
        _ = read_parquet(self.path, columns=self.columns)

    def time_run(self):
        """Benchmark the runtime with automatic fsspec optimization."""
        self.run()

    def peakmem_run(self):
        """Benchmark the memory usage with automatic fsspec optimization."""
        self.run()
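These classes follow the asv (airspeed velocity) `time_*`/`peakmem_*` naming convention used elsewhere in `benchmarks/benchmarks.py`. A quick way to exercise the new class outside the asv harness is to call it directly — a sketch, assuming the benchmarks module is importable from the repository root and that data.lsdb.io is reachable:

```python
import time

# Hypothetical import path; adjust to however the benchmarks module is exposed locally.
from benchmarks.benchmarks import ReadFewColumnsHTTPSWithOptimization

bench = ReadFewColumnsHTTPSWithOptimization()
t0 = time.perf_counter()
bench.run()  # read_parquet over HTTPS; the fsspec fast path is used automatically
print(f"run() took {time.perf_counter() - t0:.1f} s")
```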
src/nested_pandas/nestedframe/io.py (97 additions, 1 deletion)
@@ -57,6 +57,11 @@ def read_parquet(

    Notes
    -----
    For remote storage (S3, GCS, HTTP/HTTPS), this function automatically uses
    fsspec.parquet.open_parquet_file for optimized access with intelligent
    precaching, which can significantly improve performance compared to standard
    PyArrow reading.

    pyarrow supports partial loading of nested structures from parquet, for
    example ```pd.read_parquet("data.parquet", columns=["nested.a"])``` will
    load the "a" column of the "nested" column. Standard pandas/pyarrow
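A brief usage sketch of the two behaviors described in this Notes section (the file names and column names below are illustrative, not from the package's test data):

```python
from nested_pandas import read_parquet

# Local file: handled by the standard PyArrow path, exactly as before.
nf_local = read_parquet("data.parquet", columns=["nested.a"])  # loads only sub-column "a"

# Remote URL: the fsspec-optimized path is selected automatically, precaching
# only the byte ranges needed for the requested columns.
nf_remote = read_parquet("s3://bucket/data.parquet", columns=["nested.a"])
```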
@@ -94,8 +99,11 @@
     reject_nesting = [reject_nesting]

     # First load through pyarrow
+    # If data is remote, use fsspec.parquet for better performance
+    if _should_use_fsspec_optimization(data, kwargs.get("filesystem")):
+        table = _read_with_fsspec_optimization(data, columns, kwargs)
     # If `filesystem` is specified - use it
-    if kwargs.get("filesystem") is not None:
+    elif kwargs.get("filesystem") is not None:
         table = pq.read_table(data, columns=columns, **kwargs)
     # Otherwise convert with a special function
     else:
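Note the precedence encoded here: an explicitly passed `filesystem` wins and disables the fast path, because `_should_use_fsspec_optimization` returns False whenever a filesystem is given. A sketch (the S3 region is a placeholder):

```python
import pyarrow.fs

from nested_pandas import read_parquet

# With filesystem= set, even an S3 path goes through pq.read_table directly;
# the fsspec optimization is bypassed.
fs = pyarrow.fs.S3FileSystem(region="us-east-1")
nf = read_parquet("bucket/data.parquet", filesystem=fs)
```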
@@ -291,3 +299,91 @@ def _cast_list_cols_to_nested(df):
        if pa.types.is_list(dtype.pyarrow_dtype):
            df[col] = pack_lists(df[[col]])
    return df


def _should_use_fsspec_optimization(data, explicit_filesystem):
    """Determine if fsspec optimization should be used.

    Parameters
    ----------
    data : str, Path, UPath, or file-like object
        The data source
    explicit_filesystem : filesystem or None
        Explicitly provided filesystem

    Returns
    -------
    bool
        True if fsspec optimization should be used for this data source
    """
    # Don't use optimization if an explicit filesystem is provided
    if explicit_filesystem is not None:
        return False

    # Don't use for file-like objects
    if hasattr(data, "read"):
        return False

    # For UPath objects, check if they're remote (check before Path since UPath inherits from Path)
    if isinstance(data, UPath):
        return data.protocol not in ("", "file")

    # Don't use for Path objects (local files)
    if isinstance(data, Path):
        return False

    # For strings, check if they look like remote URLs
    if isinstance(data, str):
        return data.startswith(("http://", "https://", "s3://", "gs://", "gcs://", "azure://", "adl://"))

    return False
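Illustrative calls mirroring the decision table above (these overlap with the unit tests added below; `object()` stands in for any explicit filesystem):

```python
from pathlib import Path

from upath import UPath

from nested_pandas.nestedframe.io import _should_use_fsspec_optimization

assert _should_use_fsspec_optimization("s3://bucket/data.parquet", None)           # remote URL string
assert not _should_use_fsspec_optimization(Path("data.parquet"), None)             # local Path
assert not _should_use_fsspec_optimization(UPath("data.parquet"), None)            # UPath.protocol == ""
assert not _should_use_fsspec_optimization("s3://bucket/data.parquet", object())   # explicit filesystem wins
```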


def _read_with_fsspec_optimization(data, columns, kwargs):
    """Read parquet using fsspec optimization for better remote storage performance.

    Parameters
    ----------
    data : str, UPath, or path-like
        Path to the parquet file
    columns : list or None
        Columns to read
    kwargs : dict
        Additional kwargs for reading

    Returns
    -------
    pyarrow.Table
        The loaded table
    """
    try:
        import fsspec.parquet
    except ImportError:
        # Fall back to the regular method if fsspec.parquet is not available
        data_converted, filesystem = _transform_read_parquet_data_arg(data)
        return pq.read_table(data_converted, filesystem=filesystem, columns=columns, **kwargs)

    # Convert UPath to string if needed
    if isinstance(data, UPath):
        path_str = str(data)
        # Use UPath storage options for fsspec
        storage_options = data.storage_options if data.storage_options else None
    else:
        path_str = str(data)
        storage_options = None

    # Use fsspec.parquet.open_parquet_file for optimized access
    try:
        with fsspec.parquet.open_parquet_file(
            path_str,
            columns=columns,
            storage_options=storage_options,
            engine="pyarrow",
        ) as parquet_file:
            # Read the table using PyArrow with the optimized file handle
            return pq.read_table(parquet_file, columns=columns, **kwargs)
    except Exception:
        # Fall back to the regular method if the optimization fails
        data_converted, filesystem = _transform_read_parquet_data_arg(data)
        return pq.read_table(data_converted, filesystem=filesystem, columns=columns, **kwargs)
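For reference, a minimal standalone sketch of what the fast path above delegates to: `fsspec.parquet.open_parquet_file` reads the footer and precaches only the byte ranges holding the requested columns before handing back a file-like object (URL reused from the benchmarks; HTTPS access via fsspec requires `aiohttp`):

```python
import fsspec.parquet
import pyarrow.parquet as pq

url = "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet"
columns = ["_healpix_29", "ra"]

# Only footer + column byte ranges are fetched, not the whole file.
with fsspec.parquet.open_parquet_file(url, columns=columns, engine="pyarrow") as f:
    table = pq.read_table(f, columns=columns)
print(table.num_rows)
```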
tests/nested_pandas/nestedframe/test_io.py (51 additions, 0 deletions)
@@ -399,3 +399,54 @@ def test__transform_read_parquet_data_arg():
"https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet",
]
)


def test_read_parquet_with_fsspec_optimization():
    """Test that read_parquet automatically uses fsspec optimization for remote files."""
    # Test with local file (should not use fsspec optimization)
    local_path = "tests/test_data/nested.parquet"

    # Test basic reading - local files should work as before
    nf1 = read_parquet(local_path)

    # Test with additional kwargs
    nf2 = read_parquet(
        local_path,
        columns=["a", "nested.flux"],
        use_threads=True,
    )

    assert len(nf2) <= len(nf1)  # column selection must not add rows
    assert "a" in nf2.columns
    assert "nested" in nf2.columns


def test_fsspec_optimization_path_detection():
    """Test that _should_use_fsspec_optimization correctly identifies remote paths."""
    import io
    from pathlib import Path

    from nested_pandas.nestedframe.io import _should_use_fsspec_optimization

    # Test cases that should NOT use optimization
    assert not _should_use_fsspec_optimization("local_file.parquet", None)
    assert not _should_use_fsspec_optimization("/path/to/file.parquet", None)
    assert not _should_use_fsspec_optimization(Path("local_file.parquet"), None)
    assert not _should_use_fsspec_optimization(UPath("local_file.parquet"), None)
    assert not _should_use_fsspec_optimization("https://example.com/file.parquet", "some_filesystem")

    # Test file-like object
    assert not _should_use_fsspec_optimization(io.BytesIO(b"test"), None)

    # Test cases that SHOULD use optimization
    assert _should_use_fsspec_optimization("https://example.com/file.parquet", None)
    assert _should_use_fsspec_optimization("s3://bucket/file.parquet", None)
    assert _should_use_fsspec_optimization("gs://bucket/file.parquet", None)
    assert _should_use_fsspec_optimization(UPath("https://example.com/file.parquet"), None)
    assert _should_use_fsspec_optimization(UPath("s3://bucket/file.parquet"), None)


def test_docstring_includes_fsspec_notes():
    """Test that the docstring mentions the automatic fsspec optimization."""
    docstring = read_parquet.__doc__
    assert "fsspec" in docstring
    assert "remote" in docstring.lower()