Skip to content

Commit

Permalink
proper window function cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 5, 2022
1 parent 0d19221 commit 82a2c65
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 68 deletions.
2 changes: 0 additions & 2 deletions polars/polars-lazy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@
//! }
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(all(feature = "datafusion", feature = "compile"))]
mod datafusion;
#[cfg(all(feature = "dot_diagram", feature = "compile"))]
mod dot;
#[cfg(feature = "compile")]
Expand Down
99 changes: 92 additions & 7 deletions polars/polars-lazy/src/physical_plan/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,108 @@ fn set_n_rows(n_rows: Option<usize>) -> Option<usize> {
}
}

pub(crate) fn evaluate_physical_expressions(
fn execute_projection_cached_window_fns(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
) -> Result<DataFrame> {
let zero_length = df.height() == 0;
) -> Result<Vec<Series>> {
// We partition by normal expression and window expression
// - the normal expressions can run in parallel
// - the window expression take more memory and often use the same groupby keys and join tuples
// so they are cached and run sequential

// the partitioning messes with column order, so we also store the idx
// and use those to restore the original projection order
#[allow(clippy::type_complexity)]
let mut windows: Vec<(String, Vec<(u32, Arc<dyn PhysicalExpr>)>)> = vec![];
let mut other = Vec::with_capacity(exprs.len());

// first we partition the window function by the values they group over.
// the groupby values should be cached
let mut index = 0u32;
exprs.iter().for_each(|phys| {
index += 1;
let e = phys.as_expression();

let mut is_window = false;
for e in e.into_iter() {
if let Expr::Window { partition_by, .. } = e {
let groupby = format!("{:?}", partition_by.as_slice());
// *windows.entry(groupby).or_insert(0) += 1;
if let Some(tpl) = windows.iter_mut().find(|tpl| tpl.0 == groupby) {
tpl.1.push((index, phys.clone()))
} else {
windows.push((groupby, vec![(index, phys.clone())]))
}
is_window = true;
break;
}
}
if !is_window {
other.push((index, phys))
}
});

let mut selected_columns = POOL.install(|| {
exprs
other
.par_iter()
.map(|expr| expr.evaluate(df, state))
.collect::<Result<Vec<Series>>>()
.map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s)))
.collect::<Result<Vec<_>>>()
})?;

for partition in windows {
// clear the cache for every partitioned group
let mut state = state.clone();
state.clear_expr_cache();

// don't bother caching if we only have a single window function in this partition
if partition.1.len() == 1 {
state.cache_window = false;
} else {
state.cache_window = true;
}

for (index, e) in partition.1 {
let s = e.evaluate(df, &state)?;
selected_columns.push((index, s));
}
}

selected_columns.sort_unstable_by_key(|tpl| tpl.0);
let selected_columns = selected_columns.into_iter().map(|tpl| tpl.1).collect();
Ok(selected_columns)
}

/// Evaluate the projection expressions over `df` and materialize them into a DataFrame.
///
/// When `has_windows` is set, evaluation is delegated to the cached window-function
/// path; otherwise every expression is evaluated in parallel on the thread pool.
/// Length mismatches between the resulting columns (e.g. literals) are resolved by
/// `check_expand_literals`.
pub(crate) fn evaluate_physical_expressions(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
    has_windows: bool,
) -> Result<DataFrame> {
    let zero_length = df.height() == 0;

    // window functions must run sequentially so their caches can be shared
    if has_windows {
        let columns = execute_projection_cached_window_fns(df, exprs, state)?;
        return check_expand_literals(columns, zero_length);
    }

    // no window functions: evaluate everything in parallel
    let columns = POOL.install(|| {
        exprs
            .par_iter()
            .map(|e| e.evaluate(df, state))
            .collect::<Result<_>>()
    })?;
    check_expand_literals(columns, zero_length)
}

fn check_expand_literals(
mut selected_columns: Vec<Series>,
zero_length: bool,
) -> Result<DataFrame> {
let first_len = selected_columns[0].len();
let mut df_height = 0;
let mut all_equal_len = true;
{
let mut names = PlHashSet::with_capacity(exprs.len());
let mut names = PlHashSet::with_capacity(selected_columns.len());
for s in &selected_columns {
let len = s.len();
df_height = std::cmp::max(df_height, len);
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use polars_core::prelude::*;
/// Executor node that evaluates a list of projection expressions against its input.
pub struct ProjectionExec {
    // upstream executor producing the input DataFrame
    pub(crate) input: Box<dyn Executor>,
    // physical expressions to evaluate against the input
    pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
    // whether any expression contains a window function; if so, evaluation is
    // routed through the cached window-function path
    pub(crate) has_windows: bool,
    // expected output schema, verified against the result in test builds only
    #[cfg(test)]
    pub(crate) schema: SchemaRef,
}
Expand All @@ -16,7 +17,7 @@ impl Executor for ProjectionExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;

let df = evaluate_physical_expressions(&df, &self.expr, state);
let df = evaluate_physical_expressions(&df, &self.expr, state, self.has_windows);

// this only runs during testing and check if the runtime type matches the predicted schema
#[cfg(test)]
Expand Down
23 changes: 5 additions & 18 deletions polars/polars-lazy/src/physical_plan/executors/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,23 +249,10 @@ impl Executor for CsvExec {

/// Producer of an in memory DataFrame
pub struct DataFrameExec {
    // the in-memory frame this executor yields
    pub(crate) df: Arc<DataFrame>,
    // optional projection expressions, applied before the selection
    pub(crate) projection: Option<Vec<Arc<dyn PhysicalExpr>>>,
    // optional predicate used to filter rows
    pub(crate) selection: Option<Arc<dyn PhysicalExpr>>,
    // whether the projection contains window functions; if so, evaluation is
    // routed through the cached window-function path
    pub(crate) has_windows: bool,
}

impl Executor for DataFrameExec {
Expand All @@ -276,7 +263,7 @@ impl Executor for DataFrameExec {
// projection should be before selection as those are free
// TODO: this is only the case if we don't create new columns
if let Some(projection) = &self.projection {
df = evaluate_physical_expressions(&df, projection, state)?;
df = evaluate_physical_expressions(&df, projection, state, self.has_windows)?;
}

if let Some(selection) = &self.selection {
Expand Down
44 changes: 22 additions & 22 deletions polars/polars-lazy/src/physical_plan/executors/stack.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
use crate::physical_plan::executors::execute_projection_cached_window_fns;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::{prelude::*, POOL};
use rayon::prelude::*;

/// Executor node that evaluates expressions and stacks the results onto the
/// input DataFrame as new/replaced columns (`with_columns`-style).
pub struct StackExec {
    // upstream executor producing the input DataFrame
    pub(crate) input: Box<dyn Executor>,
    // whether any expression contains a window function; if so, evaluation is
    // routed through the cached window-function path
    pub(crate) has_windows: bool,
    // physical expressions whose results are added to the input frame
    pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
}

impl Executor for StackExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let mut df = self.input.execute(state)?;
let height = df.height();

let res = POOL.install(|| {
self.expr
.par_iter()
.map(|expr| {
expr.evaluate(&df, state).map(|series| {
// literal series. Should be whole column size
if series.len() == 1 && height > 1 {
series.expand_at_index(0, height)
} else {
series
}
let res = if self.has_windows {
execute_projection_cached_window_fns(&df, &self.expr, state)?
} else {
POOL.install(|| {
self.expr
.par_iter()
.map(|expr| {
expr.evaluate(&df, state).map(|series| {
// literal series. Should be whole column size
if series.len() == 1 && height > 1 {
series.expand_at_index(0, height)
} else {
series
}
})
})
})
.collect::<Result<Vec<_>>>()
})?;
.collect::<Result<Vec<_>>>()
})?
};

for s in res {
df.with_column(s)?;
Expand Down
82 changes: 68 additions & 14 deletions polars/polars-lazy/src/physical_plan/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct WindowExpr {
pub(crate) function: Expr,
pub(crate) phys_function: Arc<dyn PhysicalExpr>,
pub(crate) options: WindowOptions,
pub(crate) expr: Expr,
}

impl PhysicalExpr for WindowExpr {
Expand All @@ -37,17 +38,40 @@ impl PhysicalExpr for WindowExpr {
.map(|e| e.evaluate(df, state))
.collect::<Result<Vec<_>>>()?;

let mut gb = df.groupby_with_series(groupby_columns.clone(), true)?;
let mut groups = std::mem::take(gb.get_groups_mut());
let create_groups = || {
let mut gb = df.groupby_with_series(groupby_columns.clone(), true)?;
let out: Result<GroupTuples> = Ok(std::mem::take(gb.get_groups_mut()));
out
};

// Try to get cached grouptuples
let (mut groups, cached, cache_key) = if state.cache_window {
let mut cache_key = String::with_capacity(32 * groupby_columns.len());
for s in &groupby_columns {
cache_key.push_str(s.name());
}

let mut gt_map = state.group_tuples.lock().unwrap();
// we run sequential and partitioned
// and every partition run the cache should be empty so we expect a max of 1.
debug_assert!(gt_map.len() <= 1);
if let Some(gt) = gt_map.get_mut(&cache_key) {
(std::mem::take(gt), true, cache_key)
} else {
(create_groups()?, false, cache_key)
}
} else {
(create_groups()?, false, "".to_string())
};

// if we flatten this column we need to make sure the groups are sorted.
if self.options.explode {
if !cached && self.options.explode {
groups.sort_unstable_by_key(|t| t.0);
}

// 2. create GroupBy object and apply aggregation
let apply_columns = self.apply_columns.iter().map(|s| s.as_ref()).collect();
let gb = GroupBy::new(df, groupby_columns.clone(), groups, Some(apply_columns));
let mut gb = GroupBy::new(df, groupby_columns.clone(), groups, Some(apply_columns));

let out = match self.phys_function.as_agg_expr() {
// this branch catches all aggregation expressions
Expand Down Expand Up @@ -75,8 +99,14 @@ impl PhysicalExpr for WindowExpr {
Ok(DataFrame::new_no_checks(cols))
}
}?;
// drop the group tuples before we do the left join to reduce allocated memory.
drop(gb);
if state.cache_window {
let groups = std::mem::take(gb.get_groups_mut());
let mut gt_map = state.group_tuples.lock().unwrap();
gt_map.insert(cache_key.clone(), groups);
} else {
// drop the group tuples before we do the left join to reduce allocated memory.
drop(gb);
}

// 3. get the join tuples and use them to take the new Series
let out_column = out.select_at_idx(out.width() - 1).unwrap();
Expand All @@ -88,14 +118,32 @@ impl PhysicalExpr for WindowExpr {
return Ok(out);
}

let opt_join_tuples = if groupby_columns.len() == 1 {
// group key from right column
let right = out.select_at_idx(0).unwrap();
groupby_columns[0].hash_join_left(right)
let get_join_tuples = || {
if groupby_columns.len() == 1 {
// group key from right column
let right = out.select_at_idx(0).unwrap();
groupby_columns[0].hash_join_left(right)
} else {
let df_right =
DataFrame::new_no_checks(out.get_columns()[..out.width() - 1].to_vec());
let df_left = DataFrame::new_no_checks(groupby_columns);
private_left_join_multiple_keys(&df_left, &df_right)
}
};

// try to get cached join_tuples
let opt_join_tuples = if state.cache_window {
let mut jt_map = state.join_tuples.lock().unwrap();
// we run sequential and partitioned
// and every partition run the cache should be empty so we expect a max of 1.
debug_assert!(jt_map.len() <= 1);
if let Some(opt_join_tuples) = jt_map.get_mut(&cache_key) {
std::mem::take(opt_join_tuples)
} else {
get_join_tuples()
}
} else {
let df_right = DataFrame::new_no_checks(out.get_columns()[..out.width() - 1].to_vec());
let df_left = DataFrame::new_no_checks(groupby_columns);
private_left_join_multiple_keys(&df_left, &df_right)
get_join_tuples()
};

let mut iter = opt_join_tuples
Expand All @@ -106,6 +154,12 @@ impl PhysicalExpr for WindowExpr {
if let Some(name) = &self.out_name {
out.rename(name.as_ref());
}

if state.cache_window {
let mut jt_map = state.join_tuples.lock().unwrap();
jt_map.insert(cache_key, opt_join_tuples);
}

Ok(out)
}

Expand All @@ -126,6 +180,6 @@ impl PhysicalExpr for WindowExpr {
}

fn as_expression(&self) -> &Expr {
todo!()
&self.expr
}
}

0 comments on commit 82a2c65

Please sign in to comment.