Skip to content

Commit

Permalink
multiple aggregations on columns and 'agg' syntax; #44 ; closes #75
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent 69fb36d commit 2dbc368
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 3 deletions.
124 changes: 121 additions & 3 deletions polars/src/frame/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ trait AggQuantile {
/// Compute the given quantile (0.0..=1.0) for every group.
///
/// Default implementation returns `None`, meaning quantile aggregation is
/// not supported for this type; types that can compute quantiles override
/// this (see the `ChunkedArray` impl below).
fn agg_quantile(&self, _groups: &Vec<(usize, Vec<usize>)>, _quantile: f64) -> Option<Series> {
None
}

/// Compute the median for every group.
///
/// Defined as the 0.5 quantile, so it delegates to [`agg_quantile`] and
/// likewise returns `None` when the type does not support quantiles.
fn agg_median(&self, groups: &Vec<(usize, Vec<usize>)>) -> Option<Series> {
self.agg_quantile(groups, 0.5)
}
}

impl<T> AggQuantile for ChunkedArray<T>
Expand Down Expand Up @@ -672,7 +676,7 @@ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {

/// Aggregate grouped series and compute the sum per group.
///
/// # Example agg.rename(&new_name);
/// # Example
///
/// ```rust
/// # use polars::prelude::*;
Expand Down Expand Up @@ -937,7 +941,7 @@ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
let (mut cols, agg_cols) = self.prepare_agg()?;
for agg_col in agg_cols {
let new_name = format!["{}_median", agg_col.name()];
let opt_agg = agg_col.agg_quantile(&self.groups, 0.5);
let opt_agg = agg_col.agg_median(&self.groups);
if let Some(mut agg) = opt_agg {
agg.rename(&new_name);
cols.push(agg.into_series());
Expand Down Expand Up @@ -986,6 +990,113 @@ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
DataFrame::new(cols)
}

///
/// Combine different aggregations on columns
///
/// ## Operations
///
/// * count
/// * first
/// * last
/// * sum
/// * min
/// * max
/// * mean
/// * median
/// * n_unique
///
/// # Example
///
/// ```rust
/// # use polars::prelude::*;
/// fn example(df: DataFrame) -> Result<DataFrame> {
///     df.groupby("date")?.agg(&[("temp", &["n_unique", "sum", "min"])])
/// }
/// ```
/// Returns:
///
/// ```text
/// +--------------+---------------+----------+----------+
/// | date         | temp_n_unique | temp_sum | temp_min |
/// | ---          | ---           | ---      | ---      |
/// | date32(days) | u32           | i32      | i32      |
/// +==============+===============+==========+==========+
/// | 2020-08-23   | 1             | 9        | 9        |
/// +--------------+---------------+----------+----------+
/// | 2020-08-22   | 2             | 8        | 1        |
/// +--------------+---------------+----------+----------+
/// | 2020-08-21   | 2             | 30       | 10       |
/// +--------------+---------------+----------+----------+
/// ```
///
/// # Panics
///
/// Panics when an aggregation name outside the list above is requested.
///
pub fn agg<Column, S, Slice>(&self, column_to_agg: &[(Column, Slice)]) -> Result<DataFrame>
where
    S: AsRef<str>,
    Slice: AsRef<[S]>,
    Column: AsRef<str>,
{
    // Map each column name to the aggregations requested on it, so the
    // per-column lookup in the loop below is O(1).
    let mut map =
        HashMap::with_capacity_and_hasher(column_to_agg.len(), FnvBuildHasher::default());
    for (column, aggregations) in column_to_agg {
        map.insert(column.as_ref(), aggregations.as_ref());
    }

    // For aggregations that may be unsupported for a dtype (Option<Series>):
    // append the renamed result only when the aggregation succeeded.
    macro_rules! finish_agg_opt {
        ($self:ident, $name_fmt:expr, $agg_fn:ident, $agg_col:ident, $cols:ident) => {{
            let new_name = format![$name_fmt, $agg_col.name()];
            let opt_agg = $agg_col.$agg_fn(&$self.groups);
            if let Some(mut agg) = opt_agg {
                agg.rename(&new_name);
                $cols.push(agg.into_series());
            }
        }};
    }
    // For aggregations that always produce a Series (first/last).
    macro_rules! finish_agg {
        ($self:ident, $name_fmt:expr, $agg_fn:ident, $agg_col:ident, $cols:ident) => {{
            let new_name = format![$name_fmt, $agg_col.name()];
            let mut agg = $agg_col.$agg_fn(&$self.groups);
            agg.rename(&new_name);
            $cols.push(agg.into_series());
        }};
    }

    let (mut cols, agg_cols) = self.prepare_agg()?;
    for agg_col in agg_cols {
        if let Some(&aggregations) = map.get(agg_col.name()) {
            for aggregation_f in aggregations {
                match aggregation_f.as_ref() {
                    "min" => finish_agg_opt!(self, "{}_min", agg_min, agg_col, cols),
                    "max" => finish_agg_opt!(self, "{}_max", agg_max, agg_col, cols),
                    "mean" => finish_agg_opt!(self, "{}_mean", agg_mean, agg_col, cols),
                    "sum" => finish_agg_opt!(self, "{}_sum", agg_sum, agg_col, cols),
                    "first" => finish_agg!(self, "{}_first", agg_first, agg_col, cols),
                    "last" => finish_agg!(self, "{}_last", agg_last, agg_col, cols),
                    "n_unique" => {
                        finish_agg_opt!(self, "{}_n_unique", agg_n_unique, agg_col, cols)
                    }
                    // Bug fix: previously dispatched to `agg_n_unique`, so a
                    // requested median silently produced the unique count.
                    "median" => finish_agg_opt!(self, "{}_median", agg_median, agg_col, cols),
                    "count" => {
                        // Count is derived from the group index lists directly;
                        // no column aggregation kernel is needed.
                        let new_name = format!["{}_count", agg_col.name()];
                        let mut builder =
                            PrimitiveChunkedBuilder::new(&new_name, self.groups.len());
                        for (_first, idx) in &self.groups {
                            builder.append_value(idx.len() as u32);
                        }
                        let ca = builder.finish();
                        cols.push(Series::UInt32(ca));
                    }
                    a => panic!("aggregation: {:?} is not supported", a),
                }
            }
        }
    }
    DataFrame::new(cols)
}

/// Aggregate the groups of the groupby operation into lists.
///
/// # Example
Expand Down Expand Up @@ -1499,7 +1610,14 @@ mod test {
let gb = df.groupby("date").unwrap().n_unique().unwrap();
println!("{:?}", df.groupby("date").unwrap().n_unique().unwrap());
// check the group by column is filtered out.
assert_eq!(gb.width(), 2)
assert_eq!(gb.width(), 2);
println!(
"{:?}",
df.groupby("date")
.unwrap()
.agg(&[("temp", &["n_unique", "sum", "min"])])
.unwrap()
);
}

#[test]
Expand Down
29 changes: 29 additions & 0 deletions py-polars/pypolars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,35 @@ def __init__(self, df: DataFrame, by: List[str]):
self._df = df
self.by = by

def agg(
    self, column_to_agg: Union[List[Tuple[str, List[str]]], Dict[str, List[str]]]
) -> DataFrame:
    """
    Use multiple aggregations on columns.

    Parameters
    ----------
    column_to_agg
        Map from column name to the aggregation function names to run on
        that column. A single aggregation may be given as a plain string
        instead of a list.
        Examples:
            [("foo", ["sum", "n_unique", "min"]),
             ("bar", ["max"])]

            {"foo": ["sum", "n_unique", "min"],
             "bar": "max"}

    Returns
    -------
    Result of groupby split apply operations.
    """
    # Accept either a dict or a list of (column, aggregations) pairs.
    pairs = column_to_agg.items() if isinstance(column_to_agg, dict) else column_to_agg
    # Normalize so every column maps to a *list* of aggregation names
    # before crossing into the Rust binding.
    column_to_agg = [
        (column, [agg] if isinstance(agg, str) else agg) for (column, agg) in pairs
    ]
    return wrap_df(self._df.groupby_agg(self.by, column_to_agg))

def select(self, columns: Union[str, List[str]]) -> GBSelection:
"""
Select the columns that will be aggregated.
Expand Down
10 changes: 10 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,16 @@ impl PyDataFrame {
Ok(PyDataFrame::new(df))
}

/// Group `self` by the `by` columns, then run, per listed column, the named
/// aggregations (e.g. `"sum"`, `"min"`), mapping any polars error into a
/// Python exception.
pub fn groupby_agg(
    &self,
    by: Vec<&str>,
    column_to_agg: Vec<(&str, Vec<&str>)>,
) -> PyResult<Self> {
    let df = self
        .df
        .groupby(&by)
        .map_err(PyPolarsEr::from)?
        .agg(&column_to_agg)
        .map_err(PyPolarsEr::from)?;
    Ok(PyDataFrame::new(df))
}

pub fn groupby_quantile(
&self,
by: Vec<&str>,
Expand Down
4 changes: 4 additions & 0 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ def test_groupby():

(df.groupby("a").select("b").quantile(0.3))

gb_df = df.groupby("a").agg({"b": ["sum", "min"], "c": "count"})
assert "b_sum" in gb_df.columns
assert "b_min" in gb_df.columns

#
# # TODO: is false because count is u32
# df.groupby(by="a", select="b", agg="count").frame_equal(
Expand Down

0 comments on commit 2dbc368

Please sign in to comment.