Skip to content

Commit

Permalink
groupby on multiple keys
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 3, 2020
1 parent a88404f commit 8d70527
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 35 deletions.
3 changes: 2 additions & 1 deletion polars/src/doc/changelog/v0_5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
//! * `DataFrame.column` returns `Result<_>` **breaking change**.
//! * Define an idiomatic way to do in-place operations on a `DataFrame` with `apply`, `may_apply` and `ChunkSet`
//! * `ChunkSet` Trait.
//! * Groupby can be done on a selection of multiple columns.
//! * `Groupby` aggregations can be done on a selection of multiple columns.
//! * `Groupby` operation can be done on multiple keys.
//!
174 changes: 140 additions & 34 deletions polars/src/frame/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,63 @@ impl IntoGroupTuples for Float64Chunked {}
impl IntoGroupTuples for Float32Chunked {}
impl IntoGroupTuples for LargeListChunked {}

/// Hashable, comparable wrapper around a single key value, used when grouping on
/// multiple columns.
///
/// Each variant carries one value of the corresponding primitive type, so values coming
/// from differently typed key columns can share one type and be combined into a
/// composite key.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum Groupable<'a> {
    /// A boolean key value.
    Boolean(bool),
    /// A string key value, borrowed for `'a`.
    Utf8(&'a str),
    /// An unsigned 8-bit key value.
    UInt8(u8),
    /// An unsigned 16-bit key value.
    UInt16(u16),
    /// An unsigned 32-bit key value.
    UInt32(u32),
    /// An unsigned 64-bit key value.
    UInt64(u64),
    /// A signed 8-bit key value.
    Int8(i8),
    /// A signed 16-bit key value.
    Int16(i16),
    /// A signed 32-bit key value.
    Int32(i32),
    /// A signed 64-bit key value.
    Int64(i64),
}

impl Series {
fn as_groupable_iter<'a>(&'a self) -> Box<dyn Iterator<Item = Option<Groupable>> + 'a> {
macro_rules! as_groupable_iter {
($ca:expr, $variant:ident ) => {{
Box::new(
$ca.into_iter()
.map(|opt_b| opt_b.map(|b| Groupable::$variant(b))),
)
}};
}
match self {
Series::Bool(ca) => as_groupable_iter!(ca, Boolean),
Series::UInt8(ca) => as_groupable_iter!(ca, UInt8),
Series::UInt16(ca) => as_groupable_iter!(ca, UInt16),
Series::UInt32(ca) => as_groupable_iter!(ca, UInt32),
Series::UInt64(ca) => as_groupable_iter!(ca, UInt64),
Series::Int8(ca) => as_groupable_iter!(ca, Int8),
Series::Int16(ca) => as_groupable_iter!(ca, Int16),
Series::Int32(ca) => as_groupable_iter!(ca, Int32),
Series::Int64(ca) => as_groupable_iter!(ca, Int64),
Series::Date32(ca) => as_groupable_iter!(ca, Int32),
Series::Date64(ca) => as_groupable_iter!(ca, Int64),
Series::TimestampSecond(ca) => as_groupable_iter!(ca, Int64),
Series::TimestampMillisecond(ca) => as_groupable_iter!(ca, Int64),
Series::TimestampNanosecond(ca) => as_groupable_iter!(ca, Int64),
Series::TimestampMicrosecond(ca) => as_groupable_iter!(ca, Int64),
Series::Time32Second(ca) => as_groupable_iter!(ca, Int32),
Series::Time32Millisecond(ca) => as_groupable_iter!(ca, Int32),
Series::Time64Nanosecond(ca) => as_groupable_iter!(ca, Int64),
Series::Time64Microsecond(ca) => as_groupable_iter!(ca, Int64),
Series::DurationNanosecond(ca) => as_groupable_iter!(ca, Int64),
Series::DurationMicrosecond(ca) => as_groupable_iter!(ca, Int64),
Series::DurationMillisecond(ca) => as_groupable_iter!(ca, Int64),
Series::DurationSecond(ca) => as_groupable_iter!(ca, Int64),
Series::IntervalDayTime(ca) => as_groupable_iter!(ca, Int64),
Series::IntervalYearMonth(ca) => as_groupable_iter!(ca, Int32),
Series::Utf8(ca) => as_groupable_iter!(ca, Utf8),
_ => unimplemented!(),
}
}
}

impl DataFrame {
/// Group DataFrame using a Series column.
///
Expand All @@ -80,15 +137,53 @@ impl DataFrame {
/// .sum()
/// }
/// ```
pub fn groupby(&self, by: &str) -> Result<GroupBy> {
let s = self.column(by)?;
let groups = s.group_tuples();
pub fn groupby<'g, S: Selection<'g>>(&self, by: S) -> Result<GroupBy> {
let selected_keys = self.select_series(by)?;

let groups = match selected_keys.len() {
1 => selected_keys[0].group_tuples(),
2 => {
let iter = selected_keys[0]
.as_groupable_iter()
.zip(selected_keys[1].as_groupable_iter());
groupby(iter)
}
3 => {
let iter = selected_keys[0]
.as_groupable_iter()
.zip(selected_keys[1].as_groupable_iter())
.zip(selected_keys[2].as_groupable_iter());
groupby(iter)
}
4 => {
let iter = selected_keys[0]
.as_groupable_iter()
.zip(selected_keys[1].as_groupable_iter())
.zip(selected_keys[2].as_groupable_iter())
.zip(selected_keys[3].as_groupable_iter());
groupby(iter)
}
5 => {
let iter = selected_keys[0]
.as_groupable_iter()
.zip(selected_keys[1].as_groupable_iter())
.zip(selected_keys[2].as_groupable_iter())
.zip(selected_keys[3].as_groupable_iter())
.zip(selected_keys[4].as_groupable_iter());
groupby(iter)
}
_ => {
return Err(PolarsError::Other(
"more than 5 combined keys are currently not supported".to_string(),
));
}
};

Ok(GroupBy {
df: self,
by: by.to_string(),
selected_keys,
groups,
selection: None,
selected_agg: None,
})
}
}
Expand Down Expand Up @@ -145,11 +240,11 @@ impl DataFrame {
#[derive(Debug, Clone)]
pub struct GroupBy<'a, 'b> {
df: &'a DataFrame,
/// By which column should the grouping operation be performed.
pub by: String,
selected_keys: Vec<Series>,
// [first idx, [other idx]]
groups: Vec<(usize, Vec<usize>)>,
selection: Option<Vec<&'b str>>,
// columns selected for aggregation
selected_agg: Option<Vec<&'b str>>,
}

#[enum_dispatch(Series)]
Expand Down Expand Up @@ -291,21 +386,33 @@ impl<'a, 'b> GroupBy<'a, 'b> {
where
S: Selection<'b>,
{
self.selection = Some(selection.to_selection_vec());
self.selected_agg = Some(selection.to_selection_vec());
self
}

fn keys(&self) -> Series {
fn keys(&self) -> Vec<Series> {
// Keys will later be appended with the aggregation columns, so we already allocate extra space
let size;
if let Some(sel) = &self.selected_agg {
size = sel.len() + self.selected_keys.len();
} else {
size = self.selected_keys.len();
}
let mut keys = Vec::with_capacity(size);
unsafe {
self.df.column(&self.by).unwrap().take_iter_unchecked(
self.groups.iter().map(|(idx, _)| *idx),
Some(self.groups.len()),
)
self.selected_keys.iter().for_each(|s| {
let key = s.take_iter_unchecked(
self.groups.iter().map(|(idx, _)| *idx),
Some(self.groups.len()),
);
keys.push(key)
});
}
keys
}

fn prepare_agg(&self) -> Result<(Series, Vec<Series>)> {
let selection = match &self.selection {
fn prepare_agg(&self) -> Result<(Vec<Series>, Vec<Series>)> {
let selection = match &self.selected_agg {
Some(selection) => selection,
None => return Err(PolarsError::NoSelection),
};
Expand Down Expand Up @@ -341,10 +448,8 @@ impl<'a, 'b> GroupBy<'a, 'b> {
/// +------------+-----------+-----------+
/// ```
pub fn mean(&self) -> Result<DataFrame> {
let (keys, agg_cols) = self.prepare_agg()?;
let (mut cols, agg_cols) = self.prepare_agg()?;

let mut cols = Vec::with_capacity(agg_cols.len() + 1);
cols.push(keys);
for agg_col in agg_cols {
let new_name = format!["{}_mean", agg_col.name()];
let mut agg = agg_col.agg_mean(&self.groups);
Expand Down Expand Up @@ -380,9 +485,8 @@ impl<'a, 'b> GroupBy<'a, 'b> {
/// +------------+----------+
/// ```
pub fn sum(&self) -> Result<DataFrame> {
let (keys, agg_cols) = self.prepare_agg()?;
let mut cols = Vec::with_capacity(agg_cols.len() + 1);
cols.push(keys);
let (mut cols, agg_cols) = self.prepare_agg()?;

for agg_col in agg_cols {
let new_name = format!["{}_sum", agg_col.name()];
let mut agg = agg_col.agg_sum(&self.groups);
Expand Down Expand Up @@ -418,9 +522,7 @@ impl<'a, 'b> GroupBy<'a, 'b> {
/// +------------+----------+
/// ```
pub fn min(&self) -> Result<DataFrame> {
let (keys, agg_cols) = self.prepare_agg()?;
let mut cols = Vec::with_capacity(agg_cols.len() + 1);
cols.push(keys);
let (mut cols, agg_cols) = self.prepare_agg()?;
for agg_col in agg_cols {
let new_name = format!["{}_min", agg_col.name()];
let mut agg = agg_col.agg_min(&self.groups);
Expand Down Expand Up @@ -456,9 +558,7 @@ impl<'a, 'b> GroupBy<'a, 'b> {
/// +------------+----------+
/// ```
pub fn max(&self) -> Result<DataFrame> {
let (keys, agg_cols) = self.prepare_agg()?;
let mut cols = Vec::with_capacity(agg_cols.len() + 1);
cols.push(keys);
let (mut cols, agg_cols) = self.prepare_agg()?;
for agg_col in agg_cols {
let new_name = format!["{}_max", agg_col.name()];
let mut agg = agg_col.agg_max(&self.groups);
Expand Down Expand Up @@ -494,9 +594,7 @@ impl<'a, 'b> GroupBy<'a, 'b> {
/// +------------+------------+
/// ```
pub fn count(&self) -> Result<DataFrame> {
let (keys, agg_cols) = self.prepare_agg()?;
let mut cols = Vec::with_capacity(agg_cols.len() + 1);
cols.push(keys);
let (mut cols, agg_cols) = self.prepare_agg()?;
for agg_col in agg_cols {
let new_name = format!["{}_count", agg_col.name()];
let mut builder = PrimitiveChunkedBuilder::new(&new_name, self.groups.len());
Expand Down Expand Up @@ -570,9 +668,7 @@ impl<'a, 'b> GroupBy<'a, 'b> {
}};
}

let (keys, agg_cols) = self.prepare_agg()?;
let mut cols = Vec::with_capacity(agg_cols.len() + 1);
cols.push(keys);
let (mut cols, agg_cols) = self.prepare_agg()?;
for agg_col in agg_cols {
let new_name = format!["{}_agg_list", agg_col.name()];
let mut agg =
Expand Down Expand Up @@ -611,6 +707,7 @@ mod test {
"{:?}",
df.groupby("date").unwrap().select("temp").count().unwrap()
);
// Select multiple
println!(
"{:?}",
df.groupby("date")
Expand All @@ -619,6 +716,15 @@ mod test {
.mean()
.unwrap()
);
// Group by multiple
println!(
"multiple keys {:?}",
df.groupby(&["date", "temp"])
.unwrap()
.select("rain")
.mean()
.unwrap()
);
println!(
"{:?}",
df.groupby("date").unwrap().select("temp").sum().unwrap()
Expand Down

0 comments on commit 8d70527

Please sign in to comment.