1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -42,6 +42,12 @@ pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"] }
pyo3-file = "0.12"
pyo3-object_store = "0.2"
thiserror = "1.0.63"
tokio = { version = "1.40", features = [
"macros",
"rt",
"rt-multi-thread",
"sync",
] }

[profile.release]
lto = true
1 change: 1 addition & 0 deletions arro3-io/Cargo.toml
@@ -47,3 +47,4 @@ pyo3-async-runtimes = { workspace = true, features = [
pyo3-file = { workspace = true }
pyo3-object_store = { workspace = true, optional = true }
thiserror = { workspace = true }
tokio = { workspace = true }
8 changes: 8 additions & 0 deletions arro3-io/python/arro3/io/__init__.py
@@ -1,4 +1,12 @@
from . import _io
from ._io import *
from ._io import ___version, store

__version__: str = ___version()

__all__ = [
"__version__",
# "exceptions",
"store",
]
__all__ += _io.__all__
22 changes: 21 additions & 1 deletion arro3-io/python/arro3/io/_io.pyi
@@ -1,7 +1,18 @@
from ._csv import infer_csv_schema, read_csv, write_csv
from ._ipc import read_ipc, read_ipc_stream, write_ipc, write_ipc_stream
from ._json import infer_json_schema, read_json, write_json, write_ndjson
from ._parquet import read_parquet, read_parquet_async, write_parquet
from ._parquet import (
ParquetColumnPath,
ParquetCompression,
ParquetEncoding,
ParquetFile,
ParquetOpenOptions,
ParquetPredicate,
ParquetReadOptions,
read_parquet,
read_parquet_async,
write_parquet,
)

__all__ = [
"infer_csv_schema",
@@ -18,4 +29,13 @@ __all__ = [
"read_parquet",
"read_parquet_async",
"write_parquet",
"ParquetColumnPath",
"ParquetCompression",
"ParquetEncoding",
"ParquetFile",
"ParquetOpenOptions",
"ParquetPredicate",
"ParquetReadOptions",
]

def ___version() -> str: ...
225 changes: 224 additions & 1 deletion arro3-io/python/arro3/io/_parquet.pyi
@@ -1,13 +1,21 @@
import sys
from pathlib import Path
from typing import IO, Literal, Sequence
from typing import IO, Literal, Protocol, Sequence, TypedDict

# Note: importing with
# `from arro3.core import Array`
# will cause Array to be included in the generated docs in this module.
import arro3.core as core
import arro3.core.types as types
from obstore.store import ObjectStore as ObstoreStore

from ._pyo3_object_store import ObjectStore
from ._stream import RecordBatchStream

if sys.version_info >= (3, 11):
from typing import Unpack
else:
from typing_extensions import Unpack

ParquetColumnPath = str | Sequence[str]
"""Allowed types to refer to a Parquet Column."""
@@ -51,6 +59,221 @@ async def read_parquet_async(path: str, *, store: ObjectStore) -> core.Table:
The loaded Arrow data.
"""

class ParquetPredicate(Protocol):
"""A predicate operating on RecordBatch.

See RowFilter for more information on the use of this trait.
"""

@property
def columns(self) -> Sequence[str]:
"""Returns the columns required to evaluate this predicate.

All projected columns will be provided in the batch passed to evaluate.
"""
def evaluate(self, batch: core.RecordBatch) -> types.ArrowArrayExportable:
"""Evaluate this predicate for the given `RecordBatch`.

Only the columns identified by `self.columns` will be provided in the batch.

Must return a boolean-typed `Array` with the same length as the input batch,
where each element indicates whether the corresponding row should be returned:

- `True`: the row should be returned
- `False` or `null`: the row should not be returned
"""

class ParquetOpenOptions(TypedDict, total=False):
"""Options passed when opening `ParquetFile`."""

store: ObjectStore | ObstoreStore | None
"""
A store to use when opening Parquet files from cloud storage.
"""

skip_arrow_metadata: bool
"""If `True`, skip decoding the embedded arrow metadata.

Parquet files generated by some writers may contain embedded arrow schema and
metadata. This may not be correct or compatible with your system, for example:
[ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184).
"""

schema: core.Schema | None
"""Provide a schema to use when reading the Parquet file.

If provided, it takes precedence over the schema inferred from the file or the schema
defined in the file's metadata. If the schema is not compatible with the file's
schema, an error will be raised when the file is opened.

This option is only required if you want to cast columns to a different type. For
example, if you wanted to cast from an `Int64` in the Parquet file to a `Timestamp`
in the Arrow schema.

The supplied schema must have the same number of columns as the Parquet schema, and
the column names must match.
"""

page_index: bool
"""Enable reading the [`PageIndex`][PageIndex], if present.

Some query engines can use the `PageIndex` to push down predicates to the Parquet
scan, potentially eliminating unnecessary IO.

[PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
"""

class ParquetReadOptions(TypedDict, total=False):
"""Options passed to read calls of `ParquetFile`."""

batch_size: int | None
"""Set the size of `RecordBatch` to produce.
"""
row_groups: Sequence[int] | None
"""Only read data from the provided row group indexes.

This is also called row group filtering.
"""
columns: Sequence[str] | None
"""Only read data from the provided column indexes.
"""
filters: ParquetPredicate | Sequence[ParquetPredicate] | None
"""Provide one or more filters to skip decoding rows.

These filters are applied during the parquet read process **after** row group
selection. <!-- and row selection. -->

Filters are applied in order, decoding only the columns each predicate requires. As
filters eliminate rows, fewer rows from subsequently decoded columns may be
required, potentially reducing IO and decode cost.

`filters` consists of one or more [`ParquetPredicate`s][arro3.io.ParquetPredicate].
Only the rows for which all the predicates evaluate to `True` will be returned.

<!--
Any [`RowSelection`] provided to the reader will be applied prior to the first
predicate, and each predicate in turn will then be used to compute a more refined
[`RowSelection`] used when evaluating the subsequent predicates.

Once all predicates have been evaluated, the final [`RowSelection`] is applied
to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
-->

This design has a few implications:

- Filters can be used to skip entire pages, and thus IO, in addition to CPU decode overheads.
- Columns may be decoded multiple times if they appear in multiple predicates.
- IO is deferred until a column is actually required by a predicate.

As such, there is a trade-off between a single large predicate and multiple smaller
predicates, and the right choice depends on the shape of the data. While multiple
smaller predicates may minimise the amount of data scanned/decoded, they may not be
faster overall.

For example, if a predicate that needs a single column of data filters out all but
1% of the rows, applying it as one of the early predicates will likely significantly
improve performance.

As a counterexample, if a predicate needs several columns of data to evaluate but
retains 99% of the rows, it may be better not to filter during the Parquet read and
instead apply the filter after the `RecordBatch` has been fully decoded.

Additionally, even if a predicate eliminates a moderate number of rows, it may still
be faster to filter the data after the `RecordBatch` has been fully decoded if the
eliminated rows are not contiguous.

!!! note

It is recommended to enable reading the page index if using this functionality,
to allow more efficient skipping over data pages. See
[`ParquetOpenOptions.page_index`][arro3.io.ParquetOpenOptions.page_index].
"""

limit: int | None
"""Provide a limit to the number of rows to be read.

<!-- row selection. -->

The limit is applied after any `filters`, so it restricts the final set of rows
decoded after any pushed-down predicates.

!!! note

It is recommended to enable reading the page index if using this functionality,
to allow more efficient skipping over data pages. See
[`ParquetOpenOptions.page_index`][arro3.io.ParquetOpenOptions.page_index].
"""
offset: int | None
"""Provide an offset to skip over the given number of rows.

The offset is applied after any `filters`, so it skips rows after any pushed-down
predicates.

!!! note

It is recommended to enable reading the page index if using this functionality,
to allow more efficient skipping over data pages. See
[`ParquetOpenOptions.page_index`][arro3.io.ParquetOpenOptions.page_index].
"""

class ParquetFile:
@classmethod
def open(
cls,
file: IO[bytes] | Path | str,
**kwargs: Unpack[ParquetOpenOptions],
) -> ParquetFile:
"""Open a Parquet file."""

@classmethod
async def open_async(
cls,
file: IO[bytes] | Path | str,
*,
store: ObjectStore | ObstoreStore | None = None,
skip_arrow_metadata: bool = False,
schema: core.Schema | None = None,
page_index: bool = False,
) -> ParquetFile:
"""Open a Parquet file."""

@property
def num_row_groups(self) -> int:
"""Return the number of row groups in the Parquet file."""

def read(self, **kwargs: Unpack[ParquetReadOptions]) -> core.RecordBatchReader:
"""Read the Parquet file to an Arrow RecordBatchReader.

Keyword Args:
batch_size: The number of rows to read in each batch.
row_groups: The row group indexes to read.
columns: The columns to read, by name.
filters: One or more predicates to push down into the Parquet scan.
limit: The maximum number of rows to read.
offset: The number of rows to skip before reading.

Returns:
The loaded Arrow data.
"""
def read_async(self, **kwargs: Unpack[ParquetReadOptions]) -> RecordBatchStream:
"""Read the Parquet file to an Arrow async RecordBatchStream.

Note that this method itself is **not async**, but returns an async iterable
that yields RecordBatches.

Keyword Args:
batch_size: The number of rows to read in each batch.
row_groups: The row group indexes to read.
columns: The columns to read, by name.
filters: One or more predicates to push down into the Parquet scan.
limit: The maximum number of rows to read.
offset: The number of rows to skip before reading.

Returns:
The loaded Arrow data.
"""
@property
def schema_arrow(self) -> core.Schema:
"""Return the Arrow schema of the Parquet file."""

def write_parquet(
data: types.ArrowStreamExportable | types.ArrowArrayExportable,
file: IO[bytes] | Path | str,
12 changes: 12 additions & 0 deletions arro3-io/python/arro3/io/_stream.pyi
@@ -0,0 +1,12 @@
# Note: importing with
# `from arro3.core import Array`
# will cause Array to be included in the generated docs in this module.
import arro3.core as core

class RecordBatchStream:
def __aiter__(self) -> RecordBatchStream:
"""Return `Self` as an async iterator."""
async def __anext__(self) -> core.RecordBatch:
"""Return the next record batch in the stream."""
async def collect_async(self) -> core.Table:
"""Collect the stream into a single table."""
6 changes: 3 additions & 3 deletions arro3-io/src/csv.rs
@@ -8,7 +8,7 @@ use pyo3_arrow::export::{Arro3RecordBatchReader, Arro3Schema};
use pyo3_arrow::input::AnyRecordBatch;
use pyo3_arrow::{PyRecordBatchReader, PySchema};

use crate::utils::{FileReader, FileWriter};
use crate::source::{FileWriter, SyncReader};

/// Infer a CSV file's schema
#[pyfunction]
@@ -25,7 +25,7 @@ use crate::utils::{FileReader, FileWriter};
))]
#[allow(clippy::too_many_arguments)]
pub fn infer_csv_schema(
file: FileReader,
file: SyncReader,
has_header: Option<bool>,
max_records: Option<usize>,
delimiter: Option<char>,
@@ -76,7 +76,7 @@ pub fn infer_csv_schema(
))]
#[allow(clippy::too_many_arguments)]
pub fn read_csv(
file: FileReader,
file: SyncReader,
schema: PySchema,
has_header: Option<bool>,
batch_size: Option<usize>,
9 changes: 7 additions & 2 deletions arro3-io/src/error.rs
@@ -1,6 +1,6 @@
//! Contains the [`Arro3IoError`], the Error returned by most fallible functions in this crate.

use pyo3::exceptions::{PyException, PyValueError};
use pyo3::exceptions::{PyException, PyIOError, PyValueError};
use pyo3::prelude::*;
use pyo3::DowncastError;
use thiserror::Error;
@@ -13,6 +13,10 @@ pub enum Arro3IoError {
#[error(transparent)]
ArrowError(#[from] arrow_schema::ArrowError),

/// A wrapped [std::io::Error]
#[error(transparent)]
IOError(#[from] std::io::Error),

/// A wrapped [object_store::Error]
#[error(transparent)]
ObjectStoreError(#[from] object_store::Error),
@@ -29,10 +33,11 @@ pub enum Arro3IoError {
impl From<Arro3IoError> for PyErr {
fn from(error: Arro3IoError) -> Self {
match error {
Arro3IoError::PyErr(err) => err,
Arro3IoError::ArrowError(err) => PyException::new_err(err.to_string()),
Arro3IoError::IOError(err) => PyIOError::new_err(err.to_string()),
Arro3IoError::ObjectStoreError(err) => PyException::new_err(err.to_string()),
Arro3IoError::ParquetError(err) => PyException::new_err(err.to_string()),
Arro3IoError::PyErr(err) => err,
}
}
}