Skip to content

Commit

Permalink
improve performance of aggregations by not traversing unused memory
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 3, 2022
1 parent feab227 commit a8a1938
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 62 deletions.
123 changes: 62 additions & 61 deletions polars/polars-core/src/frame/groupby/aggregations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ where
ca.slice(first as i64, len as usize)
}

// helper that combines the groups into a parallel iterator over `(first, all): (u32, &Vec<u32>)`
fn agg_helper_idx<T, F>(groups: &GroupsIdx, f: F) -> Option<Series>
where
F: Fn((u32, &Vec<u32>)) -> Option<T::Native> + Send + Sync,
Expand All @@ -33,6 +34,18 @@ where
Some(ca.into_series())
}

// Helper that iterates in parallel over the `all: Vec<Vec<u32>>` collection of the groups,
// handing each group's index vector to `f`.
// This doesn't have to traverse the `first: Vec<u32>` memory and is therefore faster
// for aggregations that never look at the first index of a group.
//
// `f` returns the aggregated value for one group (`None` becomes a null entry).
// The collected values are wrapped in a `Series`; this helper always returns `Some`.
fn agg_helper_idx_on_all<T, F>(groups: &GroupsIdx, f: F) -> Option<Series>
where
    F: Fn(&Vec<u32>) -> Option<T::Native> + Send + Sync,
    T: PolarsNumericType,
    ChunkedArray<T>: IntoSeries,
{
    // Run the aggregation inside the shared thread pool, one task item per group.
    let aggregated: ChunkedArray<T> = POOL.install(|| {
        let per_group = groups.all().into_par_iter().map(f);
        per_group.collect()
    });
    Some(aggregated.into_series())
}

fn agg_helper_slice<T, F>(groups: &[[u32; 2]], f: F) -> Option<Series>
where
F: Fn([u32; 2]) -> Option<T::Native> + Send + Sync,
Expand Down Expand Up @@ -72,7 +85,7 @@ impl Series {
#[cfg(feature = "private")]
pub fn agg_valid_count(&self, groups: &GroupsProxy) -> Option<Series> {
match groups {
GroupsProxy::Idx(groups) => agg_helper_idx::<UInt32Type, _>(groups, |(_first, idx)| {
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<UInt32Type, _>(groups, |idx| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
None
Expand Down Expand Up @@ -137,7 +150,7 @@ impl Series {
#[cfg(feature = "private")]
pub fn agg_n_unique(&self, groups: &GroupsProxy) -> Option<Series> {
match groups {
GroupsProxy::Idx(groups) => agg_helper_idx::<UInt32Type, _>(groups, |(_first, idx)| {
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<UInt32Type, _>(groups, |idx| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
None
Expand Down Expand Up @@ -165,7 +178,7 @@ impl Series {
pub fn agg_last(&self, groups: &GroupsProxy) -> Series {
let out = match groups {
GroupsProxy::Idx(groups) => {
let mut iter = groups.iter().map(|(_, idx)| {
let mut iter = groups.all().iter().map(|idx| {
if idx.is_empty() {
None
} else {
Expand Down Expand Up @@ -428,7 +441,7 @@ where
pub(crate) fn agg_var(&self, groups: &GroupsProxy) -> Option<Series> {
let ca = &self.0;
match groups {
GroupsProxy::Idx(groups) => agg_helper_idx::<T, _>(groups, |(_first, idx)| {
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<T, _>(groups, |idx| {
debug_assert!(idx.len() <= ca.len());
if idx.is_empty() {
return None;
Expand All @@ -452,7 +465,7 @@ where
pub(crate) fn agg_std(&self, groups: &GroupsProxy) -> Option<Series> {
let ca = &self.0;
match groups {
GroupsProxy::Idx(groups) => agg_helper_idx::<T, _>(groups, |(_first, idx)| {
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<T, _>(groups, |idx| {
debug_assert!(idx.len() <= ca.len());
if idx.is_empty() {
return None;
Expand Down Expand Up @@ -483,7 +496,7 @@ where
let ca = &self.0;
let invalid_quantile = !(0.0..=1.0).contains(&quantile);
match groups {
GroupsProxy::Idx(groups) => agg_helper_idx::<T, _>(groups, |(_first, idx)| {
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<T, _>(groups, |idx| {
debug_assert!(idx.len() <= ca.len());
if idx.is_empty() | invalid_quantile {
return None;
Expand Down Expand Up @@ -515,7 +528,7 @@ where
pub(crate) fn agg_median(&self, groups: &GroupsProxy) -> Option<Series> {
let ca = &self.0;
match groups {
GroupsProxy::Idx(groups) => agg_helper_idx::<T, _>(groups, |(_first, idx)| {
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<T, _>(groups, |idx| {
debug_assert!(idx.len() <= ca.len());
if idx.is_empty() {
return None;
Expand Down Expand Up @@ -616,17 +629,14 @@ where

pub(crate) fn agg_var(&self, groups: &GroupsProxy) -> Option<Series> {
match groups {
GroupsProxy::Idx(groups) => {
agg_helper_idx::<Float64Type, _>(groups, |(_first, idx)| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
return None;
}
let take =
unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
take.var_as_series().unpack::<Float64Type>().unwrap().get(0)
})
}
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
return None;
}
let take = unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
take.var_as_series().unpack::<Float64Type>().unwrap().get(0)
}),
GroupsProxy::Slice(groups) => {
agg_helper_slice::<Float64Type, _>(groups, |[first, len]| {
debug_assert!(len <= self.len() as u32);
Expand All @@ -644,17 +654,14 @@ where
}
pub(crate) fn agg_std(&self, groups: &GroupsProxy) -> Option<Series> {
match groups {
GroupsProxy::Idx(groups) => {
agg_helper_idx::<Float64Type, _>(groups, |(_first, idx)| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
return None;
}
let take =
unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
take.std_as_series().unpack::<Float64Type>().unwrap().get(0)
})
}
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
return None;
}
let take = unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
take.std_as_series().unpack::<Float64Type>().unwrap().get(0)
}),
GroupsProxy::Slice(groups) => {
agg_helper_slice::<Float64Type, _>(groups, |[first, len]| {
debug_assert!(len <= self.len() as u32);
Expand All @@ -678,21 +685,18 @@ where
interpol: QuantileInterpolOptions,
) -> Option<Series> {
match groups {
GroupsProxy::Idx(groups) => {
agg_helper_idx::<Float64Type, _>(groups, |(_first, idx)| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
return None;
}
let take =
unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
take.quantile_as_series(quantile, interpol)
.unwrap()
.unpack::<Float64Type>()
.unwrap()
.get(0)
})
}
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
return None;
}
let take = unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
take.quantile_as_series(quantile, interpol)
.unwrap()
.unpack::<Float64Type>()
.unwrap()
.get(0)
}),
GroupsProxy::Slice(groups) => {
agg_helper_slice::<Float64Type, _>(groups, |[first, len]| {
debug_assert!(len <= self.len() as u32);
Expand All @@ -710,20 +714,17 @@ where
}
pub(crate) fn agg_median(&self, groups: &GroupsProxy) -> Option<Series> {
match groups {
GroupsProxy::Idx(groups) => {
agg_helper_idx::<Float64Type, _>(groups, |(_first, idx)| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
return None;
}
let take =
unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
take.median_as_series()
.unpack::<Float64Type>()
.unwrap()
.get(0)
})
}
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<Float64Type, _>(groups, |idx| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
return None;
}
let take = unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
take.median_as_series()
.unpack::<Float64Type>()
.unwrap()
.get(0)
}),
GroupsProxy::Slice(groups) => {
agg_helper_slice::<Float64Type, _>(groups, |[first, len]| {
debug_assert!(len <= self.len() as u32);
Expand Down Expand Up @@ -805,7 +806,7 @@ where
self.len(),
self.dtype().clone(),
);
for (_first, idx) in groups {
for idx in groups.all().iter() {
let s = unsafe {
self.take_unchecked(idx.iter().map(|i| *i as usize).into())
.into_series()
Expand Down Expand Up @@ -888,7 +889,7 @@ impl AggList for BooleanChunked {
GroupsProxy::Idx(groups) => {
let mut builder =
ListBooleanChunkedBuilder::new(self.name(), groups.len(), self.len());
for (_first, idx) in groups {
for idx in groups.all().iter() {
let ca = unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
builder.append(&ca)
}
Expand All @@ -913,7 +914,7 @@ impl AggList for Utf8Chunked {
GroupsProxy::Idx(groups) => {
let mut builder =
ListUtf8ChunkedBuilder::new(self.name(), groups.len(), self.len());
for (_first, idx) in groups {
for idx in groups.all().iter() {
let ca = unsafe { self.take_unchecked(idx.iter().map(|i| *i as usize).into()) };
builder.append(&ca)
}
Expand Down
7 changes: 6 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ fn execute_projection_cached_window_fns(
for (index, e) in partition.1 {
// caching more than one window expression is a complicated topic for another day
// see issue #2523
state.cache_window = e.as_expression().into_iter().filter(|e| matches!(e, Expr::Window {..})).count() == 1;
state.cache_window = e
.as_expression()
.into_iter()
.filter(|e| matches!(e, Expr::Window { .. }))
.count()
== 1;

let s = e.evaluate(df, &state)?;
selected_columns.push((index, s));
Expand Down

0 comments on commit a8a1938

Please sign in to comment.