Skip to content

Commit

Permalink
improve partition_by (#3386)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 12, 2022
1 parent 40885d3 commit 47b060c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 38 deletions.
34 changes: 22 additions & 12 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2926,22 +2926,30 @@ impl DataFrame {
}

#[cfg(feature = "partition_by")]
#[doc(hidden)]
/// Group `self` by `cols` and return a parallel iterator that yields one
/// `DataFrame` per group. Hidden API: exposed (prefixed `_`) so downstream
/// crates (e.g. polars-io partitioned writers) can stream partitions without
/// materializing a `Vec<DataFrame>` first.
///
/// `stable` selects `groupby_stable` (groups in order of first occurrence)
/// over the faster, order-unspecified `groupby`.
///
/// # Errors
/// Propagates any error from the underlying groupby (e.g. unknown column).
pub fn _partition_by_impl(
    &self,
    cols: &[String],
    stable: bool,
) -> Result<impl IndexedParallelIterator<Item = DataFrame> + '_> {
    let groups = if stable {
        self.groupby_stable(cols)?.groups
    } else {
        self.groupby(cols)?.groups
    };

    // Run on the global rayon pool so nested parallelism stays bounded.
    Ok(POOL.install(move || {
        match groups {
            GroupsProxy::Idx(idx) => {
                idx.into_par_iter().map(|(_, group)| {
                    // SAFETY: group indices come from groupby over `self`,
                    // so they are in bounds.
                    unsafe { self.take_unchecked_slice(&group) }
                })
            }
            // NOTE(review): sliced-group representation not handled yet;
            // only the index-based GroupsProxy variant is supported here.
            _ => {
                unimplemented!()
            }
        }
    }))
}

Expand All @@ -2950,7 +2958,8 @@ impl DataFrame {
#[cfg_attr(docsrs, doc(cfg(feature = "partition_by")))]
pub fn partition_by(&self, cols: impl IntoVec<String>) -> Result<Vec<DataFrame>> {
    let cols = cols.into_vec();
    // Unstable group order (stable = false); collect the parallel
    // iterator produced by `_partition_by_impl` into a Vec.
    self._partition_by_impl(&cols, false)
        .map(|iter| iter.collect())
}

/// Split into multiple DataFrames partitioned by groups
Expand All @@ -2959,7 +2968,8 @@ impl DataFrame {
#[cfg_attr(docsrs, doc(cfg(feature = "partition_by")))]
pub fn partition_by_stable(&self, cols: impl IntoVec<String>) -> Result<Vec<DataFrame>> {
    let cols = cols.into_vec();
    // stable = true: partitions come back in order of first occurrence
    // (backed by groupby_stable), at the cost of an extra sort.
    self._partition_by_impl(&cols, true)
        .map(|iter| iter.collect())
}

/// Unnest the given `Struct` columns. This means that the fields of the `Struct` type will be
Expand Down
25 changes: 5 additions & 20 deletions polars/polars-io/src/partition.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{utils::resolve_homedir, WriterFactory};
use polars_core::prelude::*;
use rayon::iter::IndexedParallelIterator;
use rayon::prelude::*;
use std::{
fs::File,
io::BufWriter,
Expand Down Expand Up @@ -88,25 +88,10 @@ where
}

/// Write `df` to the partitioned output: one file per group of `self.by`.
///
/// Uses the hidden `_partition_by_impl` so partitions are produced as a
/// rayon `IndexedParallelIterator` and written in parallel without first
/// materializing a `Vec<DataFrame>`. `enumerate` supplies a stable partition
/// index `i` used to name each output file via `write_partition_df`.
///
/// # Errors
/// Propagates groupby errors and any I/O error from writing a partition
/// (collected via `Result<Vec<_>>`, short-circuiting on the first failure).
pub fn finish(self, df: &DataFrame) -> Result<()> {
    // NOTE(review): the `self.parallel` flag is no longer consulted here —
    // all writes now go through the parallel iterator; confirm intended.
    df._partition_by_impl(&self.by, false)?
        .enumerate()
        .map(|(i, mut part_df)| self.write_partition_df(&mut part_df, i))
        .collect::<Result<Vec<_>>>()?;

    Ok(())
}
Expand Down
35 changes: 29 additions & 6 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -4158,8 +4158,12 @@ def melt(
)

def partition_by(
self, groups: Union[str, List[str]], maintain_order: bool = True
) -> List[DF]:
self,
groups: Union[str, List[str]],
maintain_order: bool = True,
*,
as_dict: bool = False,
) -> Union[List[DF], Dict[Any, DF]]:
"""
Split into multiple DataFrames partitioned by groups.
Expand All @@ -4169,6 +4173,8 @@ def partition_by(
Groups to partition by
maintain_order
Keep predictable output order. This is slower as it requires an extra sort operation.
as_dict
Return as dictionary
Examples
--------
Expand Down Expand Up @@ -4214,10 +4220,24 @@ def partition_by(
if isinstance(groups, str):
groups = [groups]

return [
self._from_pydf(_df) # type: ignore
for _df in self._df.partition_by(groups, maintain_order)
]
if as_dict:
out = dict()
if len(groups) == 1:
for _df in self._df.partition_by(groups, maintain_order):
df = self._from_pydf(_df)
out[df[groups][0, 0]] = df
else:
for _df in self._df.partition_by(groups, maintain_order):
df = self._from_pydf(_df)
out[df[groups].row(0)] = df # type: ignore

return out # type: ignore

else:
return [
self._from_pydf(_df) # type: ignore
for _df in self._df.partition_by(groups, maintain_order)
]

def shift(self: DF, periods: int) -> DF:
"""
Expand Down Expand Up @@ -5472,6 +5492,9 @@ def get_group(self, group_value: Union[Any, Tuple[Any]]) -> DF:
"""
Select a single group as a new DataFrame.
.. deprecated:: 0.13.32
Please use `partition_by`
Parameters
----------
group_value
Expand Down
10 changes: 10 additions & 0 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,16 @@ def test_partition_by() -> None:
{"foo": ["C"], "N": [2], "bar": ["l"]},
]

df = pl.DataFrame({"a": ["one", "two", "one", "two"], "b": [1, 2, 3, 4]})
assert df.partition_by(["a", "b"], as_dict=True)["one", 1].to_dict(False) == {
"a": ["one"],
"b": [1],
}
assert df.partition_by(["a"], as_dict=True)["one"].to_dict(False) == {
"a": ["one", "one"],
"b": [1, 3],
}


@typing.no_type_check
def test_list_of_list_of_struct() -> None:
Expand Down

0 comments on commit 47b060c

Please sign in to comment.