Skip to content

Commit

Permalink
Pushdown SLICE to GROUPBY nodes (#3138)
Browse files Browse the repository at this point in the history
* groupby accept slice

* groupby slice pushdown
  • Loading branch information
ritchie46 committed Apr 13, 2022
1 parent e51ba24 commit f164d7b
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 16 deletions.
19 changes: 17 additions & 2 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,23 +307,38 @@ impl<'df> GroupBy<'df> {
self.groups
}

pub fn keys(&self) -> Vec<Series> {
/// Gathers the key columns of this groupby, optionally restricted to a slice of the groups.
///
/// `slice` is an optional `(offset, len)` pair that is forwarded to `self.groups.slice`;
/// with `None` the unsliced `self.groups` is used. One `Series` is produced per entry of
/// `self.selected_keys`, computed through a parallel iterator running inside `POOL`.
pub fn keys_sliced(&self, slice: Option<(i64, usize)>) -> Vec<Series> {
POOL.install(|| {
self.selected_keys
.par_iter()
.map(|s| {
#[allow(unused_assignments)]
// Declared up front so that the sliced view, when one is created, stays alive for as
// long as the `groups` reference borrowed from it below.
let mut groups_owned = None;

let groups = if let Some((offset, len)) = slice {
// NOTE(review): the slice does not depend on `s`, yet it is rebuilt for every key column.
groups_owned = Some(self.groups.slice(offset, len));
groups_owned.as_deref().unwrap()
} else {
&self.groups
};

// Safety:
// the indexes stored in the groups are expected to be in bounds for the key column.
// NOTE(review): nothing in this function verifies that; confirm where the groups are built.
unsafe {
s.take_iter_unchecked(
&mut self.groups.idx_ref().iter().map(|(idx, _)| idx as usize),
&mut groups.idx_ref().iter().map(|(idx, _)| idx as usize),
)
}
})
.collect()
})
}

/// Returns the key columns for all groups.
///
/// Equivalent to calling `keys_sliced` without a slice.
pub fn keys(&self) -> Vec<Series> {
    let no_slice = None;
    self.keys_sliced(no_slice)
}

fn prepare_agg(&self) -> Result<(Vec<Series>, Vec<Series>)> {
let selection = match &self.selected_agg {
Some(selection) => selection.clone(),
Expand Down
65 changes: 64 additions & 1 deletion polars/polars-core/src/frame/groupby/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::prelude::*;
use crate::utils::NoNull;
use crate::utils::{slice_slice, NoNull};
use crate::POOL;
use polars_arrow::utils::CustomIterTools;
use rayon::iter::plumbing::UnindexedConsumer;
use rayon::prelude::*;
use std::mem::ManuallyDrop;
use std::ops::Deref;

/// Indexes of the groups. The first index is stored separately;
/// this makes sorting fast.
Expand Down Expand Up @@ -43,6 +45,10 @@ impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
}

impl GroupsIdx {
/// Assembles a `GroupsIdx` from its parts.
///
/// The arguments are stored as given; no validation is performed on them.
pub fn new(first: Vec<IdxSize>, all: Vec<Vec<IdxSize>>, sorted: bool) -> Self {
    Self {
        first,
        all,
        sorted,
    }
}

pub fn sort(&mut self) {
let mut idx = 0;
let first = std::mem::take(&mut self.first);
Expand Down Expand Up @@ -318,6 +324,48 @@ impl GroupsProxy {
.collect_trusted(),
}
}

/// Returns a view over a sub-range of these groups without copying the group data.
///
/// `offset` and `len` are forwarded to `slice_slice`, which selects the sub-range of the
/// underlying group buffers. The returned `SlicedGroups` dereferences to a `GroupsProxy`
/// whose `Vec`s alias the buffers owned by `self`; it also borrows `self`, so the view
/// cannot outlive the groups it points into.
///
/// # Panics
///
/// Panics if `slice_slice` computes a range that is out of bounds for a group buffer.
pub fn slice(&self, offset: i64, len: usize) -> SlicedGroups {
    let sliced = match self {
        GroupsProxy::Idx(groups) => {
            // Run every operation that may panic (the bounds-checked slicing) *before*
            // any aliasing `Vec` exists. An aliasing `Vec` that is dropped during
            // unwinding would free memory it does not own.
            let first = slice_slice(groups.first(), offset, len);
            let all = slice_slice(groups.all(), offset, len);
            let sorted = groups.is_sorted();

            // SAFETY:
            // The `Vec`s built here alias memory owned by `self`. They are moved straight
            // into a `ManuallyDrop`, so their destructors never run, and `SlicedGroups`
            // only hands out shared references, so they are never mutated or grown.
            // The `borrowed` field below keeps `self` alive for as long as they exist.
            // NOTE(review): `Vec::from_raw_parts` formally requires a pointer obtained
            // from the allocator with this exact capacity; this relies on the `Vec`
            // never being dropped or reallocated.
            ManuallyDrop::new(GroupsProxy::Idx(unsafe {
                GroupsIdx::new(
                    Vec::from_raw_parts(first.as_ptr() as *mut _, first.len(), first.len()),
                    Vec::from_raw_parts(all.as_ptr() as *mut _, all.len(), all.len()),
                    sorted,
                )
            }))
        }
        GroupsProxy::Slice(groups) => {
            let groups = slice_slice(groups, offset, len);

            // SAFETY: same reasoning as for the `Idx` variant above; the aliasing `Vec`
            // is wrapped in `ManuallyDrop` immediately and is never dropped or mutated.
            ManuallyDrop::new(GroupsProxy::Slice(unsafe {
                Vec::from_raw_parts(groups.as_ptr() as *mut _, groups.len(), groups.len())
            }))
        }
    };

    SlicedGroups {
        sliced,
        borrowed: self,
    }
}
}

impl From<GroupsIdx> for GroupsProxy {
Expand Down Expand Up @@ -411,3 +459,18 @@ impl<'a> ParallelIterator for GroupsProxyParIter<'a> {
.drive_unindexed(consumer)
}
}

/// A view over a sub-range of a `GroupsProxy`, as returned by `GroupsProxy::slice`.
///
/// Dereferences to the sliced `GroupsProxy`.
pub struct SlicedGroups<'a> {
// Wrapped in `ManuallyDrop` so that the destructor of the sliced proxy never runs:
// `GroupsProxy::slice` builds its `Vec`s on top of memory owned by `borrowed`.
sliced: ManuallyDrop<GroupsProxy>,
#[allow(dead_code)]
// Never read; held only so that the lifetime `'a` ties this view to the proxy it was
// sliced from and the aliased memory remains valid.
borrowed: &'a GroupsProxy,
}

/// Lets a `SlicedGroups` be used wherever a `&GroupsProxy` is expected.
impl Deref for SlicedGroups<'_> {
    type Target = GroupsProxy;

    /// Borrows the sliced proxy stored inside the `ManuallyDrop` wrapper.
    fn deref(&self) -> &Self::Target {
        &*self.sliced
    }
}
2 changes: 1 addition & 1 deletion polars/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub fn split_df(df: &DataFrame, n: usize) -> Result<Vec<DataFrame>> {
split_array!(df, n, i64)
}

pub(crate) fn slice_slice<T>(vals: &[T], offset: i64, len: usize) -> &[T] {
/// Returns the sub-slice of `vals` selected by `offset` and `len`.
///
/// The start position and the number of elements are computed by `slice_offsets` from
/// `offset`, `len` and the length of `vals`.
///
/// # Panics
///
/// Panics if the range computed by `slice_offsets` is out of bounds for `vals`.
pub fn slice_slice<T>(vals: &[T], offset: i64, len: usize) -> &[T] {
    let (start, count) = slice_offsets(offset, len, vals.len());
    let end = start + count;
    &vals[start..end]
}
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ impl LogicalPlanBuilder {
options: GroupbyOptions {
dynamic: dynamic_options,
rolling: rolling_options,
slice: None,
},
}
.into()
Expand Down
20 changes: 18 additions & 2 deletions polars/polars-lazy/src/logical_plan/optimizer/slice_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,24 @@ impl SlicePushDown {
right_on,
options
})
}
(Aggregate { input, keys, aggs, schema, apply, maintain_order, mut options }, Some(state)) => {
// first restart optimization in inputs and get the updated LP
let input_lp = lp_arena.take(input);
let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
let input= lp_arena.add(input_lp);

options.slice = Some((state.offset, state.len as usize));

Ok(Aggregate {
input,
keys,
aggs,
schema,
apply,
maintain_order,
options
})

}
(Slice {
Expand Down Expand Up @@ -246,8 +264,6 @@ impl SlicePushDown {
// will lead to incorrect aggregations
| m @ (LocalProjection {..},_)
// other blocking nodes
| m @ (Join { .. }, _)
| m @ (Aggregate {..}, _)
| m @ (DataFrameScan {..}, _)
| m @ (Sort {..}, _)
| m @ (Explode {..}, _)
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct UnionOptions {
/// Options carried by a groupby (`Aggregate`) node of the logical plan.
pub struct GroupbyOptions {
/// Options for a dynamic groupby, if one was requested.
pub(crate) dynamic: Option<DynamicGroupOptions>,
/// Options for a rolling groupby, if one was requested.
pub(crate) rolling: Option<RollingGroupOptions>,
/// Optional `(offset, len)` slice to apply to the groups. It starts out as `None` and
/// is filled in by the slice-pushdown optimization.
pub(crate) slice: Option<(i64, usize)>,
}

#[derive(Clone, Debug)]
Expand Down
57 changes: 49 additions & 8 deletions polars/polars-lazy/src/physical_plan/executors/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct GroupByExec {
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
input_schema: SchemaRef,
slice: Option<(i64, usize)>,
}

impl GroupByExec {
Expand All @@ -23,6 +24,7 @@ impl GroupByExec {
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
input_schema: SchemaRef,
slice: Option<(i64, usize)>,
) -> Self {
Self {
input,
Expand All @@ -31,6 +33,7 @@ impl GroupByExec {
apply,
maintain_order,
input_schema,
slice,
}
}
}
Expand All @@ -42,6 +45,7 @@ fn groupby_helper(
apply: Option<&Arc<dyn DataFrameUdf>>,
state: &ExecutionState,
maintain_order: bool,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
let gb = df.groupby_with_series(keys, true, maintain_order)?;

Expand All @@ -50,10 +54,19 @@ fn groupby_helper(
return gb.apply(|df| f.call_udf(df));
}

let groups = gb.get_groups();
let mut groups = gb.get_groups();

#[allow(unused_assignments)]
// it is unused because we only use it to keep the lifetime of sliced_group valid
let mut sliced_groups = None;

if let Some((offset, len)) = slice {
sliced_groups = Some(groups.slice(offset, len));
groups = sliced_groups.as_deref().unwrap();
}

let (mut columns, agg_columns) = POOL.install(|| {
let get_columns = || gb.keys();
let get_columns = || gb.keys_sliced(slice);

let get_agg = || aggs
.par_iter()
Expand Down Expand Up @@ -100,6 +113,7 @@ impl Executor for GroupByExec {
self.apply.as_ref(),
state,
self.maintain_order,
self.slice,
)
}
}
Expand All @@ -111,6 +125,7 @@ pub struct PartitionGroupByExec {
phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
aggs: Vec<Expr>,
maintain_order: bool,
slice: Option<(i64, usize)>,
}

impl PartitionGroupByExec {
Expand All @@ -120,13 +135,15 @@ impl PartitionGroupByExec {
phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
aggs: Vec<Expr>,
maintain_order: bool,
slice: Option<(i64, usize)>,
) -> Self {
Self {
input,
key,
phys_aggs,
aggs,
maintain_order,
slice,
}
}
}
Expand Down Expand Up @@ -231,7 +248,15 @@ impl Executor for PartitionGroupByExec {
if state.verbose {
eprintln!("POLARS_NO_PARTITION set: running default HASH AGGREGATION")
}
return groupby_helper(original_df, vec![key], &self.phys_aggs, None, state, false);
return groupby_helper(
original_df,
vec![key],
&self.phys_aggs,
None,
state,
false,
None,
);
}

// 0.5% is approximately the tipping point
Expand Down Expand Up @@ -281,7 +306,15 @@ impl Executor for PartitionGroupByExec {
(cardinality_frac * 100.0) as u32
);
}
return groupby_helper(original_df, vec![key], &self.phys_aggs, None, state, false);
return groupby_helper(
original_df,
vec![key],
&self.phys_aggs,
None,
state,
false,
None,
);
}
if state.verbose {
eprintln!("run PARTITIONED HASH AGGREGATION")
Expand All @@ -299,11 +332,20 @@ impl Executor for PartitionGroupByExec {

// first get mutable access and optionally sort
let gb = df.groupby_with_series(vec![key], true, self.maintain_order)?;
let groups = gb.get_groups();
let mut groups = gb.get_groups();

#[allow(unused_assignments)]
// it is unused because we only use it to keep the lifetime of sliced_group valid
let mut sliced_groups = None;

if let Some((offset, len)) = self.slice {
sliced_groups = Some(groups.slice(offset, len));
groups = sliced_groups.as_deref().unwrap();
}

let (aggs_and_names, outer_phys_aggs) = get_outer_agg_exprs(self, &original_df)?;

let get_columns = || gb.keys();
let get_columns = || gb.keys_sliced(self.slice);
let get_agg = || {
outer_phys_aggs
.par_iter()
Expand All @@ -329,7 +371,6 @@ impl Executor for PartitionGroupByExec {
columns.extend(agg_columns);
state.clear_schema_cache();

let df = DataFrame::new_no_checks(columns);
Ok(df)
Ok(DataFrame::new_no_checks(columns))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(crate) struct GroupByDynamicExec {
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) options: DynamicGroupOptions,
pub(crate) input_schema: SchemaRef,
pub(crate) slice: Option<(i64, usize)>,
}

impl Executor for GroupByDynamicExec {
Expand All @@ -29,11 +30,21 @@ impl Executor for GroupByDynamicExec {

let (time_key, keys, groups) = df.groupby_dynamic(keys, &self.options)?;

let mut groups = &groups;
#[allow(unused_assignments)]
// it is unused because we only use it to keep the lifetime of sliced_group valid
let mut sliced_groups = None;

if let Some((offset, len)) = self.slice {
sliced_groups = Some(groups.slice(offset, len));
groups = sliced_groups.as_deref().unwrap();
}

let agg_columns = POOL.install(|| {
self.aggs
.par_iter()
.map(|expr| {
let opt_agg = as_aggregated(expr.as_ref(), &df, &groups, state)?;
let opt_agg = as_aggregated(expr.as_ref(), &df, groups, state)?;
if let Some(agg) = &opt_agg {
if agg.len() != groups.len() {
return Err(PolarsError::ComputeError(
Expand Down

0 comments on commit f164d7b

Please sign in to comment.