Commit 3855a4a

Add/expose columns/projection/stop_after_n_rows for pl.read_ipc() and pl.read_parquet().

ghuls authored and ritchie46 committed Nov 14, 2021
1 parent cdc3ace · commit 3855a4a
Showing 4 changed files with 82 additions and 21 deletions.
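In short: both readers now accept `columns` (a list of names), `projection` (a list of zero-based indices), and `stop_after_n_rows`. A minimal usage sketch of the parameters this commit exposes; the file paths are hypothetical:

```python
import polars as pl

# Select columns by name.
df = pl.read_parquet("data.parquet", columns=["b", "c"], use_pyarrow=False)

# Select the same columns by zero-based index; `columns` wins if both are given.
df = pl.read_parquet("data.parquet", projection=[1, 2], use_pyarrow=False)

# Stop reading after 100 rows; only supported by the native Rust reader.
df = pl.read_ipc("data.ipc", stop_after_n_rows=100, use_pyarrow=False)
```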
14 changes: 12 additions & 2 deletions py-polars/polars/eager/frame.py

@@ -395,11 +395,11 @@ def read_csv(
         skip_rows
             Start reading after `skip_rows`.
         projection
-            Indexes of columns to select. Note that column indexes count from zero.
+            Indices of columns to select. Note that column indices start at zero.
         sep
             Character to use as delimiter in the file.
         columns
-            Columns to project/ select.
+            Columns to select.
         rechunk
             Make sure that all columns are contiguous in memory by aggregating the chunks into a single array.
         encoding
@@ -497,6 +497,10 @@ def read_parquet(
         ----------
         file
             Path to a file or a file like object. Any valid filepath can be used.
+        columns
+            Columns to select.
+        projection
+            Indices of columns to select. Note that column indices start at zero.
         stop_after_n_rows
             Only read the specified number of rows of the dataset. Reading stops after `n` rows.
         """
@@ -520,6 +524,12 @@ def read_ipc(
         ----------
         file
             Path to a file or a file like object.
+        columns
+            Columns to select.
+        projection
+            Indices of columns to select. Note that column indices start at zero.
+        stop_after_n_rows
+            Only read the specified number of rows of the dataset. Reading stops after `n` rows.

         Returns
         -------
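The docstring additions above mirror each other: `columns` selects by name, `projection` selects by position. A round-trip sketch, modeled on the new test at the bottom of this diff, showing that both forms select the same data:

```python
import io

import polars as pl

df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
buf = io.BytesIO()
df.to_ipc(buf)

buf.seek(0)
by_name = pl.read_ipc(buf, columns=["b", "c"], use_pyarrow=False)
buf.seek(0)
by_index = pl.read_ipc(buf, projection=[1, 2], use_pyarrow=False)

# Both calls should yield a frame holding only "b" and "c".
assert by_name.frame_equal(by_index)
```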
74 changes: 57 additions & 17 deletions py-polars/polars/io.py

@@ -198,11 +198,11 @@ def read_csv(
     skip_rows
         Start reading after `skip_rows`.
     projection
-        Indexes of columns to select. Note that column indexes count from zero.
+        Indices of columns to select. Note that column indices start at zero.
     sep
         Delimiter/ value separator.
     columns
-        Columns to project/ select.
+        Columns to select.
     rechunk
         Make sure that all columns are contiguous in memory by aggregating the chunks into a single array.
     encoding
@@ -543,9 +543,11 @@ def read_ipc_schema(

 def read_ipc(
     file: Union[str, BinaryIO, Path, bytes],
+    columns: Optional[List[str]] = None,
+    projection: Optional[List[int]] = None,
+    stop_after_n_rows: Optional[int] = None,
     use_pyarrow: bool = _PYARROW_AVAILABLE,
     memory_map: bool = True,
-    columns: Optional[List[str]] = None,
     storage_options: Optional[Dict] = None,
 ) -> "pl.DataFrame":
     """
@@ -556,39 +558,63 @@ def read_ipc(
     file
         Path to a file or a file like object.
         If ``fsspec`` is installed, it will be used to open remote files
+    columns
+        Columns to select.
+    projection
+        Indices of columns to select. Note that column indices start at zero.
+    stop_after_n_rows
+        Only read the specified number of rows of the dataset. Reading stops after `n` rows.
     use_pyarrow
         Use pyarrow or the native rust reader.
     memory_map
         Memory map underlying file. This will likely increase performance.
         Only used when 'use_pyarrow=True'
-    columns
-        Columns to project/ select.
-        Only valid when 'use_pyarrow=True'
     storage_options
         Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc.

     Returns
     -------
     DataFrame
     """
+    if use_pyarrow:
+        if stop_after_n_rows:
+            raise ValueError(
+                "'stop_after_n_rows' cannot be used with 'use_pyarrow=True'."
+            )
+
     storage_options = storage_options or {}
     with _prepare_file_arg(file, **storage_options) as data:
         if use_pyarrow:
             if not _PYARROW_AVAILABLE:
                 raise ImportError(
                     "'pyarrow' is required when using 'read_ipc(..., use_pyarrow=True)'."
                 )
-            tbl = pa.feather.read_table(data, memory_map=memory_map, columns=columns)
+
+            # pyarrow accepts column names or column indices.
+            tbl = pa.feather.read_table(
+                data, memory_map=memory_map, columns=columns if columns else projection
+            )
             return pl.DataFrame._from_arrow(tbl)
-        return pl.DataFrame.read_ipc(data, columns=columns)
+
+        if columns:
+            # Unset projection if column names were specified.
+            projection = None
+
+        return pl.DataFrame.read_ipc(
+            data,
+            columns=columns,
+            projection=projection,
+            stop_after_n_rows=stop_after_n_rows,
+        )


 def read_parquet(
     source: Union[str, List[str], Path, BinaryIO, bytes],
-    use_pyarrow: bool = _PYARROW_AVAILABLE,
+    columns: Optional[List[str]] = None,
+    projection: Optional[List[int]] = None,
+    stop_after_n_rows: Optional[int] = None,
+    use_pyarrow: bool = _PYARROW_AVAILABLE,
     memory_map: bool = True,
-    columns: Optional[List[str]] = None,
     storage_options: Optional[Dict] = None,
     **kwargs: Any,
 ) -> "pl.DataFrame":
@@ -601,17 +627,18 @@ def read_parquet(
     Path to a file, list of files, or a file like object. If the path is a directory, that directory will be used
         as partition aware scan.
         If ``fsspec`` is installed, it will be used to open remote files
-    use_pyarrow
-        Use pyarrow instead of the rust native parquet reader. The pyarrow reader is more stable.
+    columns
+        Columns to select.
+    projection
+        Indices of columns to select. Note that column indices start at zero.
     stop_after_n_rows
         After n rows are read from the parquet, it stops reading.
+        Only valid when 'use_pyarrow=False'
+    use_pyarrow
+        Use pyarrow instead of the rust native parquet reader. The pyarrow reader is more stable.
     memory_map
         Memory map underlying file. This will likely increase performance.
         Only used when 'use_pyarrow=True'
-    columns
-        Columns to project/ select.
-        Only valid when 'use_pyarrow=True'
     storage_options
         Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc.
     **kwargs
@@ -634,13 +661,26 @@ def read_parquet(
             raise ImportError(
                 "'pyarrow' is required when using 'read_parquet(..., use_pyarrow=True)'."
             )
+
+        # pyarrow accepts column names or column indices.
         return from_arrow(  # type: ignore[return-value]
             pa.parquet.read_table(
-                source_prep, memory_map=memory_map, columns=columns, **kwargs
+                source_prep,
+                memory_map=memory_map,
+                columns=columns if columns else projection,
+                **kwargs,
             )
         )
+
+    if columns:
+        # Unset projection if column names were specified.
+        projection = None
+
     return pl.DataFrame.read_parquet(
-        source_prep, stop_after_n_rows=stop_after_n_rows, columns=columns
+        source_prep,
+        columns=columns,
+        projection=projection,
+        stop_after_n_rows=stop_after_n_rows,
     )
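The selection logic added to both wrappers is the same: explicit column names take precedence over indices, and on the pyarrow path whichever selection survives is passed straight to `read_table`, whose `columns` argument accepts names or indices. A condensed sketch of that rule; the helper name is hypothetical and not part of the commit:

```python
from typing import List, Optional, Union

def resolve_selection(
    columns: Optional[List[str]], projection: Optional[List[int]]
) -> Optional[Union[List[str], List[int]]]:
    # Names win over indices; with neither, all columns are read.
    return columns if columns else projection

assert resolve_selection(["b", "c"], [1, 2]) == ["b", "c"]
assert resolve_selection(None, [1, 2]) == [1, 2]
assert resolve_selection(None, None) is None
```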
2 changes: 1 addition & 1 deletion py-polars/src/dataframe.rs

@@ -186,9 +186,9 @@ impl PyDataFrame {
             Py(f) => {
                 let buf = f.as_buffer();
                 ParquetReader::new(buf)
                     .with_projection(projection)
+                    .with_columns(columns)
                     .with_stop_after_n_rows(stop_after_n_rows)
-                    .with_stop_after_n_rows(stop_after_n_rows)
                     .finish()
             }
             Rust(f) => ParquetReader::new(f)
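Read together with the io.py changes, this Rust entry point is reached through the `pl.DataFrame.read_parquet` staticmethod. A sketch of a call that exercises the `Py(f)` buffer branch shown above, assuming the native reader:

```python
import io

import polars as pl

df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True]})
f = io.BytesIO()
df.to_parquet(f)
f.seek(0)

# A Python buffer feeds ParquetReader; projection and row limit apply together.
out = pl.read_parquet(f, projection=[1], stop_after_n_rows=2, use_pyarrow=False)
```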
13 changes: 12 additions & 1 deletion py-polars/tests/test_io.py

@@ -28,7 +28,7 @@ def test_to_from_buffer(df):
     assert df.frame_equal(df_1, null_equal=True)


-def test_select_columns_from_buffer():
+def test_select_columns_and_projection_from_buffer():
     df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
     expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
     for to_fn, from_fn in zip(
@@ -37,9 +37,20 @@ def test_select_columns_from_buffer():
         f = io.BytesIO()
         to_fn(f)
         f.seek(0)
+
         df_1 = from_fn(f, columns=["b", "c"], use_pyarrow=False)
         assert df_1.frame_equal(expected)

+    for to_fn, from_fn in zip(
+        [df.to_parquet, df.to_ipc], [pl.read_parquet, pl.read_ipc]
+    ):
+        f = io.BytesIO()
+        to_fn(f)
+        f.seek(0)
+
+        df_2 = from_fn(f, projection=[1, 2], use_pyarrow=False)
+        assert df_2.frame_equal(expected)
+

 def test_read_web_file():
     url = "https://raw.githubusercontent.com/pola-rs/polars/master/examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv"
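One behavior the updated test does not cover is the new guard in `pl.read_ipc()`: combining `stop_after_n_rows` with `use_pyarrow=True` raises a `ValueError`. A sketch of such a test, not part of this commit:

```python
import io

import pytest
import polars as pl

def test_stop_after_n_rows_rejected_with_pyarrow():
    df = pl.DataFrame({"a": [1, 2, 3]})
    f = io.BytesIO()
    df.to_ipc(f)
    f.seek(0)

    # The guard fires before the buffer is read.
    with pytest.raises(ValueError):
        pl.read_ipc(f, stop_after_n_rows=2, use_pyarrow=True)
```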
