Skip to content

Commit

Permalink
refactor[python]: Break up lazy_frame module into submodules (#4410)
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego committed Aug 14, 2022
1 parent d1dc004 commit afc211c
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 254 deletions.
4 changes: 2 additions & 2 deletions py-polars/docs/source/reference/lazyframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ LazyFrame
.. currentmodule:: polars

.. py:class:: LazyFrame
:canonical: polars.internals.lazy_frame.LazyFrame
:canonical: polars.internals.lazyframe.frame.LazyFrame

Representation of a lazy computation graph/query.

Expand Down Expand Up @@ -112,7 +112,7 @@ GroupBy
-------
This namespace becomes available by calling `LazyFrame.groupby(..)`.

.. currentmodule:: polars.internals.lazy_frame
.. currentmodule:: polars.internals.lazyframe.groupby

.. autosummary::
:toctree: api/
Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def version() -> str:
from polars.internals.frame import DataFrame
from polars.internals.functions import concat, cut, date_range, get_dummies
from polars.internals.io import read_ipc_schema, read_parquet_schema
from polars.internals.lazy_frame import LazyFrame
from polars.internals.lazy_functions import _date as date
from polars.internals.lazy_functions import _datetime as datetime
from polars.internals.lazy_functions import (
Expand Down Expand Up @@ -109,6 +108,7 @@ def version() -> str:
)
from polars.internals.lazy_functions import to_list as list
from polars.internals.lazy_functions import var
from polars.internals.lazyframe import LazyFrame

# TODO: remove need for wrap_s
from polars.internals.series import wrap_s # noqa: F401
Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/internals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
read_ipc_schema,
read_parquet_schema,
)
from polars.internals.lazy_frame import LazyFrame, wrap_ldf
from polars.internals.lazy_functions import (
all,
arg_where,
Expand All @@ -35,6 +34,7 @@
lit,
select,
)
from polars.internals.lazyframe import LazyFrame, wrap_ldf
from polars.internals.series import Series, wrap_s
from polars.internals.whenthen import when # used in expr.clip()

Expand Down
6 changes: 6 additions & 0 deletions py-polars/polars/internals/lazyframe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from polars.internals.lazyframe.frame import LazyFrame, wrap_ldf

# Names exported by this package; both are imported above from
# `polars.internals.lazyframe.frame`.
__all__ = [
"LazyFrame",
"wrap_ldf",
]
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,39 @@
import typing
from io import BytesIO, IOBase, StringIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Generic, Sequence, TypeVar, overload

from polars.internals.expr import ensure_list_of_pyexpr

if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal

try:
from polars.polars import PyExpr, PyLazyFrame, PyLazyGroupBy

_DOCUMENTING = False
except ImportError:
_DOCUMENTING = True

from typing import TYPE_CHECKING, Any, Callable, Sequence, TypeVar, overload

from polars import internals as pli
from polars.cfg import Config
from polars.datatypes import DataType, py_type_to_dtype
from polars.internals.lazyframe.groupby import LazyGroupBy
from polars.internals.slice import LazyPolarsSlice
from polars.utils import (
_in_notebook,
_prepare_row_count_args,
_process_null_values,
format_path,
is_expr_sequence,
)

try:
from polars.polars import PyExpr, PyLazyFrame

_DOCUMENTING = False
except ImportError:
_DOCUMENTING = True

try:
import pyarrow as pa

_PYARROW_AVAILABLE = True
except ImportError:
_PYARROW_AVAILABLE = False

if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal

if TYPE_CHECKING:
from polars.internals.type_aliases import (
AsofJoinStrategy,
Expand All @@ -71,24 +68,6 @@ def wrap_ldf(ldf: PyLazyFrame) -> LazyFrame:
return LazyFrame._from_pyldf(ldf)


def _prepare_groupby_inputs(
by: str | list[str] | pli.Expr | list[pli.Expr] | None,
) -> list[PyExpr]:
if isinstance(by, list):
new_by = []
for e in by:
if isinstance(e, str):
e = pli.col(e)
new_by.append(e._pyexpr)
elif isinstance(by, str):
new_by = [pli.col(by)._pyexpr]
elif isinstance(by, pli.Expr):
new_by = [by._pyexpr]
elif by is None:
return []
return new_by


class LazyFrame:
"""Representation of a Lazy computation graph/query."""

Expand Down Expand Up @@ -546,7 +525,7 @@ def inspect(self: LDF, fmt: str = "{}") -> LDF:
... .inspect() # print the node before the filter
... .filter(pl.col("bar") == pl.col("foo"))
... ) # doctest: +ELLIPSIS
<polars.internals.lazy_frame.LazyFrame object at ...>
<polars.internals.lazyframe.frame.LazyFrame object at ...>
"""

Expand Down Expand Up @@ -2390,217 +2369,19 @@ def unnest(self: LDF, names: str | list[str]) -> LDF:
return self._from_pyldf(self._ldf.unnest(names))


class LazyGroupBy(Generic[LDF]):
    """Grouped lazy query, created by `df.lazy().groupby("foo")`."""

    def __init__(self, lgb: PyLazyGroupBy, lazyframe_class: type[LDF]) -> None:
        # Remember which LazyFrame (sub)class to wrap results in, alongside
        # the underlying PyLazyGroupBy handle that the methods delegate to.
        self._lazyframe_class = lazyframe_class
        self.lgb = lgb

    def agg(self, aggs: pli.Expr | Sequence[pli.Expr]) -> LDF:
        """
        Describe the aggregation that needs to be done on a group.

        Parameters
        ----------
        aggs
            Single / multiple aggregation expression(s).

        Raises
        ------
        TypeError
            If ``aggs`` is neither an expression nor a sequence of expressions.

        Examples
        --------
        >>> (
        ...     pl.scan_csv("data.csv")
        ...     .groupby("groups")
        ...     .agg(
        ...         [
        ...             pl.col("name").n_unique().alias("unique_names"),
        ...             pl.max("values"),
        ...         ]
        ...     )
        ... )  # doctest: +SKIP

        """
        if not isinstance(aggs, pli.Expr) and not is_expr_sequence(aggs):
            raise TypeError(
                f"expected 'Expr | Sequence[Expr]', got '{type(aggs)}'"
            )

        aggregated = self.lgb.agg(ensure_list_of_pyexpr(aggs))
        return self._lazyframe_class._from_pyldf(aggregated)

    def head(self, n: int = 5) -> LDF:
        """
        Return first n rows of each group.

        Parameters
        ----------
        n
            Number of values of the group to select

        Examples
        --------
        >>> df = pl.DataFrame(
        ...     {
        ...         "letters": ["c", "c", "a", "c", "a", "b"],
        ...         "nrs": [1, 2, 3, 4, 5, 6],
        ...     }
        ... )
        >>> df
        shape: (6, 2)
        ┌─────────┬─────┐
        │ letters ┆ nrs │
        │ ---     ┆ --- │
        │ str     ┆ i64 │
        ╞═════════╪═════╡
        │ c       ┆ 1   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ c       ┆ 2   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ a       ┆ 3   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ c       ┆ 4   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ a       ┆ 5   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ b       ┆ 6   │
        └─────────┴─────┘
        >>> df.groupby("letters").head(2).sort("letters")
        shape: (5, 2)
        ┌─────────┬─────┐
        │ letters ┆ nrs │
        │ ---     ┆ --- │
        │ str     ┆ i64 │
        ╞═════════╪═════╡
        │ a       ┆ 3   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ a       ┆ 5   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ b       ┆ 6   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ c       ┆ 1   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ c       ┆ 2   │
        └─────────┴─────┘

        """
        first_rows = self.lgb.head(n)
        return self._lazyframe_class._from_pyldf(first_rows)

    def tail(self, n: int = 5) -> LDF:
        """
        Return last n rows of each group.

        Parameters
        ----------
        n
            Number of values of the group to select

        Examples
        --------
        >>> df = pl.DataFrame(
        ...     {
        ...         "letters": ["c", "c", "a", "c", "a", "b"],
        ...         "nrs": [1, 2, 3, 4, 5, 6],
        ...     }
        ... )
        >>> df
        shape: (6, 2)
        ┌─────────┬─────┐
        │ letters ┆ nrs │
        │ ---     ┆ --- │
        │ str     ┆ i64 │
        ╞═════════╪═════╡
        │ c       ┆ 1   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ c       ┆ 2   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ a       ┆ 3   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ c       ┆ 4   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ a       ┆ 5   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ b       ┆ 6   │
        └─────────┴─────┘
        >>> df.groupby("letters").tail(2).sort("letters")
        shape: (5, 2)
        ┌─────────┬─────┐
        │ letters ┆ nrs │
        │ ---     ┆ --- │
        │ str     ┆ i64 │
        ╞═════════╪═════╡
        │ a       ┆ 3   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ a       ┆ 5   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ b       ┆ 6   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ c       ┆ 2   │
        ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
        │ c       ┆ 4   │
        └─────────┴─────┘

        """
        last_rows = self.lgb.tail(n)
        return self._lazyframe_class._from_pyldf(last_rows)

    def apply(self, f: Callable[[pli.DataFrame], pli.DataFrame]) -> LDF:
        """
        Apply a function over the groups as a new `DataFrame`.

        Implementing logic using this .apply method is generally slower and more
        memory intensive than implementing the same logic using the expression API
        because:

        - with .apply the logic is implemented in Python but with an expression the
          logic is implemented in Rust
        - with .apply the DataFrame is materialized in memory
        - expressions can be parallelised
        - expressions can be optimised

        If possible use the expression API for best performance.

        Parameters
        ----------
        f
            Function to apply over each group of the `LazyFrame`.

        Examples
        --------
        The function is applied by group.

        >>> df = pl.DataFrame(
        ...     {
        ...         "foo": [1, 2, 3, 1],
        ...         "bar": ["a", "b", "c", "c"],
        ...     }
        ... )
        >>> (
        ...     df.lazy()
        ...     .groupby("bar", maintain_order=True)
        ...     .agg(
        ...         [
        ...             pl.col("foo").apply(lambda x: x.sum()),
        ...         ]
        ...     )
        ...     .collect()
        ... )
        shape: (3, 2)
        ┌─────┬─────┐
        │ bar ┆ foo │
        │ --- ┆ --- │
        │ str ┆ i64 │
        ╞═════╪═════╡
        │ a   ┆ 1   │
        ├╌╌╌╌╌┼╌╌╌╌╌┤
        │ b   ┆ 2   │
        ├╌╌╌╌╌┼╌╌╌╌╌┤
        │ c   ┆ 4   │
        └─────┴─────┘

        It is better to implement this with an expression:

        >>> (
        ...     df.groupby("bar", maintain_order=True).agg(
        ...         pl.col("foo").sum(),
        ...     )
        ... )  # doctest: +IGNORE_RESULT

        """
        applied = self.lgb.apply(f)
        return self._lazyframe_class._from_pyldf(applied)
def _prepare_groupby_inputs(
by: str | list[str] | pli.Expr | list[pli.Expr] | None,
) -> list[PyExpr]:
if isinstance(by, list):
new_by = []
for e in by:
if isinstance(e, str):
e = pli.col(e)
new_by.append(e._pyexpr)
elif isinstance(by, str):
new_by = [pli.col(by)._pyexpr]
elif isinstance(by, pli.Expr):
new_by = [by._pyexpr]
elif by is None:
return []
return new_by

0 comments on commit afc211c

Please sign in to comment.