Skip to content

Commit

Permalink
refactor[python]: dispatch join logic to LazyFrame side (#4804)
Browse files Browse the repository at this point in the history
  • Loading branch information
matteosantama committed Sep 10, 2022
1 parent 1af7b38 commit 9bc1a2e
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 145 deletions.
63 changes: 17 additions & 46 deletions py-polars/polars/internals/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3765,9 +3765,9 @@ def join_asof(
def join(
self,
other: DataFrame,
left_on: str | pli.Expr | list[str | pli.Expr] | None = None,
right_on: str | pli.Expr | list[str | pli.Expr] | None = None,
on: str | pli.Expr | list[str | pli.Expr] | None = None,
left_on: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
right_on: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
on: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
how: JoinStrategy = "inner",
suffix: str = "_right",
) -> DataFrame:
Expand Down Expand Up @@ -3840,52 +3840,23 @@ def join(
│ 3 ┆ 8.0 ┆ c ┆ null │
└──────┴──────┴─────┴───────┘
**Joining on columns with categorical data**
See pl.StringCache().
Note
----
For joining on columns with categorical data, see ``pl.StringCache()``.
"""
if how == "cross":
return self._from_pydf(self._df.join(other._df, [], [], how, suffix))

left_on_: list[str | pli.Expr] | None
if isinstance(left_on, (str, pli.Expr)):
left_on_ = [left_on]
else:
left_on_ = left_on

right_on_: list[str | pli.Expr] | None
if isinstance(right_on, (str, pli.Expr)):
right_on_ = [right_on]
else:
right_on_ = right_on

if isinstance(on, (str, pli.Expr)):
left_on_ = [on]
right_on_ = [on]
elif isinstance(on, list):
left_on_ = on
right_on_ = on

if left_on_ is None or right_on_ is None:
raise ValueError("You should pass the column to join on as an argument.")

if isinstance(left_on_[0], pli.Expr) or isinstance(right_on_[0], pli.Expr):
return (
self.lazy()
.join(
other.lazy(),
left_on,
right_on,
on=on,
how=how,
suffix=suffix,
)
.collect(no_optimization=True)
)
else:
return self._from_pydf(
self._df.join(other._df, left_on_, right_on_, how, suffix)
return (
self.lazy()
.join(
other=other.lazy(),
left_on=left_on,
right_on=right_on,
on=on,
how=how,
suffix=suffix,
)
.collect(no_optimization=True)
)

def apply(
self: DF,
Expand Down
2 changes: 0 additions & 2 deletions py-polars/polars/internals/expr/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from polars.internals.expr.expr import (
Expr,
ensure_list_of_pyexpr,
expr_to_lit_or_expr,
selection_to_pyexpr_list,
wrap_expr,
)

__all__ = [
"Expr",
"ensure_list_of_pyexpr",
"expr_to_lit_or_expr",
"selection_to_pyexpr_list",
"wrap_expr",
Expand Down
20 changes: 2 additions & 18 deletions py-polars/polars/internals/expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from polars.internals.expr.meta import ExprMetaNameSpace
from polars.internals.expr.string import ExprStringNameSpace
from polars.internals.expr.struct import ExprStructNameSpace
from polars.utils import deprecated_alias, is_expr_sequence, is_pyexpr_sequence
from polars.utils import deprecated_alias

try:
from polars.polars import PyExpr
Expand All @@ -49,30 +49,14 @@


def selection_to_pyexpr_list(
    exprs: str | Expr | pli.Series | Sequence[str | Expr | pli.Series],
) -> list[PyExpr]:
    """
    Convert a selection into a list of ``PyExpr`` objects.

    Parameters
    ----------
    exprs
        A single string, expression or Series, or a sequence of them.

    Returns
    -------
    One ``PyExpr`` per input item, in input order.

    """
    # A lone item is wrapped so the conversion below can treat every input
    # uniformly as an iterable of items.
    if isinstance(exprs, (str, Expr, pli.Series)):
        exprs = [exprs]

    # NOTE(review): ``str_to_lit=False`` presumably makes strings resolve to
    # column references instead of string literals -- confirm against
    # ``expr_to_lit_or_expr``.
    return [expr_to_lit_or_expr(e, str_to_lit=False)._pyexpr for e in exprs]


def ensure_list_of_pyexpr(exprs: object) -> list[PyExpr]:
    """
    Coerce an expression-like input into a list of ``PyExpr`` objects.

    Accepts a single ``PyExpr``, a sequence of ``PyExpr``, a single ``Expr``
    or a sequence of ``Expr``; the checks are applied in that order.

    Raises
    ------
    TypeError
        If ``exprs`` matches none of the accepted inputs.

    """
    if isinstance(exprs, PyExpr):
        pyexprs = [exprs]
    elif is_pyexpr_sequence(exprs):
        pyexprs = list(exprs)
    elif isinstance(exprs, Expr):
        pyexprs = [exprs._pyexpr]
    elif is_expr_sequence(exprs):
        # Unwrap each high-level expression to its underlying PyExpr.
        pyexprs = [item._pyexpr for item in exprs]
    else:
        raise TypeError(f"unexpected type '{type(exprs)}'")
    return pyexprs


def expr_to_lit_or_expr(
expr: (
Expr
Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/internals/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ def align_frames(
aligned_frames = [
alignment_frame.join(
other=df.lazy(),
on=alignment_frame.columns, # type: ignore[arg-type]
on=alignment_frame.columns,
how="left",
)
.select(df.columns)
Expand Down
84 changes: 23 additions & 61 deletions py-polars/polars/internals/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from polars import internals as pli
from polars.cfg import Config
from polars.datatypes import DataType, PolarsDataType, Schema, py_type_to_dtype
from polars.internals import selection_to_pyexpr_list
from polars.internals.lazyframe.groupby import LazyGroupBy
from polars.internals.slice import LazyPolarsSlice
from polars.utils import (
Expand All @@ -23,7 +24,7 @@
)

try:
from polars.polars import PyExpr, PyLazyFrame
from polars.polars import PyLazyFrame

_DOCUMENTING = False
except ImportError:
Expand Down Expand Up @@ -1062,8 +1063,8 @@ def groupby(
└─────┴─────┘
"""
new_by = _prepare_groupby_inputs(by)
lgb = self._ldf.groupby(new_by, maintain_order)
pyexprs_by = selection_to_pyexpr_list(by)
lgb = self._ldf.groupby(pyexprs_by, maintain_order)
return LazyGroupBy(lgb, lazyframe_class=self.__class__)

def groupby_rolling(
Expand Down Expand Up @@ -1176,9 +1177,11 @@ def groupby_rolling(
"""
if offset is None:
offset = f"-{period}"
by = _prepare_groupby_inputs(by)
pyexprs_by = [] if by is None else selection_to_pyexpr_list(by)

lgb = self._ldf.groupby_rolling(index_column, period, offset, closed, by)
lgb = self._ldf.groupby_rolling(
index_column, period, offset, closed, pyexprs_by
)
return LazyGroupBy(lgb, lazyframe_class=self.__class__)

def groupby_dynamic(
Expand Down Expand Up @@ -1270,7 +1273,7 @@ def groupby_dynamic(
offset = "0ns"
if period is None:
period = every
by = _prepare_groupby_inputs(by)
pyexprs_by = [] if by is None else selection_to_pyexpr_list(by)
lgb = self._ldf.groupby_dynamic(
index_column,
every,
Expand All @@ -1279,7 +1282,7 @@ def groupby_dynamic(
truncate,
include_boundaries,
closed,
by,
pyexprs_by,
)
return LazyGroupBy(lgb, lazyframe_class=self.__class__)

Expand Down Expand Up @@ -1421,9 +1424,9 @@ def join_asof(
def join(
self: LDF,
other: LazyFrame,
left_on: str | pli.Expr | list[str | pli.Expr] | None = None,
right_on: str | pli.Expr | list[str | pli.Expr] | None = None,
on: str | pli.Expr | list[str | pli.Expr] | None = None,
left_on: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
right_on: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
on: str | pli.Expr | Sequence[str | pli.Expr] | None = None,
how: JoinStrategy = "inner",
suffix: str = "_right",
allow_parallel: bool = True,
Expand Down Expand Up @@ -1511,44 +1514,21 @@ def join(
)
)

left_on_: list[str | pli.Expr] | None
if isinstance(left_on, (str, pli.Expr)):
left_on_ = [left_on]
else:
left_on_ = left_on

right_on_: list[str | pli.Expr] | None
if isinstance(right_on, (str, pli.Expr)):
right_on_ = [right_on]
if on is not None:
pyexprs = selection_to_pyexpr_list(on)
pyexprs_left = pyexprs
pyexprs_right = pyexprs
elif left_on is not None and right_on is not None:
pyexprs_left = selection_to_pyexpr_list(left_on)
pyexprs_right = selection_to_pyexpr_list(right_on)
else:
right_on_ = right_on

if isinstance(on, (str, pli.Expr)):
left_on_ = [on]
right_on_ = [on]
elif isinstance(on, list):
left_on_ = on
right_on_ = on

if left_on_ is None or right_on_ is None:
raise ValueError("You should pass the column to join on as an argument.")

new_left_on = []
for column in left_on_:
if isinstance(column, str):
column = pli.col(column)
new_left_on.append(column._pyexpr)
new_right_on = []
for column in right_on_:
if isinstance(column, str):
column = pli.col(column)
new_right_on.append(column._pyexpr)
raise ValueError("must specify `on` OR `left_on` and `right_on`")

return self._from_pyldf(
self._ldf.join(
other._ldf,
new_left_on,
new_right_on,
pyexprs_left,
pyexprs_right,
allow_parallel,
force_parallel,
how,
Expand Down Expand Up @@ -2509,21 +2489,3 @@ def unnest(self: LDF, names: str | list[str]) -> LDF:
if isinstance(names, str):
names = [names]
return self._from_pyldf(self._ldf.unnest(names))


def _prepare_groupby_inputs(
    by: str | list[str] | pli.Expr | list[pli.Expr] | None,
) -> list[PyExpr]:
    """
    Normalise groupby keys into a list of ``PyExpr`` objects.

    Parameters
    ----------
    by
        A column name, an expression, a list of those, or ``None``.

    Returns
    -------
    One ``PyExpr`` per key, in input order; an empty list when ``by`` is
    ``None``.

    Raises
    ------
    TypeError
        If ``by`` is not a string, an expression, a list or ``None``.

    """
    if by is None:
        return []

    keys: list[str | pli.Expr]
    if isinstance(by, (str, pli.Expr)):
        keys = [by]
    elif isinstance(by, list):
        keys = by
    else:
        # Fail loudly instead of falling through to an unbound local.
        raise TypeError(
            f"expected 'str | Expr | list[str | Expr] | None', got '{type(by)}'"
        )

    # Strings are turned into column expressions via ``pli.col`` before the
    # underlying PyExpr is extracted.
    return [(pli.col(key) if isinstance(key, str) else key)._pyexpr for key in keys]
4 changes: 2 additions & 2 deletions py-polars/polars/internals/lazyframe/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import polars.internals as pli
from polars.datatypes import Schema
from polars.internals.expr import ensure_list_of_pyexpr
from polars.internals import selection_to_pyexpr_list
from polars.utils import is_expr_sequence

try:
Expand Down Expand Up @@ -54,7 +54,7 @@ def agg(self, aggs: pli.Expr | Sequence[pli.Expr]) -> LDF:
msg = f"expected 'Expr | Sequence[Expr]', got '{type(aggs)}'"
raise TypeError(msg)

pyexprs = ensure_list_of_pyexpr(aggs)
pyexprs = selection_to_pyexpr_list(aggs)
return self._lazyframe_class._from_pyldf(self.lgb.agg(pyexprs))

def head(self, n: int = 5) -> LDF:
Expand Down
15 changes: 0 additions & 15 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,21 +785,6 @@ impl PyDataFrame {
format!("{:?}", self.df)
}

/// Join this frame with `other` on the given key columns.
///
/// `left_on` and `right_on` name the key columns of the left and right
/// frames, `how` wraps the join type, and `suffix` is forwarded to the
/// underlying join as `Some(suffix)`. Errors from the underlying join are
/// converted through `PyPolarsErr`.
pub fn join(
    &self,
    other: &PyDataFrame,
    left_on: Vec<&str>,
    right_on: Vec<&str>,
    how: Wrap<JoinType>,
    suffix: String,
) -> PyResult<Self> {
    let result = self
        .df
        .join(&other.df, left_on, right_on, how.0, Some(suffix));
    let joined = result.map_err(PyPolarsErr::from)?;
    Ok(PyDataFrame::new(joined))
}

pub fn get_columns(&self) -> Vec<PySeries> {
let cols = self.df.get_columns().clone();
to_pyseries_collection(cols)
Expand Down

0 comments on commit 9bc1a2e

Please sign in to comment.