Skip to content

Commit

Permalink
perf(python): take advantage of sorted join for frame alignment (#5106)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-beedie committed Oct 5, 2022
1 parent b0990b0 commit 31984dd
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 52 deletions.
94 changes: 43 additions & 51 deletions py-polars/polars/internals/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def align_frames(
*frames: pli.DataFrame,
on: str | pli.Expr | Sequence[str] | Sequence[pli.Expr] | Sequence[str | pli.Expr],
select: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
reverse: bool | Sequence[bool] = False,
) -> list[pli.DataFrame]:
...

Expand All @@ -396,6 +397,7 @@ def align_frames(
*frames: pli.LazyFrame,
on: str | pli.Expr | Sequence[str] | Sequence[pli.Expr] | Sequence[str | pli.Expr],
select: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
reverse: bool | Sequence[bool] = False,
) -> list[pli.LazyFrame]:
...

Expand All @@ -404,9 +406,10 @@ def align_frames(
*frames: pli.DataFrame | pli.LazyFrame,
on: str | pli.Expr | Sequence[str] | Sequence[pli.Expr] | Sequence[str | pli.Expr],
select: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
reverse: bool | Sequence[bool] = False,
) -> list[pli.DataFrame] | list[pli.LazyFrame]:
"""
Align a sequence of frames using the values from one or more columns as a key.
r"""
Align a sequence of frames using the unique values from one or more columns as a key.
Frames that do not contain the given key values have rows injected (with nulls
filling the non-key columns), and each resulting frame is sorted by the key.
Expand All @@ -423,10 +426,13 @@ def align_frames(
frames
sequence of DataFrames or LazyFrames.
on
one or more columns whose values will be used to align the frames.
one or more columns whose unique values will be used to align the frames.
select
optional post-alignment column select to constrain and/or order
the columns returned from the newly aligned frames.
reverse
sort the alignment column values in descending order; can be a single
boolean or a list of booleans associated with each column in ``on``.
Examples
--------
Expand Down Expand Up @@ -459,14 +465,31 @@ def align_frames(
# │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │
# │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │
# ╞════════════╪═════╪══════╡ ╞════════════╪═════╪══════╡ ╞════════════╪═════╪═════╡
# │ 2022-09-01 ┆ 3.5 ┆ 10.0 │ │ 2022-09-02 ┆ 8.0 ┆ 1.5 │ │ 2022-09-03 ┆ 2.0 ┆ 2.5 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌┤
# │ 2022-09-02 ┆ 4.0 ┆ 2.5 │ │ 2022-09-03 ┆ 1.0 ┆ 12.0 │ │ 2022-09-02 ┆ 5.0 ┆ 2.0 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ └────────────┴─────┴─────┘
# │ 2022-09-03 ┆ 1.0 ┆ 1.5 │ │ 2022-09-01 ┆ 3.5 ┆ 5.0 │
# │ 2022-09-01 ┆ 3.5 ┆ 10.0 │\ ,->│ 2022-09-02 ┆ 8.0 ┆ 1.5 │\ ,->│ 2022-09-03 ┆ 2.0 ┆ 2.5 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ \/ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ \/ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌┤
# │ 2022-09-02 ┆ 4.0 ┆ 2.5 │_/\,->│ 2022-09-03 ┆ 1.0 ┆ 12.0 │_/`-->│ 2022-09-02 ┆ 5.0 ┆ 2.0 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ /\ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ └────────────┴─────┴─────┘
# │ 2022-09-03 ┆ 1.0 ┆ 1.5 │_/ `>│ 2022-09-01 ┆ 3.5 ┆ 5.0 │-//-
# └────────────┴─────┴──────┘ └────────────┴─────┴──────┘
>>> # align frames on the values in "dt", but keep only cols "x" and "y":
>>> # align frames by the "dt" column:
>>> af1, af2, af3 = pl.align_frames(df1, df2, df3, on="dt")
# df1 df2 df3
# shape: (3, 3) shape: (3, 3) shape: (3, 3)
# ┌────────────┬─────┬──────┐ ┌────────────┬─────┬──────┐ ┌────────────┬──────┬──────┐
# │ dt ┆ x ┆ y │ │ dt ┆ x ┆ y │ │ dt ┆ x ┆ y │
# │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │
# │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │
# ╞════════════╪═════╪══════╡ ╞════════════╪═════╪══════╡ ╞════════════╪══════╪══════╡
# │ 2022-09-01 ┆ 3.5 ┆ 10.0 │----->│ 2022-09-01 ┆ 3.5 ┆ 5.0 │----->│ 2022-09-01 ┆ null ┆ null │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
# │ 2022-09-02 ┆ 4.0 ┆ 2.5 │----->│ 2022-09-02 ┆ 8.0 ┆ 1.5 │----->│ 2022-09-02 ┆ 5.0 ┆ 2.0 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
# │ 2022-09-03 ┆ 1.0 ┆ 1.5 │----->│ 2022-09-03 ┆ 1.0 ┆ 12.0 │----->│ 2022-09-03 ┆ 2.0 ┆ 2.5 │
# └────────────┴─────┴──────┘ └────────────┴─────┴──────┘ └────────────┴──────┴──────┘
>>> # align frames by "dt", but keep only cols "x" and "y":
>>> af1, af2, af3 = pl.align_frames(df1, df2, df3, on="dt", select=["x", "y"])
# af1 af2 af3
Expand All @@ -483,7 +506,7 @@ def align_frames(
# │ 1.0 ┆ 1.5 │ │ 1.0 ┆ 12.0 │ │ 2.0 ┆ 2.5 │
# └─────┴──────┘ └─────┴──────┘ └──────┴──────┘
>>> # now frames are aligned, can easily calculate the row-wise dot product:
>>> # now data is aligned, can easily calculate the row-wise dot product:
>>> (af1 * af2 * af3).fill_null(0).select(pl.sum(pl.col("*")).alias("dot"))
shape: (3, 1)
┌───────┐
Expand All @@ -498,39 +521,6 @@ def align_frames(
│ 47.0 │
└───────┘
>>> # as above, but keeping the alignment column ("dt") in the final frame:
>>> af1, af2, af3 = pl.align_frames(df1, df2, df3, on="dt")
# af1 af2 af3
# shape: (3, 3) shape: (3, 3) shape: (3, 3)
# ┌────────────┬─────┬──────┐ ┌────────────┬─────┬──────┐ ┌────────────┬──────┬──────┐
# │ dt ┆ x ┆ y │ │ dt ┆ x ┆ y │ │ dt ┆ x ┆ y │
# │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │
# │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │
# ╞════════════╪═════╪══════╡ ╞════════════╪═════╪══════╡ ╞════════════╪══════╪══════╡
# │ 2022-09-01 ┆ 3.5 ┆ 10.0 │ │ 2022-09-01 ┆ 3.5 ┆ 5.0 │ │ 2022-09-01 ┆ null ┆ null │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
# │ 2022-09-02 ┆ 4.0 ┆ 2.5 │ │ 2022-09-02 ┆ 8.0 ┆ 1.5 │ │ 2022-09-02 ┆ 5.0 ┆ 2.0 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
# │ 2022-09-03 ┆ 1.0 ┆ 1.5 │ │ 2022-09-03 ┆ 1.0 ┆ 12.0 │ │ 2022-09-03 ┆ 2.0 ┆ 2.5 │
# └────────────┴─────┴──────┘ └────────────┴─────┴──────┘ └────────────┴──────┴──────┘
>>> (af1[["x", "y"]] * af2[["x", "y"]] * af3[["x", "y"]]).fill_null(0).select(
... pl.sum(pl.col("*")).alias("dot")
... ).insert_at_idx(0, af1["dt"])
shape: (3, 2)
┌────────────┬───────┐
│ dt ┆ dot │
│ --- ┆ --- │
│ date ┆ f64 │
╞════════════╪═══════╡
│ 2022-09-01 ┆ 0.0 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2022-09-02 ┆ 167.5 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2022-09-03 ┆ 47.0 │
└────────────┴───────┘
""" # noqa: E501
if not frames:
return [] # type: ignore[return-value]
Expand All @@ -539,21 +529,23 @@ def align_frames(
"Input frames must be of a consistent type (all LazyFrame or all DataFrame)"
)

# establish the superset of all "on" column values, sort, and cache
eager = isinstance(frames[0], pli.DataFrame)
alignment_frame = concat([df.lazy().select(on) for df in frames]).unique(
maintain_order=False
alignment_frame = (
concat([df.lazy().select(on) for df in frames])
.unique(maintain_order=False)
.sort(by=on, reverse=reverse)
)
if eager: # collect once, outside the alignment joins
alignment_frame = alignment_frame.collect().lazy()

alignment_frame = (
alignment_frame.collect().lazy() if eager else alignment_frame.cache()
)
# finally, align all frames
aligned_frames = [
alignment_frame.join(
other=df.lazy(),
on=alignment_frame.columns,
how="left",
)
.select(df.columns)
.sort(by=on)
).select(df.columns)
for df in frames
]
if select is not None:
Expand Down
11 changes: 10 additions & 1 deletion py-polars/tests/unit/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ def test_align_frames() -> None:
pl.from_pandas(df2.reset_index()),
on="date",
)
# (note: feels like we should be able to streamline dot-product further)
pl_dot = (
(pf1[["a", "b"]] * pf2[["a", "b"]])
.fill_null(0)
Expand Down Expand Up @@ -200,6 +199,16 @@ def test_align_frames() -> None:
on="date",
)

# reverse
pf1, pf2 = pl.align_frames(
pl.DataFrame([[3, 5, 6], [5, 8, 9]], orient="row"),
pl.DataFrame([[2, 5, 6], [3, 8, 9], [4, 2, 0]], orient="row"),
on="column_0",
reverse=True,
)
assert pf1.rows() == [(5, 8, 9), (4, None, None), (3, 5, 6), (2, None, None)]
assert pf2.rows() == [(5, None, None), (4, 2, 0), (3, 8, 9), (2, 5, 6)]


def test_nan_aggregations() -> None:
df = pl.DataFrame({"a": [1.0, float("nan"), 2.0, 3.0], "b": [1, 1, 1, 1]})
Expand Down

0 comments on commit 31984dd

Please sign in to comment.