Skip to content

Commit

Permalink
chore(rust,python) Change allow_streaming to streaming (#5747)
Browse files Browse the repository at this point in the history
Co-authored-by: Liam Brannigan <l.brannigan@analyticsengines.com>
  • Loading branch information
braaannigan and Liam Brannigan committed Dec 8, 2022
1 parent 48d415e commit 4fa0b01
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 17 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ Polars is also very lightweight. It comes with zero required dependencies, and t
### Handles larger than RAM data

If you have data that does not fit into memory, polars lazy is able to process your query (or parts of your query) in a
streaming fashion, this drastically reduces memory requirements you might be able to process your 250GB dataset on your
laptop. Collect with `collect(allow_streaming=True)` to run the query streaming. (This might be a little slower, but
streaming fashion, this drastically reduces memory requirements so you might be able to process your 250GB dataset on your
laptop. Collect with `collect(streaming=True)` to run the query streaming. (This might be a little slower, but
it is still very fast!)

## Setup
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl DataFrame {
let n_rows_right = other.height() as IdxSize;
let Some(total_rows) = n_rows_left.checked_mul(n_rows_right) else {
return Err(PolarsError::ComputeError("Cross joins would produce more rows than fits into 2^32.\n\
Consider comping with polars-big-idx feature, or set 'allow_streaming'.".into()))
Consider comping with polars-big-idx feature, or set 'streaming'.".into()))
};

// the left side has the Nth row combined with every row from right.
Expand Down
8 changes: 5 additions & 3 deletions py-polars/polars/internals/lazy_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
_datetime_to_pl_timestamp,
_time_to_pl_time,
_timedelta_to_pl_timedelta,
deprecated_alias,
)

try:
Expand Down Expand Up @@ -2184,6 +2185,7 @@ def concat_list(exprs: Sequence[str | pli.Expr | pli.Series] | pli.Expr) -> pli.
return pli.wrap_expr(_concat_lst(exprs))


@deprecated_alias(allow_streaming="streaming")
def collect_all(
lazy_frames: Sequence[pli.LazyFrame],
type_coercion: bool = True,
Expand All @@ -2194,7 +2196,7 @@ def collect_all(
no_optimization: bool = False,
slice_pushdown: bool = True,
common_subplan_elimination: bool = True,
allow_streaming: bool = False,
streaming: bool = False,
) -> list[pli.DataFrame]:
"""
Collect multiple LazyFrames at the same time.
Expand All @@ -2221,7 +2223,7 @@ def collect_all(
Slice pushdown optimization.
common_subplan_elimination
Will try to cache branching subplans that occur on self-joins or unions.
allow_streaming
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
Returns
Expand All @@ -2245,7 +2247,7 @@ def collect_all(
simplify_expression,
slice_pushdown,
common_subplan_elimination,
allow_streaming,
streaming,
)
prepared.append(ldf)

Expand Down
14 changes: 8 additions & 6 deletions py-polars/polars/internals/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ def sort(
by = pli.selection_to_pyexpr_list(by)
return self._from_pyldf(self._ldf.sort_by_exprs(by, reverse, nulls_last))

@deprecated_alias(allow_streaming="streaming")
def profile(
self,
type_coercion: bool = True,
Expand All @@ -939,7 +940,7 @@ def profile(
show_plot: bool = False,
truncate_nodes: int = 0,
figsize: tuple[int, int] = (18, 8),
allow_streaming: bool = False,
streaming: bool = False,
) -> tuple[pli.DataFrame, pli.DataFrame]:
"""
Profile a LazyFrame.
Expand Down Expand Up @@ -973,7 +974,7 @@ def profile(
characters.
figsize
matplotlib figsize of the profiling plot
allow_streaming
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
Returns
Expand Down Expand Up @@ -1029,7 +1030,7 @@ def profile(
simplify_expression,
slice_pushdown,
common_subplan_elimination,
allow_streaming,
streaming,
)
df, timings = ldf.profile()
(df, timings) = pli.wrap_df(df), pli.wrap_df(timings)
Expand Down Expand Up @@ -1163,6 +1164,7 @@ def collect(
)
return pli.wrap_df(ldf.collect())

@deprecated_alias(allow_streaming="streaming")
def fetch(
self,
n_rows: int = 500,
Expand All @@ -1174,7 +1176,7 @@ def fetch(
no_optimization: bool = False,
slice_pushdown: bool = True,
common_subplan_elimination: bool = True,
allow_streaming: bool = False,
streaming: bool = False,
) -> pli.DataFrame:
"""
Collect a small number of rows for debugging purposes.
Expand Down Expand Up @@ -1205,7 +1207,7 @@ def fetch(
Slice pushdown optimization
common_subplan_elimination
Will try to cache branching subplans that occur on self-joins or unions.
allow_streaming
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
Returns
Expand Down Expand Up @@ -1247,7 +1249,7 @@ def fetch(
simplify_expression,
slice_pushdown,
common_subplan_elimination,
allow_streaming,
streaming,
)
return pli.wrap_df(ldf.fetch(n_rows))

Expand Down
4 changes: 2 additions & 2 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl PyLazyFrame {
simplify_expr: bool,
slice_pushdown: bool,
cse: bool,
allow_streaming: bool,
streaming: bool,
) -> PyLazyFrame {
let ldf = self.ldf.clone();
let ldf = ldf
Expand All @@ -359,7 +359,7 @@ impl PyLazyFrame {
.with_simplify_expr(simplify_expr)
.with_slice_pushdown(slice_pushdown)
.with_common_subplan_elimination(cse)
.with_streaming(allow_streaming)
.with_streaming(streaming)
.with_projection_pushdown(projection_pushdown);
ldf.into()
}
Expand Down
4 changes: 2 additions & 2 deletions py-polars/tests/unit/test_joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ def test_streaming_joins() -> None:
dfa_pl.lazy()
.join(dfb_pl.lazy(), on="a", how=how)
.sort(["a", "b"])
.collect(allow_streaming=True)
.collect(streaming=True)
)

a = pl.from_pandas(pd_result).with_column(pl.all().cast(int)).sort(["a", "b"])
Expand All @@ -723,7 +723,7 @@ def test_streaming_joins() -> None:
dfa_pl.lazy()
.join(dfb_pl.lazy(), on=["a", "b"], how=how)
.sort(["a", "b"])
.collect(allow_streaming=True)
.collect(streaming=True)
)

# we cast to integer because pandas joins creates floats
Expand Down
2 changes: 1 addition & 1 deletion py-polars/tests/unit/test_predicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_streaming_empty_df() -> None:

assert df.lazy().join(df.lazy(), on="a", how="inner").filter(
2 == 1 # noqa: SIM300
).collect(allow_streaming=True).to_dict(False) == {"a": [], "b": [], "b_right": []}
).collect(streaming=True).to_dict(False) == {"a": [], "b": [], "b_right": []}


def test_when_then_empty_list_5547() -> None:
Expand Down

0 comments on commit 4fa0b01

Please sign in to comment.