Skip to content

Commit

Permalink
groupby_rolling (#2435)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 21, 2022
1 parent d691b76 commit b0c9d36
Show file tree
Hide file tree
Showing 21 changed files with 669 additions and 53 deletions.
130 changes: 122 additions & 8 deletions polars/polars-core/src/frame/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,65 @@ pub struct DynamicGroupOptions {
pub closed_window: ClosedWindow,
}

/// Options for a rolling groupby over the index column.
#[derive(Clone, Debug)]
pub struct RollingGroupOptions {
    /// Time or index column
    pub index_column: String,
    /// window duration
    pub period: Duration,
    /// offset applied when placing each window relative to the index values
    /// (e.g. a negative offset of one `period` yields a trailing window)
    pub offset: Duration,
    /// which window bounds (lower/upper) are inclusive
    pub closed_window: ClosedWindow,
}

// Names of the temporary boundary key columns that the dynamic groupby adds
// when `include_boundaries` is set; they are special-cased (re-cast) when the
// index column is an integer type.
const LB_NAME: &str = "_lower_boundary";
const UP_NAME: &str = "_upper_boundary";

impl DataFrame {
/// Returns: time_keys, keys, grouptuples
pub fn groupby_rolling(&self, options: &RollingGroupOptions) -> Result<(Series, GroupsProxy)> {
let time = self.column(&options.index_column)?;
let time_type = time.dtype();

if time.null_count() > 0 {
panic!("null values in dynamic groupby not yet supported, fill nulls.")
}

use DataType::*;
let (dt, tu) = match time_type {
Datetime(tu, _) => (time.clone(), *tu),
Date => (
time.cast(&Datetime(TimeUnit::Milliseconds, None))?,
TimeUnit::Milliseconds,
),
Int32 => {
let time_type = Datetime(TimeUnit::Nanoseconds, None);
let dt = time.cast(&Int64).unwrap().cast(&time_type).unwrap();
let (out, gt) =
self.impl_groupby_rolling(dt, options, TimeUnit::Nanoseconds, &time_type)?;
let out = out.cast(&Int64).unwrap().cast(&Int32).unwrap();
return Ok((out, gt));
}
Int64 => {
let time_type = Datetime(TimeUnit::Nanoseconds, None);
let dt = time.cast(&time_type).unwrap();
let (out, gt) =
self.impl_groupby_rolling(dt, options, TimeUnit::Nanoseconds, &time_type)?;
let out = out.cast(&Int64).unwrap();
return Ok((out, gt));
}
dt => {
return Err(PolarsError::ValueError(
format!(
"expected any of the following dtypes {{Date, Datetime, Int32, Int64}}, got {}",
dt
)
.into(),
))
}
};
self.impl_groupby_rolling(dt, options, tu, time_type)
}

/// Returns: time_keys, keys, groupsproxy
pub fn groupby_dynamic(
&self,
by: Vec<Series>,
Expand All @@ -50,7 +104,7 @@ impl DataFrame {
let time_type = Datetime(TimeUnit::Nanoseconds, None);
let dt = time.cast(&Int64).unwrap().cast(&time_type).unwrap();
let (out, mut keys, gt) =
self.impl_groupby(dt, by, options, TimeUnit::Nanoseconds, &time_type)?;
self.impl_groupby_dynamic(dt, by, options, TimeUnit::Nanoseconds, &time_type)?;
let out = out.cast(&Int64).unwrap().cast(&Int32).unwrap();
for k in &mut keys {
if k.name() == UP_NAME || k.name() == LB_NAME {
Expand All @@ -63,7 +117,7 @@ impl DataFrame {
let time_type = Datetime(TimeUnit::Nanoseconds, None);
let dt = time.cast(&time_type).unwrap();
let (out, mut keys, gt) =
self.impl_groupby(dt, by, options, TimeUnit::Nanoseconds, &time_type)?;
self.impl_groupby_dynamic(dt, by, options, TimeUnit::Nanoseconds, &time_type)?;
let out = out.cast(&Int64).unwrap();
for k in &mut keys {
if k.name() == UP_NAME || k.name() == LB_NAME {
Expand All @@ -82,10 +136,10 @@ impl DataFrame {
))
}
};
self.impl_groupby(dt, by, options, tu, time_type)
self.impl_groupby_dynamic(dt, by, options, tu, time_type)
}

fn impl_groupby(
fn impl_groupby_dynamic(
&self,
dt: Series,
mut by: Vec<Series>,
Expand Down Expand Up @@ -116,7 +170,7 @@ impl DataFrame {
dt.downcast_iter()
.flat_map(|vals| {
let ts = vals.values().as_slice();
let (groups, lower, upper) = polars_time::groupby::groupby(
let (groups, lower, upper) = polars_time::groupby::groupby_windows(
w,
ts,
options.include_boundaries,
Expand All @@ -143,7 +197,7 @@ impl DataFrame {

let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (mut sub_groups, lower, upper) = polars_time::groupby::groupby(
let (mut sub_groups, lower, upper) = polars_time::groupby::groupby_windows(
w,
ts,
options.include_boundaries,
Expand Down Expand Up @@ -179,7 +233,7 @@ impl DataFrame {
};
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (mut sub_groups, _, _) = polars_time::groupby::groupby(
let (mut sub_groups, _, _) = polars_time::groupby::groupby_windows(
w,
ts,
options.include_boundaries,
Expand Down Expand Up @@ -235,6 +289,33 @@ impl DataFrame {
.cast(time_type)
.map(|s| (s, by, GroupsProxy::Idx(groups)))
}

fn impl_groupby_rolling(
&self,
dt: Series,
options: &RollingGroupOptions,
tu: TimeUnit,
time_type: &DataType,
) -> Result<(Series, GroupsProxy)> {
let dt = dt.datetime().unwrap().clone();

let groups = dt
.downcast_iter()
.flat_map(|vals| {
let ts = vals.values().as_slice();
polars_time::groupby::groupby_values(
options.period,
options.offset,
ts,
options.closed_window,
tu.to_polars_time(),
)
})
.collect::<Vec<_>>();
let groups = GroupsProxy::Slice(groups);

dt.cast(time_type).map(|s| (s, groups))
}
}

#[cfg(test)]
Expand All @@ -243,6 +324,39 @@ mod test {
use crate::time::date_range;
use polars_time::export::chrono::prelude::*;

#[test]
fn test_rolling_groupby() -> Result<()> {
let date = Utf8Chunked::new(
"dt",
[
"2020-01-01 13:45:48",
"2020-01-01 16:42:13",
"2020-01-01 16:45:09",
"2020-01-02 18:12:48",
"2020-01-03 19:45:32",
"2020-01-08 23:16:43",
],
)
.as_datetime(None, TimeUnit::Milliseconds)?
.into_series();
let a = Series::new("a", [3, 7, 5, 9, 2, 1]);
let df = DataFrame::new(vec![date, a.clone()])?;

let (_, groups) = df
.groupby_rolling(&RollingGroupOptions {
index_column: "dt".into(),
period: Duration::parse("2d"),
offset: Duration::parse("-2d"),
closed_window: ClosedWindow::Right,
})
.unwrap();
let sum = a.agg_sum(&groups).unwrap();
let expected = Series::new("", [3, 10, 15, 24, 11, 1]);
assert_eq!(sum, expected);

Ok(())
}

#[test]
fn test_dynamic_groupby_window() {
let start = NaiveDate::from_ymd(2021, 12, 16)
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ pub use pivot::PivotAgg;
pub struct DynamicGroupOptions {
pub index_column: String,
}
// Stub so that `RollingGroupOptions` is still nameable when the
// "dynamic_groupby" feature is disabled; only the index column is kept.
#[cfg(not(feature = "dynamic_groupby"))]
#[derive(Clone, Debug)]
pub struct RollingGroupOptions {
    pub index_column: String,
}

pub use proxy::*;

Expand Down
20 changes: 19 additions & 1 deletion polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::borrow::Cow;

#[cfg(any(feature = "parquet", feature = "csv-file", feature = "ipc"))]
use polars_core::datatypes::PlHashMap;
use polars_core::frame::groupby::DynamicGroupOptions;
use polars_core::frame::groupby::{DynamicGroupOptions, RollingGroupOptions};
use polars_core::frame::hash_join::JoinType;
use polars_core::prelude::*;
#[cfg(feature = "dtype-categorical")]
Expand Down Expand Up @@ -638,6 +638,19 @@ impl LazyFrame {
keys: by.as_ref().to_vec(),
maintain_order: false,
dynamic_options: None,
rolling_options: None,
}
}

    /// Start a groupby based on a rolling window over the index column named
    /// in `options`. Row order is preserved (`maintain_order: true`).
    pub fn groupby_rolling(self, options: RollingGroupOptions) -> LazyGroupBy {
        let opt_state = self.get_opt_state();
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            // no explicit keys: the groups are derived solely from the
            // rolling windows over the index column
            keys: vec![],
            maintain_order: true,
            dynamic_options: None,
            rolling_options: Some(options),
        }
    }

Expand All @@ -653,6 +666,7 @@ impl LazyFrame {
keys: by.as_ref().to_vec(),
maintain_order: true,
dynamic_options: Some(options),
rolling_options: None,
}
}

Expand All @@ -665,6 +679,7 @@ impl LazyFrame {
keys: by.as_ref().to_vec(),
maintain_order: true,
dynamic_options: None,
rolling_options: None,
}
}

Expand Down Expand Up @@ -965,6 +980,7 @@ pub struct LazyGroupBy {
keys: Vec<Expr>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>,
rolling_options: Option<RollingGroupOptions>,
}

impl LazyGroupBy {
Expand Down Expand Up @@ -999,6 +1015,7 @@ impl LazyGroupBy {
None,
self.maintain_order,
self.dynamic_options,
self.rolling_options,
)
.build();
LazyFrame::from_logical_plan(lp, self.opt_state)
Expand Down Expand Up @@ -1041,6 +1058,7 @@ impl LazyGroupBy {
Some(Arc::new(f)),
self.maintain_order,
None,
None,
)
.build();
LazyFrame::from_logical_plan(lp, self.opt_state)
Expand Down
13 changes: 6 additions & 7 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::logical_plan::{det_melt_schema, Context, CsvParserOptions};
use crate::prelude::*;
use crate::utils::{aexprs_to_schema, PushNode};
use ahash::RandomState;
use polars_core::frame::groupby::DynamicGroupOptions;
use polars_core::prelude::*;
use polars_utils::arena::{Arena, Node};
use std::collections::HashSet;
Expand Down Expand Up @@ -99,7 +98,7 @@ pub enum ALogicalPlan {
schema: SchemaRef,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>,
options: GroupbyOptions,
},
Join {
input_left: Node,
Expand Down Expand Up @@ -234,7 +233,7 @@ impl ALogicalPlan {
schema,
apply,
maintain_order,
dynamic_options,
options: dynamic_options,
..
} => Aggregate {
input: inputs[0],
Expand All @@ -243,7 +242,7 @@ impl ALogicalPlan {
schema: schema.clone(),
apply: apply.clone(),
maintain_order: *maintain_order,
dynamic_options: dynamic_options.clone(),
options: dynamic_options.clone(),
},
Join {
schema,
Expand Down Expand Up @@ -641,9 +640,9 @@ impl<'a> ALogicalPlanBuilder<'a> {
aggs: Vec<Node>,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>,
options: GroupbyOptions,
) -> Self {
debug_assert!(!(keys.is_empty() && dynamic_options.is_none()));
debug_assert!(!(keys.is_empty() && options.dynamic.is_none()));
let current_schema = self.schema();
// TODO! add this line if LogicalPlan is dropped in favor of ALogicalPlan
// let aggs = rewrite_projections(aggs, current_schema);
Expand All @@ -661,7 +660,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
schema: Arc::new(schema),
apply,
maintain_order,
dynamic_options,
options,
};
let root = self.lp_arena.add(lp);
Self::new(root, self.expr_arena, self.lp_arena)
Expand Down
9 changes: 6 additions & 3 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::prelude::*;
use crate::utils;
use crate::utils::{combine_predicates_expr, has_expr};
use ahash::RandomState;
use polars_core::frame::groupby::DynamicGroupOptions;
use polars_core::frame::groupby::{DynamicGroupOptions, RollingGroupOptions};
use polars_core::prelude::*;
#[cfg(feature = "csv-file")]
use polars_io::csv_core::utils::infer_file_schema;
Expand Down Expand Up @@ -274,8 +274,8 @@ impl LogicalPlanBuilder {
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>,
rolling_options: Option<RollingGroupOptions>,
) -> Self {
debug_assert!(!(keys.is_empty() && dynamic_options.is_none()));
let current_schema = self.0.schema();
let aggs = rewrite_projections(aggs.as_ref().to_vec(), current_schema, keys.as_ref());

Expand All @@ -290,7 +290,10 @@ impl LogicalPlanBuilder {
schema: Arc::new(schema),
apply,
maintain_order,
dynamic_options,
options: GroupbyOptions {
dynamic: dynamic_options,
rolling: rolling_options,
},
}
.into()
}
Expand Down

0 comments on commit b0c9d36

Please sign in to comment.