Skip to content

Commit

Permalink
more dispatch of dynamic groupby
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 17, 2021
1 parent 4dfb67d commit 16a254f
Show file tree
Hide file tree
Showing 22 changed files with 211 additions and 76 deletions.
1 change: 1 addition & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ arange = ["polars-lazy/arange"]
true_div = ["polars-lazy/true_div"]
diagonal_concat = ["polars-core/diagonal_concat"]
abs = ["polars-core/abs", "polars-lazy/abs"]
dynamic_groupby = ["polars-core/dynamic_groupby", "polars-lazy/dynamic_groupby"]

# don't use this
private = ["polars-lazy/private"]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ moment = []
diagonal_concat = []
abs = []

dynamic_groupby = []
dynamic_groupby = ["polars-time"]

# opt-in datatypes for Series
dtype-date = ["temporal"]
Expand Down
9 changes: 9 additions & 0 deletions polars/polars-core/src/chunked_array/temporal/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ where
"%Y-%m-%d",
// 31-12-2021
"%d-%m-%Y",
// 21/12/31 12:54:98
"%y/%m/%d %H:%M:%S",
// 21-12-31 24:58:01
"%y-%m-%d %H:%M:%S",
// 21/12/31 24:58:01
"%y/%m/%d %H:%M:%S",
//210319 23:58:50
"%y%m%d %H:%M:%S",
// 2019-04-18T02:45:55
// 2021/12/31 12:54:98
"%Y/%m/%d %H:%M:%S",
// 2021-12-31 24:58:01
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,9 +659,9 @@ Series: 'Date' [date]
r#"shape: (3,)
Series: '' [datetime]
[
1970-01-01 00:00:00.001
1970-01-01 00:00:00.000000001
null
2001-09-09 01:46:40
1970-01-01 00:16:40
]"#,
format!("{:?}", s.into_series())
);
Expand Down
23 changes: 13 additions & 10 deletions polars/polars-core/src/frame/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ use crate::frame::groupby::{
pub struct DynamicGroupOptions {
pub time_column: String,
/// start a window at this interval
every: Duration,
pub every: Duration,
/// window duration
period: Duration,
pub period: Duration,
/// offset window boundaries
offset: Duration,
pub offset: Duration,
/// truncate the time column values to the window
truncate: bool
pub truncate: bool
}

impl DataFrame {
pub fn groupby_dynamic(&self, options: &DynamicGroupOptions) -> Result<(Self, GroupTuples)> {
pub fn groupby_dynamic(&self, options: &DynamicGroupOptions) -> Result<(Series, GroupTuples)> {
let w = Window::new(options.every, options.period, options.offset);


Expand All @@ -39,14 +39,17 @@ impl DataFrame {
polars_time::groupby::groupby(w, ts)
}).flatten().collect::<Vec<_>>();

let mut df = self.clone();
// Safety:
// within bounds
let mut dt = unsafe {
dt.take_unchecked(gt.iter().map(|g| g.0 as usize).into())
};

if options.truncate {
let out = dt.apply(|v| w.truncate(v));
let out = out.cast(&DataType::Datetime).unwrap();
df.with_column(out)?;
dt = dt.apply(|v| w.truncate(v));
}


Ok((df, gt))
Ok((dt.into_date().into_series(), gt))
}
}
9 changes: 9 additions & 0 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ pub mod aggregations;
pub(crate) mod hashing;
#[cfg(feature = "pivot")]
pub(crate) mod pivot;
#[cfg(feature = "dynamic_groupby")]
mod dynamic;
/// Reduced `DynamicGroupOptions` that is compiled only when the
/// `dynamic_groupby` feature is disabled; it carries nothing but the
/// `time_column` field.
// NOTE(review): presumably this exists so that code which names the type
// unconditionally still compiles without the feature — confirm.
#[cfg(not(feature = "dynamic_groupby"))]
#[derive(Clone, Debug)]
pub struct DynamicGroupOptions {
pub time_column: String
}


pub type GroupTuples = Vec<(u32, Vec<u32>)>;
pub type GroupedMap<T> = HashMap<T, Vec<u32>, RandomState>;

#[cfg(feature = "dynamic_groupby")]
pub use dynamic::*;

/// Used to create the tuples for a groupby operation.
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@ pub use crate::chunked_array::ops::unique::rank::{RankMethod, RankOptions};

#[cfg(feature = "rolling_window")]
pub use crate::chunked_array::ops::rolling_window::RollingOptions;

#[cfg(feature = "dynamic_groupby")]
pub use polars_time::Duration;
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ moment = ["polars-core/moment"]
list = ["polars-core/list"]
abs = ["polars-core/abs"]
random = ["polars-core/random"]
dynamic_groupby = ["polars-core/dynamic_groupby"]

# no guarantees whatsoever
private = []
Expand Down
19 changes: 18 additions & 1 deletion polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use polars_core::prelude::*;
#[cfg(feature = "dtype-categorical")]
use polars_core::toggle_string_cache;
use std::sync::Arc;
use polars_core::frame::groupby::DynamicGroupOptions;

use crate::logical_plan::optimizer::aggregate_pushdown::AggregatePushdown;
#[cfg(any(feature = "parquet", feature = "csv-file", feature = "ipc"))]
Expand Down Expand Up @@ -817,9 +818,22 @@ impl LazyFrame {
opt_state,
keys: by.as_ref().to_vec(),
maintain_order: false,
dynamic_options: None
}
}

/// Start a groupby that carries dynamic grouping `options`.
///
/// The expressions in `by` are copied into the returned [`LazyGroupBy`] as its
/// keys. The builder is created with `maintain_order` set to `true` and stores
/// `options` in its `dynamic_options` field.
pub fn groupby_dynamic<E: AsRef<[Expr]>>(
    self,
    by: E,
    options: DynamicGroupOptions,
) -> LazyGroupBy {
    // Read the optimizer state before `self.logical_plan` is moved out below.
    let opt_state = self.get_opt_state();
    let keys = by.as_ref().to_vec();
    let dynamic_options = Some(options);

    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        keys,
        maintain_order: true,
        dynamic_options,
    }
}

/// Similar to groupby, but order of the DataFrame is maintained.
pub fn groupby_stable<E: AsRef<[Expr]>>(self, by: E) -> LazyGroupBy {
let opt_state = self.get_opt_state();
Expand All @@ -828,6 +842,7 @@ impl LazyFrame {
opt_state,
keys: by.as_ref().to_vec(),
maintain_order: true,
dynamic_options: None
}
}

Expand Down Expand Up @@ -1126,6 +1141,7 @@ pub struct LazyGroupBy {
opt_state: OptState,
keys: Vec<Expr>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>,
}

impl LazyGroupBy {
Expand Down Expand Up @@ -1153,7 +1169,7 @@ impl LazyGroupBy {
/// ```
pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
let lp = LogicalPlanBuilder::from(self.logical_plan)
.groupby(Arc::new(self.keys), aggs, None, self.maintain_order)
.groupby(Arc::new(self.keys), aggs, None, self.maintain_order, self.dynamic_options)
.build();
LazyFrame::from_logical_plan(lp, self.opt_state)
}
Expand Down Expand Up @@ -1196,6 +1212,7 @@ impl LazyGroupBy {
vec![],
Some(Arc::new(f)),
self.maintain_order,
None
)
.build();
LazyFrame::from_logical_plan(lp, self.opt_state)
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,9 @@ impl<'a> ALogicalPlanBuilder<'a> {
aggs: Vec<Node>,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>
) -> Self {
debug_assert!(!keys.is_empty());
debug_assert!(!(keys.is_empty() && dynamic_options.is_none()));
let current_schema = self.schema();
// TODO! add this line if LogicalPlan is dropped in favor of ALogicalPlan
// let aggs = rewrite_projections(aggs, current_schema);
Expand All @@ -735,7 +736,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
schema: Arc::new(schema),
apply,
maintain_order,
dynamic_options: None
dynamic_options,
};
let root = self.lp_arena.add(lp);
Self::new(root, self.expr_arena, self.lp_arena)
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,8 +1039,9 @@ impl LogicalPlanBuilder {
aggs: E,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
dynamic_options: Option<DynamicGroupOptions>
) -> Self {
debug_assert!(!keys.is_empty());
debug_assert!(!(keys.is_empty() && dynamic_options.is_none()));
let current_schema = self.0.schema();
let aggs = rewrite_projections(aggs.as_ref().to_vec(), current_schema);

Expand All @@ -1055,7 +1056,7 @@ impl LogicalPlanBuilder {
schema: Arc::new(schema),
apply,
maintain_order,
dynamic_options: None
dynamic_options
}
.into()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn combine_lp_nodes(
.map(|input| {
let node = lp_arena.add(input);
ALogicalPlanBuilder::new(node, expr_arena, lp_arena)
.groupby(keys, aggs, None, maintain_order)
.groupby(keys, aggs, None, maintain_order, None)
.build()

})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ impl ProjectionPushDown {
aggs,
apply,
maintain_order,
dynamic_options
);
Ok(builder.build())
}
Expand Down
69 changes: 39 additions & 30 deletions polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,53 @@ use polars_core::POOL;
use rayon::prelude::*;
use polars_core::frame::groupby::DynamicGroupOptions;

pub struct GroupByDynamicExec {
input: Box<dyn Executor>,
keys: Vec<Arc<dyn PhysicalExpr>>,
aggs: Vec<Arc<dyn PhysicalExpr>>,
options: DynamicGroupOptions
pub(crate) struct GroupByDynamicExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) options: DynamicGroupOptions
}

impl Executor for GroupByDynamicExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
#[cfg(feature = "dynamic_groupby")]
{
let df = self.input.execute(state)?;

let (df, groups) = df.groupby_dynamic(&self.options)?;
let key = df.column(&self.options.time_column)?.clone();
// don't truncate all values
// we truncate when we have collected the keys
let mut options = self.options.clone();;
options.truncate = false;
let (key, groups) = df.groupby_dynamic(&options)?;

let agg_columns = POOL.install(|| {
self.aggs
.par_iter()
.map(|expr| {
let opt_agg = as_aggregated(expr.as_ref(), &df, &groups, state)?;
if let Some(agg) = &opt_agg {
if agg.len() != groups.len() {
return Err(PolarsError::ComputeError(
format!("returned aggregation is a different length: {} than the group lengths: {}",
agg.len(),
groups.len()).into()
))
}
};
Ok(opt_agg)
})
.collect::<Result<Vec<_>>>()
let agg_columns = POOL.install(|| {
self.aggs
.par_iter()
.map(|expr| {
let opt_agg = as_aggregated(expr.as_ref(), &df, &groups, state)?;
if let Some(agg) = &opt_agg {
if agg.len() != groups.len() {
return Err(PolarsError::ComputeError(
format!("returned aggregation is a different length: {} than the group lengths: {}",
agg.len(),
groups.len()).into()
))
}
};
Ok(opt_agg)
})
.collect::<Result<Vec<_>>>()
})?;

})?;
let mut columns = Vec::with_capacity(agg_columns.len() + 1);
columns.push(key);
columns.extend(agg_columns.into_iter().flatten());

let mut columns= Vec::with_capacity(agg_columns.len() + 1);
columns.push(key);
columns.extend(agg_columns.into_iter().flatten());
dbg!(&columns);

DataFrame::new(columns)
DataFrame::new(columns)
}
#[cfg(not(feature = "dynamic_groupby"))]
panic!("activate feature dynamic_groupby")
}
}
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub(crate) mod sort;
pub(crate) mod stack;
pub(crate) mod udf;
pub(crate) mod union;
mod groupby_dynamic;
pub(crate) mod groupby_dynamic;

use super::*;
use crate::logical_plan::FETCH_ROWS;
Expand Down
21 changes: 16 additions & 5 deletions polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use polars_core::{frame::groupby::GroupByMethod, utils::parallel_op_series};
use polars_io::ScanAggregation;
use std::collections::HashSet;
use std::sync::Arc;
use crate::physical_plan::executors::groupby_dynamic::GroupByDynamicExec;

#[cfg(any(feature = "parquet", feature = "csv-file"))]
fn aggregate_expr_to_scan_agg(
Expand Down Expand Up @@ -304,6 +305,21 @@ impl DefaultPlanner {
let input_schema = lp_arena.get(input).schema(lp_arena).clone();
let input = self.create_physical_plan(input, lp_arena, expr_arena)?;

let mut phys_keys =
self.create_physical_expressions(&keys, Context::Default, expr_arena)?;

let phys_aggs =
self.create_physical_expressions(&aggs, Context::Aggregation, expr_arena)?;

if let Some(options) = dynamic_options {
return Ok(Box::new(GroupByDynamicExec {
input,
keys: phys_keys,
aggs: phys_aggs,
options
}) )
}

// We first check if we can partition the groupby on the latest moment.
// TODO: fix this brittle/ buggy state and implement partitioned groupby's in eager
let mut partitionable = true;
Expand Down Expand Up @@ -372,11 +388,6 @@ impl DefaultPlanner {
} else {
partitionable = false;
}
let mut phys_keys =
self.create_physical_expressions(&keys, Context::Default, expr_arena)?;

let phys_aggs =
self.create_physical_expressions(&aggs, Context::Aggregation, expr_arena)?;
if partitionable {
Ok(Box::new(PartitionGroupByExec::new(
input,
Expand Down

0 comments on commit 16a254f

Please sign in to comment.