WIP
stinodego committed Dec 14, 2023
1 parent 5b90db4 commit a244340
Showing 2 changed files with 115 additions and 88 deletions.
py-polars/polars/io/_utils.py (2 changes: 1 addition & 1 deletion)
@@ -54,7 +54,7 @@ def _prepare_file_arg(
file: str | list[str] | TextIO | Path | BinaryIO | bytes,
encoding: str | None = None,
*,
use_pyarrow: bool | None = None,
use_pyarrow: bool = False,
raise_if_empty: bool = True,
**kwargs: Any,
) -> ContextManager[str | BinaryIO | list[str] | list[BinaryIO]]:
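For context on this one-line change: `_prepare_file_arg` is the context manager that the parquet readers in the next file use to normalize their `source` argument. A minimal usage sketch under the new default (the file name is an illustrative assumption, and this is a private helper rather than public API):

    from polars.io._utils import _prepare_file_arg

    # use_pyarrow now defaults to False rather than None, so callers that omit it
    # get the native-reader preparation path.
    with _prepare_file_arg("data.parquet", use_pyarrow=False) as prepared:
        # `prepared` is a path string, a binary buffer, or a list of either,
        # ready to hand to the underlying reader.
        ...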
py-polars/polars/io/parquet/functions.py (201 changes: 114 additions & 87 deletions)
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO

@@ -14,42 +15,31 @@
from polars.polars import read_parquet_schema as _read_parquet_schema

if TYPE_CHECKING:
from io import BytesIO

from polars import DataFrame, DataType, LazyFrame
from polars.type_aliases import ParallelStrategy


def read_parquet(
source: str | Path | BinaryIO | BytesIO | bytes,
source: str | Path | list[str] | list[Path] | BinaryIO | BytesIO | bytes,
*,
columns: list[int] | list[str] | None = None,
n_rows: int | None = None,
use_pyarrow: bool = False,
memory_map: bool = True,
storage_options: dict[str, Any] | None = None,
parallel: ParallelStrategy = "auto",
row_count_name: str | None = None,
row_count_offset: int = 0,
low_memory: bool = False,
pyarrow_options: dict[str, Any] | None = None,
parallel: ParallelStrategy = "auto",
use_statistics: bool = True,
hive_partitioning: bool = True,
rechunk: bool = True,
low_memory: bool = False,
storage_options: dict[str, Any] | None = None,
retries: int = 0,
use_pyarrow: bool = False,
pyarrow_options: dict[str, Any] | None = None,
memory_map: bool = True,
) -> DataFrame:
"""
Read into a DataFrame from a parquet file.
Notes
-----
* Partitioned files:
If you have a directory-nested (hive-style) partitioned dataset, you should
use the :func:`scan_pyarrow_dataset` method instead.
* When benchmarking:
This operation defaults to a `rechunk` operation at the end, meaning that all
data will be stored continuously in memory. Set `rechunk=False` if you are
benchmarking the parquet-reader as `rechunk` can be an expensive operation
that should not contribute to the timings.
Parameters
----------
source
@@ -62,63 +52,78 @@ def read_parquet(
n_rows
Stop reading from parquet file after reading `n_rows`.
Only valid when `use_pyarrow=False`.
use_pyarrow
Use pyarrow instead of the Rust native parquet reader. The pyarrow reader is
more stable.
memory_map
Memory map underlying file. This will likely increase performance.
Only used when `use_pyarrow=True`.
storage_options
Extra options that make sense for `fsspec.open()` or a particular storage
connection, e.g. host, port, username, password, etc.
parallel : {'auto', 'columns', 'row_groups', 'none'}
This determines the direction of parallelism. 'auto' will try to determine the
optimal direction.
row_count_name
If not None, this will insert a row count column with the given name into the
DataFrame.
row_count_offset
Offset to start the row_count column (only used if the name is set).
low_memory
Reduce memory pressure at the expense of performance.
pyarrow_options
Keyword arguments for `pyarrow.parquet.read_table
<https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html>`_.
parallel : {'auto', 'columns', 'row_groups', 'none'}
This determines the direction of parallelism. 'auto' will try to determine the
optimal direction.
use_statistics
Use statistics in the parquet to determine if pages
can be skipped from reading.
hive_partitioning
Infer statistics and schema from hive partitioned URL and use them
to prune reads.
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.
low_memory
Reduce memory pressure at the expense of performance.
storage_options
Extra options that make sense for `fsspec.open()` or a particular storage
connection, e.g. host, port, username, password, etc.
retries
Number of retries if accessing a cloud instance fails.
use_pyarrow
Use pyarrow instead of the Rust native parquet reader. The pyarrow reader is
more stable.
pyarrow_options
Keyword arguments for `pyarrow.parquet.read_table
<https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html>`_.
memory_map
Memory map underlying file. This will likely increase performance.
Only used when `use_pyarrow=True`.
Returns
-------
DataFrame
See Also
--------
scan_parquet
scan_pyarrow_dataset
Returns
-------
DataFrame
Notes
-----
* Partitioned files:
If you have a directory-nested (hive-style) partitioned dataset, you should
use the :func:`scan_pyarrow_dataset` method instead.
* When benchmarking:
This operation defaults to a `rechunk` operation at the end, meaning that all
data will be stored continuously in memory. Set `rechunk=False` if you are
benchmarking the parquet-reader as `rechunk` can be an expensive operation
that should not contribute to the timings.
"""
if use_pyarrow and n_rows:
raise ValueError("`n_rows` cannot be used with `use_pyarrow=True`")

storage_options = storage_options or {}
pyarrow_options = pyarrow_options or {}

with _prepare_file_arg(
source, use_pyarrow=use_pyarrow, **storage_options
) as source_prep:
if use_pyarrow:
if not _PYARROW_AVAILABLE:
raise ModuleNotFoundError(
"'pyarrow' is required when using `read_parquet(..., use_pyarrow=True)`"
)

import pyarrow as pa
import pyarrow.parquet
if use_pyarrow:
if not _PYARROW_AVAILABLE:
raise ModuleNotFoundError(
"'pyarrow' is required when using `read_parquet(..., use_pyarrow=True)`"
)
if n_rows is not None:
raise ValueError("`n_rows` cannot be used with `use_pyarrow=True`")

import pyarrow as pa
import pyarrow.parquet

pyarrow_options = pyarrow_options or {}

with _prepare_file_arg(
source, use_pyarrow=use_pyarrow, **storage_options
) as source_prep:
return from_arrow( # type: ignore[return-value]
pa.parquet.read_table(
source_prep,
@@ -128,17 +133,39 @@
)
)

return pl.DataFrame._read_parquet(
source_prep,
columns=columns,
n_rows=n_rows,
parallel=parallel,
row_count_name=row_count_name,
row_count_offset=row_count_offset,
low_memory=low_memory,
use_statistics=use_statistics,
rechunk=rechunk,
)
if isinstance(source, (BinaryIO, BytesIO, bytes)):
with _prepare_file_arg(
source, use_pyarrow=use_pyarrow, **storage_options
) as source_prep:
return pl.DataFrame._read_parquet(
source_prep,
columns=columns,
n_rows=n_rows,
parallel=parallel,
row_count_name=row_count_name,
row_count_offset=row_count_offset,
low_memory=low_memory,
use_statistics=use_statistics,
rechunk=rechunk,
)

lf = scan_parquet(
source,
n_rows=n_rows,
row_count_name=row_count_name,
row_count_offset=row_count_offset,
parallel=parallel,
use_statistics=use_statistics,
hive_partitioning=hive_partitioning,
rechunk=rechunk,
low_memory=low_memory,
storage_options=storage_options,
retries=retries,
)
if columns is not None:
lf = lf.select(columns) # TODO: Handle int columns

return lf.collect()
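The refactored body above splits into three routes: the pyarrow reader, an eager `DataFrame._read_parquet` call for in-memory sources, and a `scan_parquet(...).collect()` delegation for path-like sources. A hedged sketch of how that reads from the caller's side (file and column names are assumptions for illustration):

    import polars as pl
    from io import BytesIO

    # Path-like sources (including a list of files) go through the lazy scan and
    # are collected eagerly; hive_partitioning and retries are forwarded to it.
    df = pl.read_parquet(
        ["part-0.parquet", "part-1.parquet"],  # hypothetical files
        columns=["a", "b"],                    # applied via LazyFrame.select
        hive_partitioning=True,
        retries=2,
    )

    # In-memory sources still take the eager DataFrame._read_parquet branch.
    buf = BytesIO()
    pl.DataFrame({"a": [1, 2], "b": [3, 4]}).write_parquet(buf)
    buf.seek(0)
    df_mem = pl.read_parquet(buf)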


def read_parquet_schema(
@@ -170,15 +197,15 @@ def scan_parquet(
source: str | Path | list[str] | list[Path],
*,
n_rows: int | None = None,
cache: bool = True,
parallel: ParallelStrategy = "auto",
rechunk: bool = True,
row_count_name: str | None = None,
row_count_offset: int = 0,
storage_options: dict[str, Any] | None = None,
low_memory: bool = False,
parallel: ParallelStrategy = "auto",
use_statistics: bool = True,
hive_partitioning: bool = True,
rechunk: bool = True,
low_memory: bool = False,
cache: bool = True,
storage_options: dict[str, Any] | None = None,
retries: int = 0,
) -> LazyFrame:
"""
@@ -194,19 +221,27 @@
If a single path is given, it can be a globbing pattern.
n_rows
Stop reading from parquet file after reading `n_rows`.
cache
Cache the result after reading.
row_count_name
If not None, this will insert a row count column with the given name into the
DataFrame
row_count_offset
Offset to start the row_count column (only used if the name is set)
parallel : {'auto', 'columns', 'row_groups', 'none'}
This determines the direction of parallelism. 'auto' will try to determine the
optimal direction.
use_statistics
Use statistics in the parquet to determine if pages
can be skipped from reading.
hive_partitioning
Infer statistics and schema from hive partitioned URL and use them
to prune reads.
rechunk
In case of reading multiple files via a glob pattern rechunk the final DataFrame
into contiguous memory chunks.
row_count_name
If not None, this will insert a row count column with the given name into the
DataFrame
row_count_offset
Offset to start the row_count column (only used if the name is set)
low_memory
Reduce memory pressure at the expense of performance.
cache
Cache the result after reading.
storage_options
Options that inform us how to connect to the cloud provider.
If the cloud provider is not supported by us, the storage options
@@ -220,14 +255,6 @@
If `storage_options` are not provided we will try to infer them from the
environment variables.
low_memory
Reduce memory pressure at the expense of performance.
use_statistics
Use statistics in the parquet to determine if pages
can be skipped from reading.
hive_partitioning
Infer statistics and schema from hive partitioned URL and use them
to prune reads.
retries
Number of retries if accessing a cloud instance fails.
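Tying the scan_parquet parameters above together, a hedged sketch of a lazy scan against cloud storage (the bucket path, column names, and storage option key are illustrative assumptions):

    import polars as pl

    lf = pl.scan_parquet(
        "s3://my-bucket/data/*.parquet",  # hypothetical globbing pattern
        hive_partitioning=True,
        storage_options={"aws_region": "us-east-1"},  # illustrative option
        retries=2,
    )

    # Projection and predicate are pushed down, so only the needed columns and
    # row groups are read when the query is collected.
    df = lf.select(["a", "b"]).filter(pl.col("a") > 0).collect()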
