Skip to content

Commit

Permalink
DataFrame::partition_by (#3148)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 15, 2022
1 parent 45de199 commit eb81237
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 0 deletions.
1 change: 1 addition & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ dataframe_arithmetic = ["polars-core/dataframe_arithmetic"]
product = ["polars-core/product"]
unique_counts = ["polars-core/unique_counts", "polars-lazy/unique_counts"]
log = ["polars-core/log", "polars-lazy/log"]
partition_by = ["polars-core/partition_by"]

series_from_anyvalue = ["polars-core/series_from_anyvalue"]

Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ dataframe_arithmetic = []
product = []
unique_counts = []
log = []
partition_by = []

dynamic_groupby = ["dtype-datetime", "dtype-date"]

Expand Down
42 changes: 42 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2944,6 +2944,48 @@ impl DataFrame {
.reduce(|acc, b| get_supertype(&acc?, &b.unwrap()))
}

/// Take rows by a slice of indices without bounds checking.
///
/// # Safety
/// The caller must ensure that every index in `idx` is in bounds for this
/// `DataFrame`, i.e. strictly less than its height.
#[cfg(feature = "partition_by")]
pub(crate) unsafe fn take_unchecked_slice(&self, idx: &[IdxSize]) -> Self {
    // `.copied()` is the idiomatic form of `.map(|i| *i)` for Copy items.
    self.take_iter_unchecked(idx.iter().copied().map(|i| i as usize))
}

/// Shared implementation for [`partition_by`] and [`partition_by_stable`].
#[cfg(feature = "partition_by")]
fn partition_by_impl(&self, cols: &[String], stable: bool) -> Result<Vec<DataFrame>> {
    // Compute the group indices; the stable variant preserves the order in
    // which the groups first occur.
    let gb = if stable {
        self.groupby_stable(cols)?
    } else {
        self.groupby(cols)?
    };
    let groups = gb.groups;

    // Materialize every group as its own DataFrame, in parallel on the
    // shared rayon pool.
    let dfs: Vec<DataFrame> = POOL.install(|| {
        groups
            .idx_ref()
            .into_par_iter()
            .map(|(_, idx)| {
                // SAFETY: indices produced by the groupby are in bounds.
                unsafe { self.take_unchecked_slice(idx) }
            })
            .collect()
    });
    Ok(dfs)
}

/// Split into multiple DataFrames partitioned by groups
#[cfg(feature = "partition_by")]
#[cfg_attr(docsrs, doc(cfg(feature = "partition_by")))]
pub fn partition_by(&self, cols: impl IntoVec<String>) -> Result<Vec<DataFrame>> {
    // Group order is not guaranteed here; use `partition_by_stable` when a
    // deterministic ordering is required.
    self.partition_by_impl(cols.into_vec().as_slice(), false)
}

/// Split into multiple DataFrames partitioned by groups.
///
/// The order of the groups is maintained. This is slower than
/// [`partition_by`](Self::partition_by) as it requires an extra sort
/// operation.
#[cfg(feature = "partition_by")]
#[cfg_attr(docsrs, doc(cfg(feature = "partition_by")))]
pub fn partition_by_stable(&self, cols: impl IntoVec<String>) -> Result<Vec<DataFrame>> {
    let cols = cols.into_vec();
    self.partition_by_impl(&cols, true)
}

/// Unnest the given `Struct` columns. This means that the fields of the `Struct` type will be
/// inserted as columns.
#[cfg(feature = "dtype-struct")]
Expand Down
1 change: 1 addition & 0 deletions polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@
//! - `diagonal_concat` - Concat diagonally thereby combining different schemas.
//! - `horizontal_concat` - Concat horizontally and extend with null values if lengths don't match
//! - `dataframe_arithmetic` - Arithmetic on (Dataframe and DataFrames) and (DataFrame on Series)
//! - `partition_by` - Split into multiple DataFrames partitioned by groups.
//! * `Series` operations:
//! - `is_in` - [Check for membership in `Series`](crate::chunked_array::ops::IsIn)
//! - `zip_with` - [Zip two Series/ ChunkedArrays](crate::chunked_array::ops::ChunkZip)
Expand Down
1 change: 1 addition & 0 deletions py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ features = [
"unique_counts",
"log",
"serde-lazy",
"partition_by",
]

# [patch.crates-io]
Expand Down
3 changes: 3 additions & 0 deletions py-polars/docs/source/reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ Manipulation/ selection
DataFrame.join_asof
DataFrame.interpolate
DataFrame.transpose
DataFrame.partition_by
DataFrame.upsample
DataFrame.unnest

Expand Down Expand Up @@ -180,6 +181,8 @@ Pivot
-----
This namespace comes available by calling `DataFrame.groupby(..).pivot`

*Note that this API is deprecated in favor of `DataFrame.pivot`*

.. currentmodule:: polars.internals.frame

.. autosummary::
Expand Down
62 changes: 62 additions & 0 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -4096,6 +4096,68 @@ def melt(
self._df.melt(id_vars, value_vars, value_name, variable_name)
)

def partition_by(
    self, groups: Union[str, List[str]], maintain_order: bool = True
) -> List[DF]:
    """
    Split into multiple DataFrames partitioned by groups.

    Parameters
    ----------
    groups
        Groups to partition by.
    maintain_order
        Keep predictable output order. This is slower as it requires an
        extra sort operation.

    Examples
    --------
    >>> df = pl.DataFrame(
    ...     {
    ...         "foo": ["A", "A", "B", "B", "C"],
    ...         "N": [1, 2, 2, 4, 2],
    ...         "bar": ["k", "l", "m", "m", "l"],
    ...     }
    ... )
    >>> df.partition_by(groups="foo", maintain_order=True)
    [shape: (2, 3)
    ┌─────┬─────┬─────┐
    │ foo ┆ N   ┆ bar │
    │ --- ┆ --- ┆ --- │
    │ str ┆ i64 ┆ str │
    ╞═════╪═════╪═════╡
    │ A   ┆ 1   ┆ k   │
    ├╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌┤
    │ A   ┆ 2   ┆ l   │
    └─────┴─────┴─────┘,
    shape: (2, 3)
    ┌─────┬─────┬─────┐
    │ foo ┆ N   ┆ bar │
    │ --- ┆ --- ┆ --- │
    │ str ┆ i64 ┆ str │
    ╞═════╪═════╪═════╡
    │ B   ┆ 2   ┆ m   │
    ├╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌┤
    │ B   ┆ 4   ┆ m   │
    └─────┴─────┴─────┘,
    shape: (1, 3)
    ┌─────┬─────┬─────┐
    │ foo ┆ N   ┆ bar │
    │ --- ┆ --- ┆ --- │
    │ str ┆ i64 ┆ str │
    ╞═════╪═════╪═════╡
    │ C   ┆ 2   ┆ l   │
    └─────┴─────┴─────┘]
    """
    # Accept a single column name as a convenience.
    if isinstance(groups, str):
        groups = [groups]

    # Wrap each raw PyDataFrame returned by the Rust layer in a Python
    # DataFrame of the caller's (sub)class.
    return [
        self._from_pydf(_df)  # type: ignore
        for _df in self._df.partition_by(groups, maintain_order)
    ]

def shift(self: DF, periods: int) -> DF:
"""
Shift the values by a given period and fill the parts that will be empty due to this operation
Expand Down
12 changes: 12 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,18 @@ impl PyDataFrame {
Ok(PyDataFrame::new(df))
}

pub fn partition_by(&self, groups: Vec<String>, stable: bool) -> PyResult<Vec<Self>> {
let out = if stable {
self.df.partition_by_stable(groups)
} else {
self.df.partition_by(groups)
}
.map_err(PyPolarsErr::from)?;
// Safety:
// Repr mem layout
Ok(unsafe { std::mem::transmute::<Vec<DataFrame>, Vec<PyDataFrame>>(out) })
}

pub fn shift(&self, periods: i64) -> Self {
self.df.shift(periods).into()
}
Expand Down
27 changes: 27 additions & 0 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# flake8: noqa: W191,E101
import io
import sys
import typing
from builtins import range
from datetime import datetime
from io import BytesIO
Expand Down Expand Up @@ -2044,3 +2045,29 @@ def test_asof_by_multiple_keys() -> None:
.select(["a", "by"])
.frame_equal(pl.DataFrame({"a": [-20, -19, 8, 12, 14], "by": [1, 1, 2, 2, 2]}))
)


@typing.no_type_check
def test_partition_by() -> None:
    # Fixture with both multi-key and single-key group structure.
    df = pl.DataFrame(
        {
            "foo": ["A", "A", "B", "B", "C"],
            "N": [1, 2, 2, 4, 2],
            "bar": ["k", "l", "m", "m", "l"],
        }
    )

    expected_two_keys = [
        {"foo": ["A"], "N": [1], "bar": ["k"]},
        {"foo": ["A"], "N": [2], "bar": ["l"]},
        {"foo": ["B", "B"], "N": [2, 4], "bar": ["m", "m"]},
        {"foo": ["C"], "N": [2], "bar": ["l"]},
    ]
    parts = df.partition_by(["foo", "bar"], maintain_order=True)
    assert [part.to_dict(False) for part in parts] == expected_two_keys

    expected_single_key = [
        {"foo": ["A", "A"], "N": [1, 2], "bar": ["k", "l"]},
        {"foo": ["B", "B"], "N": [2, 4], "bar": ["m", "m"]},
        {"foo": ["C"], "N": [2], "bar": ["l"]},
    ]
    parts = df.partition_by("foo", maintain_order=True)
    assert [part.to_dict(False) for part in parts] == expected_single_key

0 comments on commit eb81237

Please sign in to comment.