Skip to content

Commit

Permalink
init dynamic dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 17, 2021
1 parent 1db393d commit de91c14
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 5 deletions.
52 changes: 52 additions & 0 deletions polars/polars-core/src/frame/groupby/dynamic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::prelude::*;
use polars_time::{
Duration, Window
};
use crate::frame::groupby::{
GroupTuples,
GroupBy
};


#[derive(Clone, Debug)]
pub struct DynamicGroupOptions {
pub time_column: String,
/// start a window at this interval
every: Duration,
/// window duration
period: Duration,
/// offset window boundaries
offset: Duration,
/// truncate the time column values to the window
truncate: bool
}

impl DataFrame {
pub fn groupby_dynamic(&self, options: &DynamicGroupOptions) -> Result<(Self, GroupTuples)> {
let w = Window::new(options.every, options.period, options.offset);


let time = self.column(&options.time_column)?;
if time.null_count() > 0 {
panic!("null values in dynamic groupby not yet supported, fill nulls.")
}

let dt = time.cast(&DataType::Datetime)?;
let dt = dt.datetime().unwrap();

let gt = dt.downcast_iter().map(|vals| {
let ts = vals.values().as_slice();
polars_time::groupby::groupby(w, ts)
}).flatten().collect::<Vec<_>>();

let mut df = self.clone();
if options.truncate {
let out = dt.apply(|v| w.truncate(v));
let out = out.cast(&DataType::Datetime).unwrap();
df.with_column(out)?;
}


Ok((df, gt))
}
}
2 changes: 2 additions & 0 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ pub mod aggregations;
pub(crate) mod hashing;
#[cfg(feature = "pivot")]
pub(crate) mod pivot;
mod dynamic;

pub type GroupTuples = Vec<(u32, Vec<u32>)>;
pub type GroupedMap<T> = HashMap<T, Vec<u32>, RandomState>;
pub use dynamic::*;

/// Used to create the tuples for a groupby operation.
pub trait IntoGroupTuples {
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::collections::HashSet;
#[cfg(any(feature = "csv-file", feature = "parquet"))]
use std::path::PathBuf;
use std::sync::Arc;
use polars_core::frame::groupby::DynamicGroupOptions;

// ALogicalPlan is a representation of LogicalPlan with Nodes which are allocated in an Arena
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -98,6 +99,7 @@ pub enum ALogicalPlan {
schema: SchemaRef,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>
},
Join {
input_left: Node,
Expand Down Expand Up @@ -303,6 +305,7 @@ impl ALogicalPlan {
schema,
apply,
maintain_order,
dynamic_options,
..
} => Aggregate {
input: inputs[0],
Expand All @@ -311,6 +314,7 @@ impl ALogicalPlan {
schema: schema.clone(),
apply: apply.clone(),
maintain_order: *maintain_order,
dynamic_options: dynamic_options.clone()
},
Join {
schema,
Expand Down Expand Up @@ -731,6 +735,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
schema: Arc::new(schema),
apply,
maintain_order,
dynamic_options: None
};
let root = self.lp_arena.add(lp);
Self::new(root, self.expr_arena, self.lp_arena)
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ pub(crate) fn to_alp(
schema,
apply,
maintain_order,
dynamic_options
} => {
let i = to_alp(*input, expr_arena, lp_arena);
let aggs_new = aggs.into_iter().map(|x| to_aexpr(x, expr_arena)).collect();
Expand All @@ -332,6 +333,7 @@ pub(crate) fn to_alp(
schema,
apply,
maintain_order,
dynamic_options
}
}
LogicalPlan::Join {
Expand Down Expand Up @@ -782,6 +784,7 @@ pub(crate) fn node_to_lp(
schema,
apply,
maintain_order,
dynamic_options
} => {
let i = node_to_lp(input, expr_arena, lp_arena);

Expand All @@ -792,6 +795,7 @@ pub(crate) fn node_to_lp(
schema,
apply,
maintain_order,
dynamic_options
}
}
ALogicalPlan::Join {
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod projection;
use polars_io::ipc::IpcReader;
use projection::*;
use std::io::{Read, Seek, SeekFrom};
use polars_core::frame::groupby::DynamicGroupOptions;

// Will be set/ unset in the fetch operation to communicate overwriting the number of rows to scan.
thread_local! {pub(crate) static FETCH_ROWS: Cell<Option<usize>> = Cell::new(None)}
Expand Down Expand Up @@ -234,6 +235,7 @@ pub enum LogicalPlan {
schema: SchemaRef,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>
},
/// Join operation
Join {
Expand Down Expand Up @@ -1053,6 +1055,7 @@ impl LogicalPlanBuilder {
schema: Arc::new(schema),
apply,
maintain_order,
dynamic_options: None
}
.into()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ impl PredicatePushDown {
schema,
apply,
maintain_order,
dynamic_options
} => {
self.pushdown_and_assign(input, optimizer::init_hashmap(), lp_arena, expr_arena)?;

Expand All @@ -304,6 +305,7 @@ impl PredicatePushDown {
schema,
apply,
maintain_order,
dynamic_options
};
Ok(self.finish_at_leaf(lp, acc_predicates, lp_arena, expr_arena))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ impl ProjectionPushDown {
apply,
schema,
maintain_order,
dynamic_options
} => {
// the custom function may need all columns so we do the projections here.
if let Some(f) = apply {
Expand All @@ -575,6 +576,7 @@ impl ProjectionPushDown {
schema,
apply: Some(f),
maintain_order,
dynamic_options
};
let input = lp_arena.add(lp);

Expand All @@ -599,6 +601,12 @@ impl ProjectionPushDown {
add_expr_to_accumulated(*key, &mut acc_projections, &mut names, expr_arena);
}

// make sure that the dynamic key is projected
if let Some(options) = &dynamic_options {
let node = expr_arena.add(AExpr::Column(Arc::from(options.time_column.as_str())));
add_expr_to_accumulated(node, &mut acc_projections, &mut names, expr_arena);
}

self.pushdown_and_assign(
input,
acc_projections,
Expand Down
50 changes: 50 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use super::*;
use crate::logical_plan::Context;
use crate::prelude::utils::as_aggregated;
use crate::utils::rename_aexpr_root_name;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use rayon::prelude::*;
use polars_core::frame::groupby::DynamicGroupOptions;

pub struct GroupByDynamicExec {
input: Box<dyn Executor>,
keys: Vec<Arc<dyn PhysicalExpr>>,
aggs: Vec<Arc<dyn PhysicalExpr>>,
options: DynamicGroupOptions
}

impl Executor for GroupByDynamicExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;

let (df, groups) = df.groupby_dynamic(&self.options)?;
let key = df.column(&self.options.time_column)?.clone();

let agg_columns = POOL.install(|| {
self.aggs
.par_iter()
.map(|expr| {
let opt_agg = as_aggregated(expr.as_ref(), &df, &groups, state)?;
if let Some(agg) = &opt_agg {
if agg.len() != groups.len() {
return Err(PolarsError::ComputeError(
format!("returned aggregation is a different length: {} than the group lengths: {}",
agg.len(),
groups.len()).into()
))
}
};
Ok(opt_agg)
})
.collect::<Result<Vec<_>>>()

})?;

let mut columns= Vec::with_capacity(agg_columns.len() + 1);
columns.push(key);
columns.extend(agg_columns.into_iter().flatten());

DataFrame::new(columns)
}
}
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub(crate) mod sort;
pub(crate) mod stack;
pub(crate) mod udf;
pub(crate) mod union;
mod groupby_dynamic;

use super::*;
use crate::logical_plan::FETCH_ROWS;
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ impl DefaultPlanner {
apply,
schema: _,
maintain_order,
dynamic_options
} => {
#[cfg(feature = "object")]
let input_schema = lp_arena.get(input).schema(lp_arena).clone();
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-time/src/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::unit::TimeNanoseconds;
use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
use std::ops::{Add, Mul};

#[derive(Copy, Clone)]
#[derive(Copy, Clone, Debug)]
pub struct Duration {
// the number of months for the duration
months: i64,
Expand Down
10 changes: 7 additions & 3 deletions polars/polars-time/src/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use crate::calendar::timestamp_ns_to_datetime;
use crate::duration::Duration;
use crate::window::Window;

pub fn groupby(window: Window, time: &[i64]) -> Vec<Vec<u32>> {
pub type GroupTuples = Vec<(u32, Vec<u32>)>;

pub fn groupby(window: Window, time: &[i64]) -> GroupTuples {
let mut boundary = Bounds::from(time);

let mut group_tuples = Vec::with_capacity(window.estimate_overlapping_bounds(boundary));
Expand Down Expand Up @@ -38,7 +40,9 @@ pub fn groupby(window: Window, time: &[i64]) -> Vec<Vec<u32>> {
}
i += 1
}
group_tuples.push(group)
if !group.is_empty() {
group_tuples.push((group[0], group))
}
}
group_tuples
}
Expand Down Expand Up @@ -87,7 +91,7 @@ mod test {
Duration::from_seconds(30),
Duration::from_seconds(0),
);
let gt = groupby(window, &ts);
let gt = groupby(window, &ts).into_iter().map(|g| g.1).collect::<Vec<_>>();

let expected = &[[0, 1, 2], [2, 3, 4], [4, 5, 6]];
assert_eq!(gt, expected);
Expand Down
7 changes: 6 additions & 1 deletion polars/polars-time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@
mod bounds;
mod calendar;
mod duration;
mod groupby;
pub mod groupby;
#[cfg(test)]
mod test;
mod unit;
mod window;

pub use {
duration::Duration,
window::Window
};

0 comments on commit de91c14

Please sign in to comment.