Skip to content

Commit

Permalink
python: fix and test globbing (#3675)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 12, 2022
1 parent 20e4675 commit 73e99b9
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 25 deletions.
2 changes: 1 addition & 1 deletion py-polars/polars/internals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .expr import Expr, expr_to_lit_or_expr, selection_to_pyexpr_list, wrap_expr
from .frame import DataFrame, LazyFrame, wrap_df, wrap_ldf
from .functions import concat, date_range # DataFrame.describe() & DataFrame.upsample()
from .io import _prepare_file_arg, read_ipc_schema, read_parquet_schema
from .io import _is_local_file, _prepare_file_arg, read_ipc_schema, read_parquet_schema
from .lazy_functions import all, argsort_by, col, concat_list, element, lit, select
from .series import Series, wrap_s
from .whenthen import when # used in expr.clip()
10 changes: 6 additions & 4 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,8 @@ def _read_parquet(
"""
if isinstance(file, (str, Path)):
file = format_path(file)
if isinstance(file, str) and "*" in file:

if isinstance(file, str) and "*" in file and pli._is_local_file(file):
from polars import scan_parquet

scan = scan_parquet(
Expand Down Expand Up @@ -730,7 +731,8 @@ def _read_ipc(

if isinstance(file, (str, Path)):
file = format_path(file)
if isinstance(file, str) and "*" in file:

if isinstance(file, str) and "*" in file and pli._is_local_file(file):
from polars import scan_ipc

scan = scan_ipc(
Expand All @@ -741,9 +743,9 @@ def _read_ipc(
row_count_offset=row_count_offset,
)
if columns is None:
scan.collect()
return scan.collect()
elif is_str_sequence(columns, False):
scan.select(columns).collect()
return scan.select(columns).collect()
else:
raise ValueError(
"cannot use glob patterns and integer based projection as `columns` argument; Use columns: List[str]"
Expand Down
9 changes: 9 additions & 0 deletions py-polars/polars/internals/io.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import glob
from contextlib import contextmanager
from io import BytesIO, StringIO
from pathlib import Path
Expand Down Expand Up @@ -145,3 +146,11 @@ def read_parquet_schema(
file = format_path(file)

return _parquet_schema(file)


def _is_local_file(file: str) -> bool:
    """
    Report whether ``file`` refers to something on the local filesystem.

    Parameters
    ----------
    file
        A literal path or a glob pattern (``**`` is expanded recursively).

    Returns
    -------
    bool
        ``True`` if ``file`` exists as a literal path, or if it matches at
        least one local path when interpreted as a glob pattern; ``False``
        otherwise.
    """
    # Function-scope import so this block does not depend on a file-level
    # ``os`` import being present.
    import os.path

    # Check the literal path first: a real file whose name contains glob
    # metacharacters (e.g. "data[1].parquet") is not matched by its own name
    # when that name is interpreted as a pattern.
    if os.path.exists(file):
        return True

    # Only the first match is needed, so stop the lazy iterator right away.
    first_match = next(glob.iglob(file, recursive=True), None)
    return first_match is not None
24 changes: 23 additions & 1 deletion py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,21 @@ def scan_parquet(
rechunk: bool = True,
row_count_name: Optional[str] = None,
row_count_offset: int = 0,
storage_options: Optional[Dict] = None,
) -> LDF:
"""
See Also: `pl.scan_parquet`
"""

# try fsspec scanner
if not pli._is_local_file(file):
scan = pli._scan_parquet_fsspec(file, storage_options)
if n_rows:
scan = scan.head(n_rows)
if row_count_name is not None:
scan = scan.with_row_count(row_count_name, row_count_offset)
return scan # type: ignore

self = cls.__new__(cls)
self._ldf = PyLazyFrame.new_from_parquet(
file,
Expand All @@ -204,16 +214,28 @@ def scan_parquet(
@classmethod
def scan_ipc(
cls: Type[LDF],
file: str,
file: Union[str, Path],
n_rows: Optional[int] = None,
cache: bool = True,
rechunk: bool = True,
row_count_name: Optional[str] = None,
row_count_offset: int = 0,
storage_options: Optional[Dict] = None,
) -> LDF:
"""
See Also: `pl.scan_ipc`
"""
if isinstance(file, (str, Path)):
file = format_path(file)

# try fsspec scanner
if not pli._is_local_file(file):
scan = pli._scan_ipc_fsspec(file, storage_options)
if n_rows:
scan = scan.head(n_rows)
if row_count_name is not None:
scan = scan.with_row_count(row_count_name, row_count_offset)
return scan # type: ignore

self = cls.__new__(cls)
self._ldf = PyLazyFrame.new_from_ipc(
Expand Down
22 changes: 3 additions & 19 deletions py-polars/polars/io.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os.path
from io import BytesIO, IOBase, StringIO
from pathlib import Path
from typing import (
Expand Down Expand Up @@ -30,13 +29,7 @@

from polars.convert import from_arrow
from polars.datatypes import DataType
from polars.internals import (
DataFrame,
LazyFrame,
_scan_ds,
_scan_ipc_fsspec,
_scan_parquet_fsspec,
)
from polars.internals import DataFrame, LazyFrame, _scan_ds
from polars.internals.io import _prepare_file_arg

try:
Expand Down Expand Up @@ -573,20 +566,14 @@ def scan_ipc(
# Map legacy arguments to current ones and remove them from kwargs.
n_rows = kwargs.pop("stop_after_n_rows", n_rows)

if isinstance(file, (str, Path)):
file = format_path(file)

# try fsspec scanner
if not os.path.exists((file)):
return _scan_ipc_fsspec(file, storage_options)

return LazyFrame.scan_ipc(
file=file,
n_rows=n_rows,
cache=cache,
rechunk=rechunk,
row_count_name=row_count_name,
row_count_offset=row_count_offset,
storage_options=storage_options,
)


Expand Down Expand Up @@ -635,10 +622,6 @@ def scan_parquet(
if isinstance(file, (str, Path)):
file = format_path(file)

# try fsspec scanner
if not os.path.exists((file)):
return _scan_parquet_fsspec(file, storage_options)

return LazyFrame.scan_parquet(
file=file,
n_rows=n_rows,
Expand All @@ -647,6 +630,7 @@ def scan_parquet(
rechunk=rechunk,
row_count_name=row_count_name,
row_count_offset=row_count_offset,
storage_options=storage_options,
)


Expand Down
6 changes: 6 additions & 0 deletions py-polars/tests/io/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,9 @@ def test_csv_string_escaping() -> None:
f.seek(0)
df_read = pl.read_csv(f)
assert df_read.frame_equal(df)


def test_glob_csv(io_test_dir: str) -> None:
    """Check that a ``small*.csv`` glob loads via both the lazy and eager CSV readers."""
    pattern = os.path.join(io_test_dir, "small*.csv")
    expected_shape = (3, 11)

    # Lazy scan first, then the eager reader, both against the same pattern.
    lazy_frame = pl.scan_csv(pattern).collect()
    assert lazy_frame.shape == expected_shape

    eager_frame = pl.read_csv(pattern)
    assert eager_frame.shape == expected_shape
7 changes: 7 additions & 0 deletions py-polars/tests/io/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,10 @@ def test_ipc_column_order() -> None:
columns = ["colc", "colb", "cola"]
# read file into polars; the specified column order is no longer respected
assert pl.read_ipc(f, columns=columns).columns == columns


def test_glob_ipc(io_test_dir: str) -> None:
    """Check that a ``small*.ipc`` glob loads via both the lazy and eager IPC readers."""
    # The checks are skipped entirely when os.name is "nt".
    if os.name == "nt":
        return

    pattern = os.path.join(io_test_dir, "small*.ipc")
    expected_shape = (3, 12)

    lazy_frame = pl.scan_ipc(pattern).collect()
    assert lazy_frame.shape == expected_shape

    eager_frame = pl.read_ipc(pattern)
    assert eager_frame.shape == expected_shape
6 changes: 6 additions & 0 deletions py-polars/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,9 @@ def test_nested_parquet() -> None:
assert read.columns == ["a"]
assert isinstance(read.dtypes[0], pl.datatypes.List)
assert isinstance(read.dtypes[0].inner, pl.datatypes.Struct)


def test_glob_parquet(io_test_dir: str) -> None:
    """Check that a ``small*.parquet`` glob loads via both the eager and lazy Parquet readers."""
    pattern = os.path.join(io_test_dir, "small*.parquet")
    expected_shape = (3, 16)

    # Eager reader first, then the lazy scan, both against the same pattern.
    eager_frame = pl.read_parquet(pattern)
    assert eager_frame.shape == expected_shape

    lazy_frame = pl.scan_parquet(pattern).collect()
    assert lazy_frame.shape == expected_shape

0 comments on commit 73e99b9

Please sign in to comment.