diff --git a/py-polars/Makefile b/py-polars/Makefile index 1eabb011bd58..616f7a901926 100644 --- a/py-polars/Makefile +++ b/py-polars/Makefile @@ -85,7 +85,7 @@ doctest: .venv build ## Run doctests $(VENV_BIN)/pytest tests/docs/test_user_guide.py -m docs .PHONY: docs -docs: .venv build ## Build Python docs (incremental) +docs: .venv ## Build Python docs (incremental) @$(MAKE) -s -C docs html .PHONY: docs-clean diff --git a/py-polars/polars/_utils/various.py b/py-polars/polars/_utils/various.py index db18f2adf7fc..5856168ed95c 100644 --- a/py-polars/polars/_utils/various.py +++ b/py-polars/polars/_utils/various.py @@ -158,31 +158,6 @@ def range_to_slice(rng: range) -> slice: return slice(rng.start, rng.stop, rng.step) -def handle_projection_columns( - columns: Sequence[str] | Sequence[int] | str | None, -) -> tuple[list[int] | None, Sequence[str] | None]: - """Disambiguates between columns specified as integers vs. strings.""" - projection: list[int] | None = None - new_columns: Sequence[str] | None = None - if columns is not None: - if isinstance(columns, str): - new_columns = [columns] - elif is_int_sequence(columns): - projection = list(columns) - elif not is_str_sequence(columns): - msg = "`columns` arg should contain a list of all integers or all strings values" - raise TypeError(msg) - else: - new_columns = columns - if columns and len(set(columns)) != len(columns): - msg = f"`columns` arg should only have unique values, got {columns!r}" - raise ValueError(msg) - if projection and len(set(projection)) != len(projection): - msg = f"`columns` arg should only have unique values, got {projection!r}" - raise ValueError(msg) - return projection, new_columns - - def _prepare_row_index_args( row_index_name: str | None = None, row_index_offset: int = 0, diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 0f95beef18e9..ca84b31cc6f5 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -56,9 +56,6 @@ from polars._utils.parse_expr_input import parse_as_expression from polars._utils.unstable import issue_unstable_warning, unstable from polars._utils.various import ( - _prepare_row_index_args, - _process_null_values, - handle_projection_columns, is_bool_sequence, is_int_sequence, is_str_sequence, @@ -78,7 +75,6 @@ Float64, Object, String, - py_type_to_dtype, ) from polars.dependencies import ( _HVPLOT_AVAILABLE, @@ -99,7 +95,6 @@ TooManyRowsReturnedError, ) from polars.functions import col, lit -from polars.io._utils import _is_glob_pattern, _is_local_file from polars.io.csv._utils import _check_arg_is_1byte from polars.io.spreadsheet._write_utils import ( _unpack_multi_column_dict, @@ -116,7 +111,6 @@ from polars.type_aliases import DbWriteMode with contextlib.suppress(ImportError): # Module not available when building docs - from polars.polars import PyDataFrame from polars.polars import dtype_str_repr as _dtype_str_repr from polars.polars import write_clipboard_string as _write_clipboard_string @@ -132,6 +126,7 @@ from polars import DataType, Expr, LazyFrame, Series from polars.interchange.dataframe import PolarsDataFrame + from polars.polars import PyDataFrame from polars.type_aliases import ( AsofJoinStrategy, AvroCompression, @@ -142,7 +137,6 @@ ColumnWidthsDefinition, ComparisonOperator, ConditionalFormatDict, - CsvEncoding, CsvQuoteStyle, DbWriteEngine, FillNullStrategy, @@ -157,7 +151,6 @@ NullStrategy, OneOrMoreDataTypes, Orientation, - ParallelStrategy, ParquetCompression, PivotAgg, PolarsDataType, @@ -535,450 +528,6 @@ def _from_pandas( ) ) - @classmethod - def _read_csv( - cls, - source: str | Path | IO[bytes] | bytes, - *, - has_header: bool = True, - columns: Sequence[int] | Sequence[str] | None = None, - separator: str = ",", - comment_prefix: str | None = None, - quote_char: str | None = '"', - skip_rows: int = 0, - dtypes: None | (SchemaDict | Sequence[PolarsDataType]) = None, - schema: None | SchemaDict = None, - null_values: str | Sequence[str] | dict[str, str] | None = None, - missing_utf8_is_empty_string: bool = False, - ignore_errors: bool = False, - try_parse_dates: bool = False, - n_threads: int | None = None, - infer_schema_length: int | None = N_INFER_DEFAULT, - batch_size: int = 8192, - n_rows: int | None = None, - encoding: CsvEncoding = "utf8", - low_memory: bool = False, - rechunk: bool = True, - skip_rows_after_header: int = 0, - row_index_name: str | None = None, - row_index_offset: int = 0, - sample_size: int = 1024, - eol_char: str = "\n", - raise_if_empty: bool = True, - truncate_ragged_lines: bool = False, - ) -> DataFrame: - """ - Read a CSV file into a DataFrame. - - Use `pl.read_csv` to dispatch to this method. - - See Also - -------- - polars.io.read_csv - """ - self = cls.__new__(cls) - - path: str | None - if isinstance(source, (str, Path)): - path = normalize_filepath(source) - else: - path = None - if isinstance(source, BytesIO): - source = source.getvalue() - if isinstance(source, StringIO): - source = source.getvalue().encode() - - dtype_list: Sequence[tuple[str, PolarsDataType]] | None = None - dtype_slice: Sequence[PolarsDataType] | None = None - if dtypes is not None: - if isinstance(dtypes, dict): - dtype_list = [] - for k, v in dtypes.items(): - dtype_list.append((k, py_type_to_dtype(v))) - elif isinstance(dtypes, Sequence): - dtype_slice = dtypes - else: - msg = f"`dtypes` should be of type list or dict, got {type(dtypes).__name__!r}" - raise TypeError(msg) - - processed_null_values = _process_null_values(null_values) - - if isinstance(columns, str): - columns = [columns] - if isinstance(source, str) and _is_glob_pattern(source): - dtypes_dict = None - if dtype_list is not None: - dtypes_dict = dict(dtype_list) - if dtype_slice is not None: - msg = ( - "cannot use glob patterns and unnamed dtypes as `dtypes` argument" - "\n\nUse `dtypes`: Mapping[str, Type[DataType]]" - ) - raise ValueError(msg) - from polars import scan_csv - - scan = scan_csv( - source, - has_header=has_header, - separator=separator, - comment_prefix=comment_prefix, - quote_char=quote_char, - skip_rows=skip_rows, - dtypes=dtypes_dict, - schema=schema, - null_values=null_values, - missing_utf8_is_empty_string=missing_utf8_is_empty_string, - ignore_errors=ignore_errors, - infer_schema_length=infer_schema_length, - n_rows=n_rows, - low_memory=low_memory, - rechunk=rechunk, - skip_rows_after_header=skip_rows_after_header, - row_index_name=row_index_name, - row_index_offset=row_index_offset, - eol_char=eol_char, - raise_if_empty=raise_if_empty, - truncate_ragged_lines=truncate_ragged_lines, - ) - if columns is None: - return scan.collect() - elif is_str_sequence(columns, allow_str=False): - return scan.select(columns).collect() - else: - msg = ( - "cannot use glob patterns and integer based projection as `columns` argument" - "\n\nUse columns: List[str]" - ) - raise ValueError(msg) - - projection, columns = handle_projection_columns(columns) - - self._df = PyDataFrame.read_csv( - source, - infer_schema_length, - batch_size, - has_header, - ignore_errors, - n_rows, - skip_rows, - projection, - separator, - rechunk, - columns, - encoding, - n_threads, - path, - dtype_list, - dtype_slice, - low_memory, - comment_prefix, - quote_char, - processed_null_values, - missing_utf8_is_empty_string, - try_parse_dates, - skip_rows_after_header, - _prepare_row_index_args(row_index_name, row_index_offset), - sample_size=sample_size, - eol_char=eol_char, - raise_if_empty=raise_if_empty, - truncate_ragged_lines=truncate_ragged_lines, - schema=schema, - ) - return self - - @classmethod - def _read_parquet( - cls, - source: str | Path | IO[bytes] | bytes, - *, - columns: Sequence[int] | Sequence[str] | None = None, - n_rows: int | None = None, - parallel: ParallelStrategy = "auto", - row_index_name: str | None = None, - row_index_offset: int = 0, - low_memory: bool = False, - use_statistics: bool = True, - rechunk: bool = True, - ) -> DataFrame: - """ - Read into a DataFrame from a parquet file. - - Use `pl.read_parquet` to dispatch to this method. - - See Also - -------- - polars.io.read_parquet - """ - if isinstance(source, (str, Path)): - source = normalize_filepath(source) - if isinstance(columns, str): - columns = [columns] - - if isinstance(source, str) and _is_glob_pattern(source): - from polars import scan_parquet - - scan = scan_parquet( - source, - n_rows=n_rows, - rechunk=True, - parallel=parallel, - row_index_name=row_index_name, - row_index_offset=row_index_offset, - low_memory=low_memory, - ) - - if columns is None: - return scan.collect() - elif is_str_sequence(columns, allow_str=False): - return scan.select(columns).collect() - else: - msg = ( - "cannot use glob patterns and integer based projection as `columns` argument" - "\n\nUse columns: List[str]" - ) - raise TypeError(msg) - - projection, columns = handle_projection_columns(columns) - self = cls.__new__(cls) - self._df = PyDataFrame.read_parquet( - source, - columns, - projection, - n_rows, - parallel, - _prepare_row_index_args(row_index_name, row_index_offset), - low_memory=low_memory, - use_statistics=use_statistics, - rechunk=rechunk, - ) - return self - - @classmethod - def _read_avro( - cls, - source: str | Path | IO[bytes] | bytes, - *, - columns: Sequence[int] | Sequence[str] | None = None, - n_rows: int | None = None, - ) -> Self: - """ - Read into a DataFrame from Apache Avro format. - - Parameters - ---------- - source - Path to a file or a file-like object (by file-like object, we refer to - objects that have a `read()` method, such as a file handler (e.g. - via builtin `open` function) or `BytesIO`). - columns - Columns. - n_rows - Stop reading from Apache Avro file after reading `n_rows`. - """ - if isinstance(source, (str, Path)): - source = normalize_filepath(source) - projection, columns = handle_projection_columns(columns) - self = cls.__new__(cls) - self._df = PyDataFrame.read_avro(source, columns, projection, n_rows) - return self - - @classmethod - def _read_ipc( - cls, - source: str | Path | IO[bytes] | bytes, - *, - columns: Sequence[int] | Sequence[str] | None = None, - n_rows: int | None = None, - row_index_name: str | None = None, - row_index_offset: int = 0, - rechunk: bool = True, - memory_map: bool = True, - ) -> Self: - """ - Read into a DataFrame from Arrow IPC file format. - - See "File or Random Access format" on https://arrow.apache.org/docs/python/ipc.html. - Arrow IPC files are also known as Feather (v2) files. - - Parameters - ---------- - source - Path to a file or a file-like object (by file-like object, we refer to - objects that have a `read()` method, such as a file handler (e.g. - via builtin `open` function) or `BytesIO`). - columns - Columns to select. Accepts a list of column indices (starting at zero) or a - list of column names. - n_rows - Stop reading from IPC file after reading `n_rows`. - row_index_name - Row index name. - row_index_offset - Row index offset. - rechunk - Make sure that all data is contiguous. - memory_map - Memory map the file - """ - if isinstance(source, (str, Path)): - source = normalize_filepath(source) - if isinstance(columns, str): - columns = [columns] - - if ( - isinstance(source, str) - and _is_glob_pattern(source) - and _is_local_file(source) - ): - from polars import scan_ipc - - scan = scan_ipc( - source, - n_rows=n_rows, - rechunk=rechunk, - row_index_name=row_index_name, - row_index_offset=row_index_offset, - memory_map=memory_map, - ) - if columns is None: - df = scan.collect() - elif is_str_sequence(columns, allow_str=False): - df = scan.select(columns).collect() - else: - msg = ( - "cannot use glob patterns and integer based projection as `columns` argument" - "\n\nUse columns: List[str]" - ) - raise TypeError(msg) - return cls._from_pydf(df._df) - - projection, columns = handle_projection_columns(columns) - self = cls.__new__(cls) - self._df = PyDataFrame.read_ipc( - source, - columns, - projection, - n_rows, - _prepare_row_index_args(row_index_name, row_index_offset), - memory_map=memory_map, - ) - return self - - @classmethod - def _read_ipc_stream( - cls, - source: str | Path | IO[bytes] | bytes, - *, - columns: Sequence[int] | Sequence[str] | None = None, - n_rows: int | None = None, - row_index_name: str | None = None, - row_index_offset: int = 0, - rechunk: bool = True, - ) -> Self: - """ - Read into a DataFrame from Arrow IPC record batch stream format. - - See "Streaming format" on https://arrow.apache.org/docs/python/ipc.html. - - Parameters - ---------- - source - Path to a file or a file-like object (by file-like object, we refer to - objects that have a `read()` method, such as a file handler (e.g. - via builtin `open` function) or `BytesIO`). - columns - Columns to select. Accepts a list of column indices (starting at zero) or a - list of column names. - n_rows - Stop reading from IPC stream after reading `n_rows`. - row_index_name - Row index name. - row_index_offset - Row index offset. - rechunk - Make sure that all data is contiguous. - """ - if isinstance(source, (str, Path)): - source = normalize_filepath(source) - if isinstance(columns, str): - columns = [columns] - - projection, columns = handle_projection_columns(columns) - self = cls.__new__(cls) - self._df = PyDataFrame.read_ipc_stream( - source, - columns, - projection, - n_rows, - _prepare_row_index_args(row_index_name, row_index_offset), - rechunk, - ) - return self - - @classmethod - def _read_json( - cls, - source: str | Path | IOBase | bytes, - *, - schema: SchemaDefinition | None = None, - schema_overrides: SchemaDefinition | None = None, - infer_schema_length: int | None = N_INFER_DEFAULT, - ) -> Self: - """ - Read into a DataFrame from a JSON file. - - Use `pl.read_json` to dispatch to this method. - - See Also - -------- - polars.io.read_json - """ - if isinstance(source, StringIO): - source = BytesIO(source.getvalue().encode()) - elif isinstance(source, (str, Path)): - source = normalize_filepath(source) - - self = cls.__new__(cls) - self._df = PyDataFrame.read_json( - source, - infer_schema_length=infer_schema_length, - schema=schema, - schema_overrides=schema_overrides, - ) - return self - - @classmethod - def _read_ndjson( - cls, - source: str | Path | IOBase | bytes, - *, - schema: SchemaDefinition | None = None, - schema_overrides: SchemaDefinition | None = None, - ignore_errors: bool = False, - ) -> Self: - """ - Read into a DataFrame from a newline delimited JSON file. - - Use `pl.read_ndjson` to dispatch to this method. - - See Also - -------- - polars.io.read_ndjson - """ - if isinstance(source, StringIO): - source = BytesIO(source.getvalue().encode()) - elif isinstance(source, (str, Path)): - source = normalize_filepath(source) - - self = cls.__new__(cls) - self._df = PyDataFrame.read_ndjson( - source, - ignore_errors=ignore_errors, - schema=schema, - schema_overrides=schema_overrides, - ) - return self - def _replace(self, column: str, new_column: Series) -> Self: """Replace a column by a new Series (in place).""" self._df.replace(column, new_column._s) diff --git a/py-polars/polars/io/_utils.py b/py-polars/polars/io/_utils.py index dac87c04900d..b5e249c6b807 100644 --- a/py-polars/polars/io/_utils.py +++ b/py-polars/polars/io/_utils.py @@ -6,13 +6,38 @@ from io import BytesIO, StringIO from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO, Any, ContextManager, Iterator, cast, overload +from typing import IO, Any, ContextManager, Iterator, Sequence, cast, overload -from polars._utils.various import normalize_filepath +from polars._utils.various import is_int_sequence, is_str_sequence, normalize_filepath from polars.dependencies import _FSSPEC_AVAILABLE, fsspec from polars.exceptions import NoDataError +def handle_projection_columns( + columns: Sequence[str] | Sequence[int] | str | None, +) -> tuple[list[int] | None, Sequence[str] | None]: + """Disambiguates between columns specified as integers vs. strings.""" + projection: list[int] | None = None + new_columns: Sequence[str] | None = None + if columns is not None: + if isinstance(columns, str): + new_columns = [columns] + elif is_int_sequence(columns): + projection = list(columns) + elif not is_str_sequence(columns): + msg = "`columns` arg should contain a list of all integers or all strings values" + raise TypeError(msg) + else: + new_columns = columns + if columns and len(set(columns)) != len(columns): + msg = f"`columns` arg should only have unique values, got {columns!r}" + raise ValueError(msg) + if projection and len(set(projection)) != len(projection): + msg = f"`columns` arg should only have unique values, got {projection!r}" + raise ValueError(msg) + return projection, new_columns + + def _is_glob_pattern(file: str) -> bool: return any(char in file for char in ["*", "?", "["]) diff --git a/py-polars/polars/io/avro.py b/py-polars/polars/io/avro.py index a25be704c4f2..3d695ba6ad35 100644 --- a/py-polars/polars/io/avro.py +++ b/py-polars/polars/io/avro.py @@ -1,12 +1,17 @@ from __future__ import annotations +import contextlib +from pathlib import Path from typing import IO, TYPE_CHECKING -import polars._reexport as pl +from polars._utils.various import normalize_filepath +from polars._utils.wrap import wrap_df +from polars.io._utils import handle_projection_columns -if TYPE_CHECKING: - from pathlib import Path +with contextlib.suppress(ImportError): # Module not available when building docs + from polars.polars import PyDataFrame +if TYPE_CHECKING: from polars import DataFrame @@ -35,4 +40,9 @@ def read_avro( ------- DataFrame """ - return pl.DataFrame._read_avro(source, n_rows=n_rows, columns=columns) + if isinstance(source, (str, Path)): + source = normalize_filepath(source) + projection, parsed_columns = handle_projection_columns(columns) + + pydf = PyDataFrame.read_avro(source, parsed_columns, projection, n_rows) + return wrap_df(pydf) diff --git a/py-polars/polars/io/csv/batched_reader.py b/py-polars/polars/io/csv/batched_reader.py index 101672f7a5e9..6f79d0e71b63 100644 --- a/py-polars/polars/io/csv/batched_reader.py +++ b/py-polars/polars/io/csv/batched_reader.py @@ -6,11 +6,11 @@ from polars._utils.various import ( _prepare_row_index_args, _process_null_values, - handle_projection_columns, normalize_filepath, ) from polars._utils.wrap import wrap_df from polars.datatypes import N_INFER_DEFAULT, py_type_to_dtype +from polars.io._utils import handle_projection_columns from polars.io.csv._utils import _update_columns with contextlib.suppress(ImportError): # Module not available when building docs diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index 9291345a9a60..b3410ac088c1 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -1,16 +1,32 @@ from __future__ import annotations +import contextlib +from io import BytesIO, StringIO from pathlib import Path from typing import IO, TYPE_CHECKING, Any, Callable, Mapping, Sequence import polars._reexport as pl from polars._utils.deprecation import deprecate_renamed_parameter -from polars._utils.various import handle_projection_columns, normalize_filepath +from polars._utils.various import ( + _prepare_row_index_args, + _process_null_values, + is_str_sequence, + normalize_filepath, +) +from polars._utils.wrap import wrap_df, wrap_ldf from polars.datatypes import N_INFER_DEFAULT, String -from polars.io._utils import _prepare_file_arg +from polars.datatypes.convert import py_type_to_dtype +from polars.io._utils import ( + _is_glob_pattern, + _prepare_file_arg, + handle_projection_columns, +) from polars.io.csv._utils import _check_arg_is_1byte, _update_columns from polars.io.csv.batched_reader import BatchedCsvReader +with contextlib.suppress(ImportError): # Module not available when building docs + from polars.polars import PyDataFrame, PyLazyFrame + if TYPE_CHECKING: from polars import DataFrame, LazyFrame from polars.type_aliases import CsvEncoding, PolarsDataType, SchemaDict @@ -394,7 +410,7 @@ def read_csv( raise_if_empty=raise_if_empty, storage_options=storage_options, ) as data: - df = pl.DataFrame._read_csv( + df = _read_csv_impl( data, has_header=has_header, columns=columns if columns else projection, @@ -429,6 +445,145 @@ def read_csv( return df +def _read_csv_impl( + source: str | Path | IO[bytes] | bytes, + *, + has_header: bool = True, + columns: Sequence[int] | Sequence[str] | None = None, + separator: str = ",", + comment_prefix: str | None = None, + quote_char: str | None = '"', + skip_rows: int = 0, + dtypes: None | (SchemaDict | Sequence[PolarsDataType]) = None, + schema: None | SchemaDict = None, + null_values: str | Sequence[str] | dict[str, str] | None = None, + missing_utf8_is_empty_string: bool = False, + ignore_errors: bool = False, + try_parse_dates: bool = False, + n_threads: int | None = None, + infer_schema_length: int | None = N_INFER_DEFAULT, + batch_size: int = 8192, + n_rows: int | None = None, + encoding: CsvEncoding = "utf8", + low_memory: bool = False, + rechunk: bool = True, + skip_rows_after_header: int = 0, + row_index_name: str | None = None, + row_index_offset: int = 0, + sample_size: int = 1024, + eol_char: str = "\n", + raise_if_empty: bool = True, + truncate_ragged_lines: bool = False, +) -> DataFrame: + path: str | None + if isinstance(source, (str, Path)): + path = normalize_filepath(source) + else: + path = None + if isinstance(source, BytesIO): + source = source.getvalue() + if isinstance(source, StringIO): + source = source.getvalue().encode() + + dtype_list: Sequence[tuple[str, PolarsDataType]] | None = None + dtype_slice: Sequence[PolarsDataType] | None = None + if dtypes is not None: + if isinstance(dtypes, dict): + dtype_list = [] + for k, v in dtypes.items(): + dtype_list.append((k, py_type_to_dtype(v))) + elif isinstance(dtypes, Sequence): + dtype_slice = dtypes + else: + msg = f"`dtypes` should be of type list or dict, got {type(dtypes).__name__!r}" + raise TypeError(msg) + + processed_null_values = _process_null_values(null_values) + + if isinstance(columns, str): + columns = [columns] + if isinstance(source, str) and _is_glob_pattern(source): + dtypes_dict = None + if dtype_list is not None: + dtypes_dict = dict(dtype_list) + if dtype_slice is not None: + msg = ( + "cannot use glob patterns and unnamed dtypes as `dtypes` argument" + "\n\nUse `dtypes`: Mapping[str, Type[DataType]]" + ) + raise ValueError(msg) + from polars import scan_csv + + scan = scan_csv( + source, + has_header=has_header, + separator=separator, + comment_prefix=comment_prefix, + quote_char=quote_char, + skip_rows=skip_rows, + dtypes=dtypes_dict, + schema=schema, + null_values=null_values, + missing_utf8_is_empty_string=missing_utf8_is_empty_string, + ignore_errors=ignore_errors, + infer_schema_length=infer_schema_length, + n_rows=n_rows, + low_memory=low_memory, + rechunk=rechunk, + skip_rows_after_header=skip_rows_after_header, + row_index_name=row_index_name, + row_index_offset=row_index_offset, + eol_char=eol_char, + raise_if_empty=raise_if_empty, + truncate_ragged_lines=truncate_ragged_lines, + ) + if columns is None: + return scan.collect() + elif is_str_sequence(columns, allow_str=False): + return scan.select(columns).collect() + else: + msg = ( + "cannot use glob patterns and integer based projection as `columns` argument" + "\n\nUse columns: List[str]" + ) + raise ValueError(msg) + + projection, columns = handle_projection_columns(columns) + + pydf = PyDataFrame.read_csv( + source, + infer_schema_length, + batch_size, + has_header, + ignore_errors, + n_rows, + skip_rows, + projection, + separator, + rechunk, + columns, + encoding, + n_threads, + path, + dtype_list, + dtype_slice, + low_memory, + comment_prefix, + quote_char, + processed_null_values, + missing_utf8_is_empty_string, + try_parse_dates, + skip_rows_after_header, + _prepare_row_index_args(row_index_name, row_index_offset), + sample_size=sample_size, + eol_char=eol_char, + raise_if_empty=raise_if_empty, + truncate_ragged_lines=truncate_ragged_lines, + schema=schema, + ) + return wrap_df(pydf) + + @deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4") @deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4") @deprecate_renamed_parameter( @@ -944,7 +1099,7 @@ def with_column_names(cols: list[str]) -> list[str]: else: source = [normalize_filepath(source) for source in source] - return pl.LazyFrame._scan_csv( + return _scan_csv_impl( source, has_header=has_header, separator=separator, @@ -971,3 +1126,74 @@ def with_column_names(cols: list[str]) -> list[str]: raise_if_empty=raise_if_empty, truncate_ragged_lines=truncate_ragged_lines, ) + + +def _scan_csv_impl( + source: str | list[str] | list[Path], + *, + has_header: bool = True, + separator: str = ",", + comment_prefix: str | None = None, + quote_char: str | None = '"', + skip_rows: int = 0, + dtypes: SchemaDict | None = None, + schema: SchemaDict | None = None, + null_values: str | Sequence[str] | dict[str, str] | None = None, + missing_utf8_is_empty_string: bool = False, + ignore_errors: bool = False, + cache: bool = True, + with_column_names: Callable[[list[str]], list[str]] | None = None, + infer_schema_length: int | None = N_INFER_DEFAULT, + n_rows: int | None = None, + encoding: CsvEncoding = "utf8", + low_memory: bool = False, + rechunk: bool = True, + skip_rows_after_header: int = 0, + row_index_name: str | None = None, + row_index_offset: int = 0, + try_parse_dates: bool = False, + eol_char: str = "\n", + raise_if_empty: bool = True, + truncate_ragged_lines: bool = True, +) -> LazyFrame: + dtype_list: list[tuple[str, PolarsDataType]] | None = None + if dtypes is not None: + dtype_list = [] + for k, v in dtypes.items(): + dtype_list.append((k, py_type_to_dtype(v))) + processed_null_values = _process_null_values(null_values) + + if isinstance(source, list): + sources = source + source = None # type: ignore[assignment] + else: + sources = [] + + pylf = PyLazyFrame.new_from_csv( + source, + sources, + separator, + has_header, + ignore_errors, + skip_rows, + n_rows, + cache, + dtype_list, + low_memory, + comment_prefix, + quote_char, + processed_null_values, + missing_utf8_is_empty_string, + infer_schema_length, + with_column_names, + rechunk, + skip_rows_after_header, + encoding, + _prepare_row_index_args(row_index_name, row_index_offset), + try_parse_dates, + eol_char=eol_char, + raise_if_empty=raise_if_empty, + truncate_ragged_lines=truncate_ragged_lines, + schema=schema, + ) + return wrap_ldf(pylf) diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index 55d07848cb5e..52d6b6a307c8 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -2,15 +2,27 @@ import contextlib from pathlib import Path -from typing import IO, TYPE_CHECKING, Any +from typing import IO, TYPE_CHECKING, Any, Sequence import polars._reexport as pl from polars._utils.deprecation import deprecate_renamed_parameter -from polars._utils.various import normalize_filepath +from polars._utils.various import ( + _prepare_row_index_args, + is_str_sequence, + normalize_filepath, +) +from polars._utils.wrap import wrap_df, wrap_ldf from polars.dependencies import _PYARROW_AVAILABLE -from polars.io._utils import _prepare_file_arg +from polars.io._utils import ( + _is_glob_pattern, + _is_local_file, + _prepare_file_arg, + handle_projection_columns, +) +from polars.io.ipc.anonymous_scan import _scan_ipc_fsspec -with contextlib.suppress(ImportError): +with contextlib.suppress(ImportError): # Module not available when building docs + from polars.polars import PyDataFrame, PyLazyFrame from polars.polars import read_ipc_schema as _read_ipc_schema if TYPE_CHECKING: @@ -34,6 +46,9 @@ def read_ipc( """ Read into a DataFrame from Arrow IPC (Feather v2) file. + See "File or Random Access format" on https://arrow.apache.org/docs/python/ipc.html. + Arrow IPC files are also known as Feather (v2) files. + Parameters ---------- source @@ -98,7 +113,7 @@ def read_ipc( df = df.slice(0, n_rows) return df - return pl.DataFrame._read_ipc( + return _read_ipc_impl( data, columns=columns, n_rows=n_rows, @@ -109,6 +124,54 @@ def read_ipc( ) +def _read_ipc_impl( + source: str | Path | IO[bytes] | bytes, + *, + columns: Sequence[int] | Sequence[str] | None = None, + n_rows: int | None = None, + row_index_name: str | None = None, + row_index_offset: int = 0, + rechunk: bool = True, + memory_map: bool = True, +) -> DataFrame: + if isinstance(source, (str, Path)): + source = normalize_filepath(source) + if isinstance(columns, str): + columns = [columns] + + if isinstance(source, str) and _is_glob_pattern(source) and _is_local_file(source): + scan = scan_ipc( + source, + n_rows=n_rows, + rechunk=rechunk, + row_index_name=row_index_name, + row_index_offset=row_index_offset, + memory_map=memory_map, + ) + if columns is None: + df = scan.collect() + elif is_str_sequence(columns, allow_str=False): + df = scan.select(columns).collect() + else: + msg = ( + "cannot use glob patterns and integer based projection as `columns` argument" + "\n\nUse columns: List[str]" + ) + raise TypeError(msg) + return df + + projection, columns = handle_projection_columns(columns) + pydf = PyDataFrame.read_ipc( + source, + columns, + projection, + n_rows, + _prepare_row_index_args(row_index_name, row_index_offset), + memory_map=memory_map, + ) + return wrap_df(pydf) + + @deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4") @deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4") def read_ipc_stream( @@ -125,6 +188,8 @@ def read_ipc_stream( """ Read into a DataFrame from Arrow IPC record batch stream. + See "Streaming format" on https://arrow.apache.org/docs/python/ipc.html. + Parameters ---------- source @@ -178,7 +243,7 @@ def read_ipc_stream( df = df.slice(0, n_rows) return df - return pl.DataFrame._read_ipc_stream( + return _read_ipc_stream_impl( data, columns=columns, n_rows=n_rows, @@ -188,6 +253,32 @@ def read_ipc_stream( ) +def _read_ipc_stream_impl( + source: str | Path | IO[bytes] | bytes, + *, + columns: Sequence[int] | Sequence[str] | None = None, + n_rows: int | None = None, + row_index_name: str | None = None, + row_index_offset: int = 0, + rechunk: bool = True, +) -> DataFrame: + if isinstance(source, (str, Path)): + source = normalize_filepath(source) + if isinstance(columns, str): + columns = [columns] + + projection, columns = handle_projection_columns(columns) + pydf = PyDataFrame.read_ipc_stream( + source, + columns, + projection, + n_rows, + _prepare_row_index_args(row_index_name, row_index_offset), + rechunk, + ) + return wrap_df(pydf) + + def read_ipc_schema(source: str | Path | IO[bytes] | bytes) -> dict[str, DataType]: """ Get the schema of an IPC file without reading data. @@ -257,14 +348,33 @@ def scan_ipc( Number of retries if accessing a cloud instance fails. """ - return pl.LazyFrame._scan_ipc( + if isinstance(source, (str, Path)): + can_use_fsspec = True + source = normalize_filepath(source) + sources = [] + else: + can_use_fsspec = False + sources = [normalize_filepath(source) for source in source] + source = None # type: ignore[assignment] + + # try fsspec scanner + if can_use_fsspec and not _is_local_file(source): # type: ignore[arg-type] + scan = _scan_ipc_fsspec(source, storage_options) # type: ignore[arg-type] + if n_rows: + scan = scan.head(n_rows) + if row_index_name is not None: + scan = scan.with_row_index(row_index_name, row_index_offset) + return scan + + pylf = PyLazyFrame.new_from_ipc( source, - n_rows=n_rows, - cache=cache, - rechunk=rechunk, - row_index_name=row_index_name, - row_index_offset=row_index_offset, - storage_options=storage_options, + sources, + n_rows, + cache, + rechunk, + _prepare_row_index_args(row_index_name, row_index_offset), memory_map=memory_map, + cloud_options=storage_options, retries=retries, ) + return wrap_ldf(pylf) diff --git a/py-polars/polars/io/json.py b/py-polars/polars/io/json.py index 099c6936e780..c89af4df0347 100644 --- a/py-polars/polars/io/json.py +++ b/py-polars/polars/io/json.py @@ -1,13 +1,19 @@ from __future__ import annotations +import contextlib +from io import BytesIO, StringIO +from pathlib import Path from typing import TYPE_CHECKING -import polars._reexport as pl +from polars._utils.various import normalize_filepath +from polars._utils.wrap import wrap_df from polars.datatypes import N_INFER_DEFAULT +with contextlib.suppress(ImportError): # Module not available when building docs + from polars.polars import PyDataFrame + if TYPE_CHECKING: from io import IOBase - from pathlib import Path from polars import DataFrame from polars.type_aliases import SchemaDefinition @@ -50,9 +56,15 @@ def read_json( -------- read_ndjson """ - return pl.DataFrame._read_json( + if isinstance(source, StringIO): + source = BytesIO(source.getvalue().encode()) + elif isinstance(source, (str, Path)): + source = normalize_filepath(source) + + pydf = PyDataFrame.read_json( source, + infer_schema_length=infer_schema_length, schema=schema, schema_overrides=schema_overrides, - infer_schema_length=infer_schema_length, ) + return wrap_df(pydf) diff --git a/py-polars/polars/io/ndjson.py b/py-polars/polars/io/ndjson.py index 9d413e6de10d..0e810a3f1eea 100644 --- a/py-polars/polars/io/ndjson.py +++ b/py-polars/polars/io/ndjson.py @@ -1,14 +1,20 @@ from __future__ import annotations +import contextlib +from io import BytesIO, StringIO +from pathlib import Path from typing import TYPE_CHECKING -import polars._reexport as pl from polars._utils.deprecation import deprecate_renamed_parameter +from polars._utils.various import _prepare_row_index_args, normalize_filepath +from polars._utils.wrap import wrap_df, wrap_ldf from polars.datatypes import N_INFER_DEFAULT +with contextlib.suppress(ImportError): # Module not available when building docs + from polars.polars import PyDataFrame, PyLazyFrame + if TYPE_CHECKING: from io import IOBase - from pathlib import Path from polars import DataFrame, LazyFrame from polars.type_aliases import SchemaDefinition @@ -46,12 +52,18 @@ def read_ndjson( ignore_errors Return `Null` if parsing fails because of schema mismatches. """ - return pl.DataFrame._read_ndjson( + if isinstance(source, StringIO): + source = BytesIO(source.getvalue().encode()) + elif isinstance(source, (str, Path)): + source = normalize_filepath(source) + + pydf = PyDataFrame.read_ndjson( source, + ignore_errors=ignore_errors, schema=schema, schema_overrides=schema_overrides, - ignore_errors=ignore_errors, ) + return wrap_df(pydf) @deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4") @@ -108,15 +120,23 @@ def scan_ndjson( ignore_errors Return `Null` if parsing fails because of schema mismatches. """ - return pl.LazyFrame._scan_ndjson( + if isinstance(source, (str, Path)): + source = normalize_filepath(source) + sources = [] + else: + sources = [normalize_filepath(source) for source in source] + source = None # type: ignore[assignment] + + pylf = PyLazyFrame.new_from_ndjson( source, - infer_schema_length=infer_schema_length, - schema=schema, - batch_size=batch_size, - n_rows=n_rows, - low_memory=low_memory, - rechunk=rechunk, - row_index_name=row_index_name, - row_index_offset=row_index_offset, - ignore_errors=ignore_errors, + sources, + infer_schema_length, + schema, + batch_size, + n_rows, + low_memory, + rechunk, + _prepare_row_index_args(row_index_name, row_index_offset), + ignore_errors, ) + return wrap_ldf(pylf) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index a0b53e5d0f98..d83b671159ab 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -3,17 +3,30 @@ import contextlib import io from pathlib import Path -from typing import IO, TYPE_CHECKING, Any +from typing import IO, TYPE_CHECKING, Any, Sequence -import polars._reexport as pl from polars._utils.deprecation import deprecate_renamed_parameter from polars._utils.unstable import issue_unstable_warning -from polars._utils.various import is_int_sequence, normalize_filepath +from polars._utils.various import ( + _prepare_row_index_args, + is_int_sequence, + is_str_sequence, + normalize_filepath, +) +from polars._utils.wrap import wrap_df, wrap_ldf from polars.convert import from_arrow from polars.dependencies import _PYARROW_AVAILABLE -from polars.io._utils import _prepare_file_arg +from polars.io._utils import ( + _is_glob_pattern, + _is_local_file, + _is_supported_cloud, + _prepare_file_arg, + handle_projection_columns, +) +from polars.io.parquet.anonymous_scan import _scan_parquet_fsspec with contextlib.suppress(ImportError): + from polars.polars import PyDataFrame, PyLazyFrame from polars.polars import read_parquet_schema as _read_parquet_schema if TYPE_CHECKING: @@ -171,7 +184,7 @@ def read_parquet( # Read binary types using `read_parquet` elif isinstance(source, (io.BufferedIOBase, io.RawIOBase, bytes)): with _prepare_file_arg(source, use_pyarrow=False) as source_prep: - return pl.DataFrame._read_parquet( + return _read_parquet_binary( source_prep, columns=columns, n_rows=n_rows, @@ -208,6 +221,63 @@ def read_parquet( return lf.collect() +def _read_parquet_binary( + source: str | Path | IO[bytes] | bytes, + *, + columns: Sequence[int] | Sequence[str] | None = None, + n_rows: int | None = None, + parallel: ParallelStrategy = "auto", + row_index_name: str | None = None, + row_index_offset: int = 0, + low_memory: bool = False, + use_statistics: bool = True, + rechunk: bool = True, +) -> DataFrame: + if isinstance(source, (str, Path)): + source = normalize_filepath(source) + if isinstance(columns, str): + columns = [columns] + + if isinstance(source, str) and _is_glob_pattern(source): + from polars import scan_parquet + + scan = scan_parquet( + source, + n_rows=n_rows, + rechunk=True, + parallel=parallel, + row_index_name=row_index_name, + row_index_offset=row_index_offset, + low_memory=low_memory, + ) + + if columns is None: + return scan.collect() + elif is_str_sequence(columns, allow_str=False): + return scan.select(columns).collect() + else: + msg = ( + "cannot use glob patterns and integer based projection as `columns` argument" + "\n\nUse columns: List[str]" + ) + raise TypeError(msg) + + projection, columns = handle_projection_columns(columns) + + pydf = PyDataFrame.read_parquet( + source, + columns, + projection, + n_rows, + parallel, + _prepare_row_index_args(row_index_name, row_index_offset), + low_memory=low_memory, + use_statistics=use_statistics, + rechunk=rechunk, + ) + return wrap_df(pydf) + + def read_parquet_schema(source: str | Path | IO[bytes] | bytes) -> dict[str, DataType]: """ Get the schema of a Parquet file without reading data. @@ -337,7 +407,7 @@ def scan_parquet( else: source = [normalize_filepath(source) for source in source] - return pl.LazyFrame._scan_parquet( + return _scan_parquet_impl( source, n_rows=n_rows, cache=cache, @@ -352,3 +422,64 @@ def scan_parquet( hive_schema=hive_schema, retries=retries, ) + + +def _scan_parquet_impl( + source: str | list[str] | list[Path], + *, + n_rows: int | None = None, + cache: bool = True, + parallel: ParallelStrategy = "auto", + rechunk: bool = True, + row_index_name: str | None = None, + row_index_offset: int = 0, + storage_options: dict[str, object] | None = None, + low_memory: bool = False, + use_statistics: bool = True, + hive_partitioning: bool = True, + hive_schema: SchemaDict | None = None, + retries: int = 0, +) -> LazyFrame: + if isinstance(source, list): + sources = source + source = None # type: ignore[assignment] + can_use_fsspec = False + else: + can_use_fsspec = True + sources = [] + + # try fsspec scanner + if ( + can_use_fsspec + and not _is_local_file(source) # type: ignore[arg-type] + and not _is_supported_cloud(source) # type: ignore[arg-type] + ): + scan = _scan_parquet_fsspec(source, storage_options) # type: ignore[arg-type] + if n_rows: + scan = scan.head(n_rows) + if row_index_name is not None: + scan = scan.with_row_index(row_index_name, row_index_offset) + return scan + + if storage_options: + storage_options = list(storage_options.items()) # type: ignore[assignment] + else: + # Handle empty dict input + storage_options = None + + pylf = PyLazyFrame.new_from_parquet( + source, + sources, + n_rows, + cache, + parallel, + rechunk, + _prepare_row_index_args(row_index_name, row_index_offset), + low_memory, + cloud_options=storage_options, + use_statistics=use_statistics, + hive_partitioning=hive_partitioning, + hive_schema=hive_schema, + retries=retries, + ) + return wrap_ldf(pylf) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 105545a8cfc6..0319118bb8ac 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -41,8 +41,6 @@ from polars._utils.unstable import issue_unstable_warning, unstable from polars._utils.various import ( _in_notebook, - _prepare_row_index_args, - _process_null_values, is_bool_sequence, is_sequence, normalize_filepath, @@ -79,10 +77,7 @@ py_type_to_dtype, ) from polars.dependencies import subprocess -from polars.io._utils import _is_local_file, _is_supported_cloud from polars.io.csv._utils import _check_arg_is_1byte -from polars.io.ipc.anonymous_scan import _scan_ipc_fsspec -from polars.io.parquet.anonymous_scan import _scan_parquet_fsspec from polars.lazyframe.group_by import LazyGroupBy from polars.lazyframe.in_process import InProcessQuery from polars.selectors import _expand_selectors, by_dtype, expand_selector @@ -104,7 +99,6 @@ AsofJoinStrategy, ClosedInterval, ColumnNameOrSelector, - CsvEncoding, CsvQuoteStyle, FillNullStrategy, FrameInitTypes, @@ -114,7 +108,6 @@ JoinValidation, Label, Orientation, - ParallelStrategy, PolarsDataType, RollingInterpolationMethod, SchemaDefinition, @@ -333,261 +326,6 @@ def __setstate__(self, state: bytes) -> None: self._ldf = LazyFrame()._ldf # Initialize with a dummy self._ldf.__setstate__(state) - @classmethod - def _scan_csv( - cls, - source: str | list[str] | list[Path], - *, - has_header: bool = True, - separator: str = ",", - comment_prefix: str | None = None, - quote_char: str | None = '"', - skip_rows: int = 0, - dtypes: SchemaDict | None = None, - schema: SchemaDict | None = None, - null_values: str | Sequence[str] | dict[str, str] | None = None, - missing_utf8_is_empty_string: bool = False, - ignore_errors: bool = False, - cache: bool = True, - with_column_names: Callable[[list[str]], list[str]] | None = None, - infer_schema_length: int | None = N_INFER_DEFAULT, - n_rows: int | None = None, - encoding: CsvEncoding = "utf8", - low_memory: bool = False, - rechunk: bool = True, - skip_rows_after_header: int = 0, - row_index_name: str | None = None, - row_index_offset: int = 0, - try_parse_dates: bool = False, - eol_char: str = "\n", - raise_if_empty: bool = True, - truncate_ragged_lines: bool = True, - ) -> Self: - """ - Lazily read from a CSV file or multiple files via glob patterns. - - Use `pl.scan_csv` to dispatch to this method. - - See Also - -------- - polars.io.scan_csv - """ - dtype_list: list[tuple[str, PolarsDataType]] | None = None - if dtypes is not None: - dtype_list = [] - for k, v in dtypes.items(): - dtype_list.append((k, py_type_to_dtype(v))) - processed_null_values = _process_null_values(null_values) - - if isinstance(source, list): - sources = source - source = None # type: ignore[assignment] - else: - sources = [] - - self = cls.__new__(cls) - self._ldf = PyLazyFrame.new_from_csv( - source, - sources, - separator, - has_header, - ignore_errors, - skip_rows, - n_rows, - cache, - dtype_list, - low_memory, - comment_prefix, - quote_char, - processed_null_values, - missing_utf8_is_empty_string, - infer_schema_length, - with_column_names, - rechunk, - skip_rows_after_header, - encoding, - _prepare_row_index_args(row_index_name, row_index_offset), - try_parse_dates, - eol_char=eol_char, - raise_if_empty=raise_if_empty, - truncate_ragged_lines=truncate_ragged_lines, - schema=schema, - ) - return self - - @classmethod - def _scan_parquet( - cls, - source: str | list[str] | list[Path], - *, - n_rows: int | None = None, - cache: bool = True, - parallel: ParallelStrategy = "auto", - rechunk: bool = True, - row_index_name: str | None = None, - row_index_offset: int = 0, - storage_options: dict[str, object] | None = None, - low_memory: bool = False, - use_statistics: bool = True, - hive_partitioning: bool = True, - hive_schema: SchemaDict | None = None, - retries: int = 0, - ) -> Self: - """ - Lazily read from a parquet file or multiple files via glob patterns. - - Use `pl.scan_parquet` to dispatch to this method. - - See Also - -------- - polars.io.scan_parquet - """ - if isinstance(source, list): - sources = source - source = None # type: ignore[assignment] - can_use_fsspec = False - else: - can_use_fsspec = True - sources = [] - - # try fsspec scanner - if ( - can_use_fsspec - and not _is_local_file(source) # type: ignore[arg-type] - and not _is_supported_cloud(source) # type: ignore[arg-type] - ): - scan = _scan_parquet_fsspec(source, storage_options) # type: ignore[arg-type] - if n_rows: - scan = scan.head(n_rows) - if row_index_name is not None: - scan = scan.with_row_index(row_index_name, row_index_offset) - return scan # type: ignore[return-value] - - if storage_options: - storage_options = list(storage_options.items()) # type: ignore[assignment] - else: - # Handle empty dict input - storage_options = None - - self = cls.__new__(cls) - self._ldf = PyLazyFrame.new_from_parquet( - source, - sources, - n_rows, - cache, - parallel, - rechunk, - _prepare_row_index_args(row_index_name, row_index_offset), - low_memory, - cloud_options=storage_options, - use_statistics=use_statistics, - hive_partitioning=hive_partitioning, - hive_schema=hive_schema, - retries=retries, - ) - return self - - @classmethod - def _scan_ipc( - cls, - source: str | Path | list[str] | list[Path], - *, - n_rows: int | None = None, - cache: bool = True, - rechunk: bool = True, - row_index_name: str | None = None, - row_index_offset: int = 0, - storage_options: dict[str, object] | None = None, - memory_map: bool = True, - retries: int = 0, - ) -> Self: - """ - Lazily read from an Arrow IPC (Feather v2) file. - - Use `pl.scan_ipc` to dispatch to this method. - - See Also - -------- - polars.io.scan_ipc - """ - if isinstance(source, (str, Path)): - can_use_fsspec = True - source = normalize_filepath(source) - sources = [] - else: - can_use_fsspec = False - sources = [normalize_filepath(source) for source in source] - source = None # type: ignore[assignment] - - # try fsspec scanner - if can_use_fsspec and not _is_local_file(source): # type: ignore[arg-type] - scan = _scan_ipc_fsspec(source, storage_options) # type: ignore[arg-type] - if n_rows: - scan = scan.head(n_rows) - if row_index_name is not None: - scan = scan.with_row_index(row_index_name, row_index_offset) - return scan # type: ignore[return-value] - - self = cls.__new__(cls) - self._ldf = PyLazyFrame.new_from_ipc( - source, - sources, - n_rows, - cache, - rechunk, - _prepare_row_index_args(row_index_name, row_index_offset), - memory_map=memory_map, - cloud_options=storage_options, - retries=retries, - ) - return self - - @classmethod - def _scan_ndjson( - cls, - source: str | Path | list[str] | list[Path], - *, - infer_schema_length: int | None = N_INFER_DEFAULT, - schema: SchemaDefinition | None = None, - batch_size: int | None = None, - n_rows: int | None = None, - low_memory: bool = False, - rechunk: bool = False, - row_index_name: str | None = None, - row_index_offset: int = 0, - ignore_errors: bool = False, - ) -> Self: - """ - Lazily read from a newline delimited JSON file. - - Use `pl.scan_ndjson` to dispatch to this method. - - See Also - -------- - polars.io.scan_ndjson - """ - if isinstance(source, (str, Path)): - source = normalize_filepath(source) - sources = [] - else: - sources = [normalize_filepath(source) for source in source] - source = None # type: ignore[assignment] - - self = cls.__new__(cls) - self._ldf = PyLazyFrame.new_from_ndjson( - source, - sources, - infer_schema_length, - schema, - batch_size, - n_rows, - low_memory, - rechunk, - _prepare_row_index_args(row_index_name, row_index_offset), - ignore_errors, - ) - return self - @classmethod def _scan_python_function( cls,