Skip to content

Commit

Permalink
[lazy] aggregations return Option, so that we don't have to specify all the columns; #99
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 5, 2020
1 parent ddaf954 commit 148e31f
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 36 deletions.
6 changes: 4 additions & 2 deletions polars/src/lazy/physical_plan/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl Executor for GroupByExec {

for expr in &self.aggs {
let agg_expr = expr.as_agg_expr()?;
let agg = agg_expr.evaluate(&df, groups)?;
columns.push(agg)
let opt_agg = agg_expr.evaluate(&df, groups)?;
if let Some(agg) = opt_agg {
columns.push(agg)
}
}
Ok(DataFrame::new_no_checks(columns))
}
Expand Down
118 changes: 85 additions & 33 deletions polars/src/lazy/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,13 @@ impl PhysicalExpr for AliasExpr {
}

impl AggPhysicalExpr for AliasExpr {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Series> {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Option<Series>> {
let agg_expr = self.expr.as_agg_expr()?;
let mut agg = agg_expr.evaluate(df, groups)?;
agg.rename(&self.name);
Ok(agg)
let opt_agg = agg_expr.evaluate(df, groups)?;
Ok(opt_agg.map(|mut agg| {
agg.rename(&self.name);
agg
}))
}
}

Expand Down Expand Up @@ -268,7 +270,7 @@ macro_rules! impl_to_field_for_agg {
}

macro_rules! impl_aggregation {
($expr_struct:ident, $agg_method:ident, $groupby_method_variant:expr, $mayunpack:ident) => {
($expr_struct:ident, $agg_method:ident, $groupby_method_variant:expr, $finish_evaluate:ident) => {
#[derive(Debug)]
pub struct $expr_struct {
expr: Rc<dyn PhysicalExpr>,
Expand All @@ -295,37 +297,79 @@ macro_rules! impl_aggregation {
}

impl AggPhysicalExpr for $expr_struct {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Series> {
fn evaluate(
&self,
df: &DataFrame,
groups: &[(usize, Vec<usize>)],
) -> Result<Option<Series>> {
let series = self.expr.evaluate(df)?;
let new_name = fmt_groupby_column(series.name(), $groupby_method_variant);
let opt_agg = apply_method_all_series!(series, $agg_method, groups);
let mut agg = $mayunpack!(opt_agg);
agg.rename(&new_name);
Ok(agg.into_series())
$finish_evaluate!(opt_agg, new_name)
}
}
};
}

macro_rules! unpack {
($opt_agg:expr) => {{
let agg = $opt_agg.expect("could not unpack aggregation result");
agg
macro_rules! rename_and_cast_to_series {
($opt_agg:expr, $new_name:expr) => {{
let opt_agg = $opt_agg.map(|mut agg| {
agg.rename(&$new_name);
agg.into_series()
});
Ok(opt_agg)
}};
}
macro_rules! identity {
($opt_agg:expr) => {{
$opt_agg
macro_rules! rename_and_cast_to_option {
($agg:expr, $new_name:expr) => {{
let mut agg = $agg;
agg.rename(&$new_name);
Ok(Some(agg))
}};
}

impl_aggregation!(AggMinExpr, agg_min, GroupByMethod::Min, unpack);
impl_aggregation!(AggMaxExpr, agg_max, GroupByMethod::Max, unpack);
impl_aggregation!(AggFirstExpr, agg_first, GroupByMethod::First, identity);
impl_aggregation!(AggLastExpr, agg_last, GroupByMethod::Last, identity);
impl_aggregation!(AggMedianExpr, agg_median, GroupByMethod::Median, unpack);
impl_aggregation!(AggMeanExpr, agg_mean, GroupByMethod::Mean, unpack);
impl_aggregation!(AggSumExpr, agg_sum, GroupByMethod::Sum, unpack);
impl_aggregation!(
AggMinExpr,
agg_min,
GroupByMethod::Min,
rename_and_cast_to_series
);
impl_aggregation!(
AggMaxExpr,
agg_max,
GroupByMethod::Max,
rename_and_cast_to_series
);
impl_aggregation!(
AggFirstExpr,
agg_first,
GroupByMethod::First,
rename_and_cast_to_option
);
impl_aggregation!(
AggLastExpr,
agg_last,
GroupByMethod::Last,
rename_and_cast_to_option
);
impl_aggregation!(
AggMedianExpr,
agg_median,
GroupByMethod::Median,
rename_and_cast_to_series
);
impl_aggregation!(
AggMeanExpr,
agg_mean,
GroupByMethod::Mean,
rename_and_cast_to_series
);
impl_aggregation!(
AggSumExpr,
agg_sum,
GroupByMethod::Sum,
rename_and_cast_to_series
);

#[derive(Debug)]
pub struct AggQuantileExpr {
Expand Down Expand Up @@ -354,13 +398,17 @@ impl PhysicalExpr for AggQuantileExpr {
}

impl AggPhysicalExpr for AggQuantileExpr {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Series> {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Option<Series>> {
let series = self.expr.evaluate(df)?;
let new_name = fmt_groupby_column(series.name(), GroupByMethod::Quantile(self.quantile));
let opt_agg = apply_method_all_series!(series, agg_quantile, groups, self.quantile);
let mut agg = opt_agg.expect("could not unpack aggregation result");
agg.rename(&new_name);
Ok(agg)

let opt_agg = opt_agg.map(|mut agg| {
agg.rename(&new_name);
agg.into_series()
});

Ok(opt_agg)
}
}

Expand Down Expand Up @@ -393,7 +441,7 @@ impl PhysicalExpr for AggGroupsExpr {
}

impl AggPhysicalExpr for AggGroupsExpr {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Series> {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Option<Series>> {
let series = self.expr.evaluate(df)?;
let new_name = fmt_groupby_column(series.name(), GroupByMethod::Groups);

Expand All @@ -406,7 +454,7 @@ impl AggPhysicalExpr for AggGroupsExpr {
.collect();

column.rename(&new_name);
Ok(column.into_series())
Ok(Some(column.into_series()))
}
}

Expand Down Expand Up @@ -439,12 +487,16 @@ impl PhysicalExpr for AggNUniqueExpr {
}

impl AggPhysicalExpr for AggNUniqueExpr {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Series> {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Option<Series>> {
let series = self.expr.evaluate(df)?;
let new_name = fmt_groupby_column(series.name(), GroupByMethod::NUnique);
let opt_agg = apply_method_all_series!(series, agg_n_unique, groups);
let mut agg = unpack!(opt_agg);
agg.rename(&new_name);
Ok(agg.into_series())

let opt_agg = opt_agg.map(|mut agg| {
agg.rename(&new_name);
agg.into_series()
});

Ok(opt_agg)
}
}
2 changes: 1 addition & 1 deletion polars/src/lazy/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ pub trait PhysicalExpr: Debug {
}

pub trait AggPhysicalExpr {
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Series>;
fn evaluate(&self, df: &DataFrame, groups: &[(usize, Vec<usize>)]) -> Result<Option<Series>>;
}

0 comments on commit 148e31f

Please sign in to comment.