Skip to content

Commit

Permalink
feat[python]: add new utility function "align_frames" (#4765)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-beedie committed Sep 8, 2022
1 parent 922e8e7 commit 7dda077
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 19 deletions.
17 changes: 12 additions & 5 deletions py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ def version() -> str:
from polars.internals.dataframe import wrap_df # noqa: F401
from polars.internals.dataframe import DataFrame
from polars.internals.expr import Expr
from polars.internals.functions import concat, cut, date_range, get_dummies
from polars.internals.functions import (
align_frames,
concat,
cut,
date_range,
get_dummies,
)
from polars.internals.io import read_ipc_schema, read_parquet_schema
from polars.internals.lazy_functions import _date as date
from polars.internals.lazy_functions import _datetime as datetime
Expand Down Expand Up @@ -196,19 +202,20 @@ def version() -> str:
"using_string_cache",
# polars.config
"Config",
# polars.internal.when
# polars.internals.whenthen
"when",
# polars.internal.expr
# polars.internals.expr
"Expr",
# polars.internal.functions
# polars.internals.functions
"align_frames",
"arg_where",
"concat",
"date_range",
"get_dummies",
"repeat",
"element",
"cut",
# polars.internal.lazy_functions
# polars.internals.lazy_functions
"col",
"count",
"std",
Expand Down
4 changes: 2 additions & 2 deletions py-polars/polars/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def from_arrow(
a : Arrow Table or Array
Data represented as Arrow Table or Array.
rechunk : bool, default True
Make sure that all data is contiguous.
Make sure that all data is in contiguous memory.
Returns
-------
Expand Down Expand Up @@ -329,7 +329,7 @@ def from_pandas(
df : pandas DataFrame, Series, or DatetimeIndex
Data represented as a pandas DataFrame, Series, or DatetimeIndex.
rechunk : bool, default True
Make sure that all data is contiguous.
Make sure that all data is in contiguous memory.
nan_to_none : bool, default True
If data contains NaN values PyArrow will convert the NaN to None
Expand Down
4 changes: 2 additions & 2 deletions py-polars/polars/internals/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def _from_arrow(
If not specified, existing Array table columns are used, with missing names
named as `column_0`, `column_1`, etc.
rechunk : bool, default True
Make sure that all data is contiguous.
Make sure that all data is in contiguous memory.
Returns
-------
Expand Down Expand Up @@ -469,7 +469,7 @@ def _from_pandas(
Column labels to use for resulting DataFrame. If specified, overrides any
labels already present in the data. Must match data dimensions.
rechunk : bool, default True
Make sure that all data is contiguous.
Make sure that all data is in contiguous memory.
nan_to_none : bool, default True
If data contains NaN values PyArrow will convert the NaN to None
Expand Down
188 changes: 184 additions & 4 deletions py-polars/polars/internals/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ def concat(
items
DataFrames/Series/LazyFrames to concatenate.
rechunk
rechunk the final DataFrame/Series.
Make sure that all data is in contiguous memory.
how : {'vertical', 'diagonal', 'horizontal'}
Only used if the items are DataFrames.
- Vertical: Applies multiple `vstack` operations.
- Diagonal: Finds a union between the column schemas and fills missing column
- Vertical: applies multiple `vstack` operations.
- Diagonal: finds a union between the column schemas and fills missing column
values with null.
- Horizontal: Stacks Series horizontally and fills with nulls if the lengths
- Horizontal: stacks Series horizontally and fills with nulls if the lengths
don't match.
Examples
Expand Down Expand Up @@ -371,3 +371,183 @@ def cut(
)
)
return result


@overload
def align_frames(
*frames: pli.DataFrame,
on: str | pli.Expr | Sequence[str] | Sequence[pli.Expr] | Sequence[str | pli.Expr],
select: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
) -> list[pli.DataFrame]:
...


@overload
def align_frames(
*frames: pli.LazyFrame,
on: str | pli.Expr | Sequence[str] | Sequence[pli.Expr] | Sequence[str | pli.Expr],
select: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
) -> list[pli.LazyFrame]:
...


def align_frames(
*frames: pli.DataFrame | pli.LazyFrame,
on: str | pli.Expr | Sequence[str] | Sequence[pli.Expr] | Sequence[str | pli.Expr],
select: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
) -> list[pli.DataFrame] | list[pli.LazyFrame]:
"""
Align a sequence of frames using the values from one or more columns as a key.
Frames that do not contain the given key values have rows injected (with nulls
filling the non-key columns), and each resulting frame is sorted by the key.
The original column order of input frames is not changed unless `select` is
specified (in which case the final column order is determined from that).
Note that this does not result in a joined frame - you receive the same number
of frames back that you passed in, but each is now aligned by key and has
the same number of rows.
Parameters
----------
frames
sequence of DataFrames or LazyFrames.
on
one or more columns whose values will be used to align the frames.
select
optional post-alignment column select to constrain and/or order
the columns returned from the newly aligned frames.
Examples
--------
>>> df1 = pl.DataFrame(
... {
... "dt": [date(2022, 9, 1), date(2022, 9, 2), date(2022, 9, 3)],
... "x": [3.5, 4.0, 1.0],
... "y": [10.0, 2.5, 1.5],
... }
... )
>>> df2 = pl.DataFrame(
... {
... "dt": [date(2022, 9, 2), date(2022, 9, 3), date(2022, 9, 1)],
... "x": [8.0, 1.0, 3.5],
... "y": [1.5, 12.0, 5.0],
... }
... )
>>> df3 = pl.DataFrame(
... {
... "dt": [date(2022, 9, 3), date(2022, 9, 2)],
... "x": [2.0, 5.0],
... "y": [2.5, 2.0],
... }
... )
# df1 df2 df3
# shape: (3, 3) shape: (3, 3) shape: (2, 3)
# ┌────────────┬─────┬──────┐ ┌────────────┬─────┬──────┐ ┌────────────┬─────┬─────┐
# │ dt ┆ x ┆ y │ │ dt ┆ x ┆ y │ │ dt ┆ x ┆ y │
# │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │
# │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │
# ╞════════════╪═════╪══════╡ ╞════════════╪═════╪══════╡ ╞════════════╪═════╪═════╡
# │ 2022-09-01 ┆ 3.5 ┆ 10.0 │ │ 2022-09-02 ┆ 8.0 ┆ 1.5 │ │ 2022-09-03 ┆ 2.0 ┆ 2.5 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌┤
# │ 2022-09-02 ┆ 4.0 ┆ 2.5 │ │ 2022-09-03 ┆ 1.0 ┆ 12.0 │ │ 2022-09-02 ┆ 5.0 ┆ 2.0 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ └────────────┴─────┴─────┘
# │ 2022-09-03 ┆ 1.0 ┆ 1.5 │ │ 2022-09-01 ┆ 3.5 ┆ 5.0 │
# └────────────┴─────┴──────┘ └────────────┴─────┴──────┘
>>> # align frames on the values in "dt", but keep only cols "x" and "y":
>>> af1, af2, af3 = pl.align_frames(df1, df2, df3, on="dt", select=["x", "y"])
# af1 af2 af3
# shape: (3, 3) shape: (3, 3) shape: (3, 3)
# ┌─────┬──────┐ ┌─────┬──────┐ ┌──────┬──────┐
# │ x ┆ y │ │ x ┆ y │ │ x ┆ y │
# │ --- ┆ --- │ │ --- ┆ --- │ │ --- ┆ --- │
# │ f64 ┆ f64 │ │ f64 ┆ f64 │ │ f64 ┆ f64 │
# ╞═════╪══════╡ ╞═════╪══════╡ ╞══════╪══════╡
# │ 3.5 ┆ 10.0 │ │ 3.5 ┆ 5.0 │ │ null ┆ null │
# ├╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌┼╌╌╌╌╌╌┤
# │ 4.0 ┆ 2.5 │ │ 8.0 ┆ 1.5 │ │ 5.0 ┆ 2.0 │
# ├╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌┼╌╌╌╌╌╌┤
# │ 1.0 ┆ 1.5 │ │ 1.0 ┆ 12.0 │ │ 2.0 ┆ 2.5 │
# └─────┴──────┘ └─────┴──────┘ └──────┴──────┘
>>> # now frames are aligned, can easily calculate the row-wise dot product:
>>> (af1 * af2 * af3).fill_null(0).select(pl.sum(pl.col("*")).alias("dot"))
shape: (3, 1)
┌───────┐
│ dot │
│ --- │
│ f64 │
╞═══════╡
│ 0.0 │
├╌╌╌╌╌╌╌┤
│ 167.5 │
├╌╌╌╌╌╌╌┤
│ 47.0 │
└───────┘
>>> # as above, but keeping the alignment column ("dt") in the final frame:
>>> af1, af2, af3 = pl.align_frames(df1, df2, df3, on="dt")
# af1 af2 af3
# shape: (3, 3) shape: (3, 3) shape: (3, 3)
# ┌────────────┬─────┬──────┐ ┌────────────┬─────┬──────┐ ┌────────────┬──────┬──────┐
# │ dt ┆ x ┆ y │ │ dt ┆ x ┆ y │ │ dt ┆ x ┆ y │
# │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │ │ --- ┆ --- ┆ --- │
# │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │ │ date ┆ f64 ┆ f64 │
# ╞════════════╪═════╪══════╡ ╞════════════╪═════╪══════╡ ╞════════════╪══════╪══════╡
# │ 2022-09-01 ┆ 3.5 ┆ 10.0 │ │ 2022-09-01 ┆ 3.5 ┆ 5.0 │ │ 2022-09-01 ┆ null ┆ null │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
# │ 2022-09-02 ┆ 4.0 ┆ 2.5 │ │ 2022-09-02 ┆ 8.0 ┆ 1.5 │ │ 2022-09-02 ┆ 5.0 ┆ 2.0 │
# ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌┤ ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
# │ 2022-09-03 ┆ 1.0 ┆ 1.5 │ │ 2022-09-03 ┆ 1.0 ┆ 12.0 │ │ 2022-09-03 ┆ 2.0 ┆ 2.5 │
# └────────────┴─────┴──────┘ └────────────┴─────┴──────┘ └────────────┴──────┴──────┘
>>> (af1[["x", "y"]] * af2[["x", "y"]] * af3[["x", "y"]]).fill_null(0).select(
... pl.sum(pl.col("*")).alias("dot")
... ).insert_at_idx(0, af1["dt"])
shape: (3, 2)
┌────────────┬───────┐
│ dt ┆ dot │
│ --- ┆ --- │
│ date ┆ f64 │
╞════════════╪═══════╡
│ 2022-09-01 ┆ 0.0 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2022-09-02 ┆ 167.5 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2022-09-03 ┆ 47.0 │
└────────────┴───────┘
""" # noqa: E501
if not frames:
return [] # type: ignore[return-value]
elif len({type(f) for f in frames}) != 1:
raise TypeError(
"Input frames must be of a consistent type (all LazyFrame or all DataFrame)"
)

eager = isinstance(frames[0], pli.DataFrame)
alignment_frame = concat([df.lazy().select(on) for df in frames]).unique(
maintain_order=False
)
if eager: # collect once, outside the alignment joins
alignment_frame = alignment_frame.collect().lazy()

aligned_frames = [
alignment_frame.join(
other=df.lazy(),
on=alignment_frame.columns, # type: ignore[arg-type]
how="left",
)
.select(df.columns)
.sort(by=on)
for df in frames
]
if select is not None:
aligned_frames = [df.select(select) for df in aligned_frames]

return [df.collect() for df in aligned_frames] if eager else aligned_frames
8 changes: 6 additions & 2 deletions py-polars/polars/internals/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,12 @@ def inspect(s: pli.DataFrame) -> pli.DataFrame:

def sort(
self: LDF,
by: str | pli.Expr | list[str] | list[pli.Expr],
reverse: bool | list[bool] = False,
by: str
| pli.Expr
| Sequence[str]
| Sequence[pli.Expr]
| Sequence[str | pli.Expr],
reverse: bool | Sequence[bool] = False,
nulls_last: bool = False,
) -> LDF:
"""
Expand Down
69 changes: 65 additions & 4 deletions py-polars/tests/unit/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ def test_date_datetime() -> None:
"hour": [23, 12, 8],
}
)

out = df.select(
[
pl.all(),
pl.datetime("year", "month", "day", "hour").dt.hour().cast(int).alias("h2"),
pl.date("year", "month", "day").dt.day().cast(int).alias("date"),
]
)

assert out["date"].series_equal(df["day"].rename("date"))
assert out["h2"].series_equal(df["hour"].rename("h2"))

Expand Down Expand Up @@ -73,14 +71,12 @@ def test_all_any_horizontally() -> None:
],
columns=["var1", "var2", "var3"],
)

expected = pl.DataFrame(
{
"any": [True, True, False, True, None],
"all": [False, False, False, None, False],
}
)

assert df.select(
[
pl.any([pl.col("var2"), pl.col("var3")]),
Expand Down Expand Up @@ -122,3 +118,68 @@ def test_null_handling_correlation() -> None:
)
assert out["pearson"][0] == pytest.approx(1.0)
assert out["spearman"][0] == pytest.approx(1.0)


def test_align_frames() -> None:
import numpy as np
import pandas as pd

# setup some test frames
df1 = pd.DataFrame(
{
"date": pd.date_range(start="2019-01-02", periods=9),
"a": np.array([0, 1, 2, np.nan, 4, 5, 6, 7, 8], dtype=np.float64),
"b": np.arange(9, 18, dtype=np.float64),
}
).set_index("date")

df2 = pd.DataFrame(
{
"date": pd.date_range(start="2019-01-04", periods=7),
"a": np.arange(9, 16, dtype=np.float64),
"b": np.arange(10, 17, dtype=np.float64),
}
).set_index("date")

# calculate dot-product in pandas
pd_dot = (df1 * df2).sum(axis="columns").to_frame("dot").reset_index()

# use "align_frames" to calculate dot-product from disjoint rows. pandas uses an
# index to automatically infer the correct frame-alignment for the calculation;
# we need to do it explicitly (which also makes it clearer what is happening)
pf1, pf2 = pl.align_frames(
pl.from_pandas(df1.reset_index()),
pl.from_pandas(df2.reset_index()),
on="date",
)
# (note: feels like we should be able to streamline dot-product further)
pl_dot = (
(pf1[["a", "b"]] * pf2[["a", "b"]])
.fill_null(0)
.select(pl.sum(pl.col("*")).alias("dot"))
.insert_at_idx(0, pf1["date"])
)
# confirm we match the same operation in pandas
assert pl_dot.frame_equal(pl.from_pandas(pd_dot))
pd.testing.assert_frame_equal(pd_dot, pl_dot.to_pandas())

# (also: confirm alignment function works with lazyframes)
lf1, lf2 = pl.align_frames(
pl.from_pandas(df1.reset_index()).lazy(),
pl.from_pandas(df2.reset_index()).lazy(),
on="date",
)
assert isinstance(lf1, pl.LazyFrame)
assert lf1.collect().frame_equal(pf1)
assert lf2.collect().frame_equal(pf2)

# misc
assert [] == pl.align_frames(on="date")

# expected error condition
with pytest.raises(TypeError):
pl.align_frames( # type: ignore[call-overload]
pl.from_pandas(df1.reset_index()).lazy(),
pl.from_pandas(df2.reset_index()),
on="date",
)

0 comments on commit 7dda077

Please sign in to comment.