Skip to content

Commit

Permalink
perf[rust]: improve pivot ~2.2x and groupby_stable performance. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 4, 2022
1 parent 3620e06 commit f3823ea
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 175 deletions.
7 changes: 7 additions & 0 deletions polars/polars-core/src/frame/groupby/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ fn finish_group_order(mut out: Vec<Vec<IdxItem>>, sorted: bool) -> GroupsProxy {
let mut out = if out.len() == 1 {
out.pop().unwrap()
} else {
// pre-sort every array
// this will make the final single threaded sort much faster
POOL.install(|| {
out.par_iter_mut()
.for_each(|g| g.sort_unstable_by_key(|g| g.0))
});

flatten(&out, None)
};
out.sort_unstable_by_key(|g| g.0);
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-core/src/frame/groupby/into_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use polars_arrow::prelude::*;
use polars_utils::flatten;

use super::*;
use crate::utils::{copy_from_slice_unchecked, split_offsets};
use crate::utils::{_split_offsets, copy_from_slice_unchecked};

/// Used to create the tuples for a groupby operation.
pub trait IntoGroupsProxy {
Expand Down Expand Up @@ -239,7 +239,7 @@ impl IntoGroupsProxy for Utf8Chunked {
if multithreaded {
let n_partitions = set_partition_size();

let split = split_offsets(self.len(), n_partitions);
let split = _split_offsets(self.len(), n_partitions);

let str_hashes = POOL.install(|| {
split
Expand Down Expand Up @@ -415,7 +415,7 @@ pub(super) fn pack_utf8_columns(
n_partitions: usize,
sorted: bool,
) -> GroupsProxy {
let splits = split_offsets(lhs.len(), n_partitions);
let splits = _split_offsets(lhs.len(), n_partitions);
let hb = RandomState::default();
let null_h = get_null_hash_value(hb.clone());

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use self::hashing::*;
use crate::prelude::*;
#[cfg(feature = "groupby_list")]
use crate::utils::Wrap;
use crate::utils::{accumulate_dataframes_vertical, set_partition_size, split_offsets};
use crate::utils::{_split_offsets, accumulate_dataframes_vertical, set_partition_size};
use crate::vector_hasher::{get_null_hash_value, AsU64, StrHash};
use crate::POOL;

Expand Down Expand Up @@ -69,7 +69,7 @@ impl DataFrame {
// pack the bit values together and add a final byte that will be 0
// when there are no null values.
// otherwise we use two bits of this byte to represent null values.
let splits = split_offsets($ca0.len(), n_partitions);
let splits = _split_offsets($ca0.len(), n_partitions);

let keys = POOL.install(|| {
splits
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-core/src/frame/hash_join/sort_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use polars_utils::flatten;

use super::*;
#[cfg(feature = "performant")]
use crate::utils::split_offsets;
use crate::utils::_split_offsets;

pub(super) fn use_sort_merge(s_left: &Series, s_right: &Series) -> bool {
// only use for numeric data for now
Expand All @@ -32,7 +32,7 @@ fn par_sorted_merge_left_impl<T>(
where
T: PolarsNumericType,
{
let offsets = split_offsets(s_left.len(), POOL.current_num_threads());
let offsets = _split_offsets(s_left.len(), POOL.current_num_threads());
let s_left = s_left.rechunk();
let s_right = s_right.rechunk();

Expand Down Expand Up @@ -99,7 +99,7 @@ fn par_sorted_merge_inner_impl<T>(
where
T: PolarsNumericType,
{
let offsets = split_offsets(s_left.len(), POOL.current_num_threads());
let offsets = _split_offsets(s_left.len(), POOL.current_num_threads());
let s_left = s_left.rechunk();
let s_right = s_right.rechunk();

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use series_trait::{IsSorted, *};
use crate::prelude::unique::rank::rank;
#[cfg(feature = "zip_with")]
use crate::series::arithmetic::coerce_lhs_rhs;
use crate::utils::{split_ca, split_offsets, split_series, Wrap};
use crate::utils::{_split_offsets, split_ca, split_series, Wrap};
use crate::POOL;

/// # Series
Expand Down Expand Up @@ -404,7 +404,7 @@ impl Series {
func: &(dyn Fn(usize, usize) -> Result<Series> + Send + Sync),
) -> Result<Series> {
let n_threads = POOL.current_num_threads();
let offsets = split_offsets(len, n_threads);
let offsets = _split_offsets(len, n_threads);

let series: Result<Vec<_>> = POOL.install(|| {
offsets
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ where
}

// prefer this one over split_ca, as this can push the null_count into the thread pool
// returns an `(offset, length)` tuple
#[doc(hidden)]
pub fn split_offsets(len: usize, n: usize) -> Vec<(usize, usize)> {
pub fn _split_offsets(len: usize, n: usize) -> Vec<(usize, usize)> {
if n == 1 {
vec![(0, len)]
} else {
Expand Down Expand Up @@ -809,7 +810,7 @@ where
F: Fn(Series) -> Result<Series> + Send + Sync,
{
let n_threads = n_threads.unwrap_or_else(|| POOL.current_num_threads());
let splits = split_offsets(s.len(), n_threads);
let splits = _split_offsets(s.len(), n_threads);

let chunks = POOL.install(|| {
splits
Expand Down
197 changes: 36 additions & 161 deletions polars/polars-ops/src/pivot.rs → polars/polars-ops/src/pivot/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
mod positioning;

use polars_core::export::rayon::prelude::*;
use polars_core::frame::groupby::expr::PhysicalAggExpr;
use polars_core::prelude::*;
use polars_core::POOL;
use polars_core::utils::_split_offsets;
use polars_core::{downcast_as_macro_arg_physical, POOL};

const HASHMAP_INIT_SIZE: usize = 512;

Expand Down Expand Up @@ -124,133 +127,6 @@ where
)
}

/// Map every group to the output column it will land in after the pivot.
///
/// Aggregates the first value of `column` per group, then numbers the
/// distinct aggregated values in order of first appearance. Returns the
/// per-group column positions together with the aggregated column
/// (still carrying its logical type).
fn compute_col_idx(
    pivot_df: &DataFrame,
    column: &str,
    groups: &GroupsProxy,
) -> Result<(Vec<IdxSize>, Series)> {
    let column_s = pivot_df.column(column)?;
    // SAFETY: assumes the indices in `groups` are in bounds for this
    // dataframe's columns (they come from a groupby on `pivot_df`) — TODO
    // confirm against the caller.
    let column_agg = unsafe { column_s.agg_first(groups) };
    let column_agg_physical = column_agg.to_physical_repr();

    // Assign indices in first-seen order. The map's current length is
    // always the next free index, so no separate counter is needed.
    let mut col_to_idx = PlHashMap::with_capacity(HASHMAP_INIT_SIZE);
    let col_locations = column_agg_physical
        .iter()
        .map(|value| {
            let next_idx = col_to_idx.len() as IdxSize;
            *col_to_idx.entry(value).or_insert(next_idx)
        })
        .collect();

    // The map borrows `column_agg_physical`; release it before returning.
    drop(col_to_idx);
    Ok((col_locations, column_agg))
}

/// Compute, for every group, the row position it maps to in the pivoted frame.
///
/// Returns `(row_locations, n_rows, row_index)`:
/// - `row_locations[g]`: target output row of group `g`;
/// - `n_rows`: number of distinct index keys, i.e. rows in the output;
/// - `row_index`: the materialized index column(s), built only when
///   `count == 0` (presumably the first of several passes over value
///   columns, so the index is not rebuilt each time — TODO confirm
///   against the caller).
fn compute_row_idx(
    pivot_df: &DataFrame,
    index: &[String],
    groups: &GroupsProxy,
    count: usize,
) -> Result<(Vec<IdxSize>, usize, Option<Vec<Series>>)> {
    let (row_locations, n_rows, row_index) = if index.len() == 1 {
        // Fast path: a single index column keys the rows directly.
        let index_s = pivot_df.column(&index[0])?;
        // SAFETY: assumes the indices in `groups` are in bounds for this
        // dataframe — TODO confirm against the caller.
        let index_agg = unsafe { index_s.agg_first(groups) };
        let index_agg_physical = index_agg.to_physical_repr();

        // Insertion-ordered map: the first occurrence of a key decides its
        // row number, so output rows follow first-appearance order.
        let mut row_to_idx =
            PlIndexMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());
        let mut idx = 0 as IdxSize;
        let row_locations = index_agg_physical
            .iter()
            .map(|v| {
                // Hand out the next free row number on first sight of `v`;
                // the closure bumps the captured counter only on insert.
                let idx = *row_to_idx.entry(v).or_insert_with(|| {
                    let old_idx = idx;
                    idx += 1;
                    old_idx
                });
                idx
            })
            .collect::<Vec<_>>();

        let row_index = match count {
            0 => {
                // Recover the keys in insertion order (IndexMap preserves it)
                // and restore the column's original logical dtype.
                let s = Series::new(
                    &index[0],
                    row_to_idx.into_iter().map(|(k, _)| k).collect::<Vec<_>>(),
                );
                let s = restore_logical_type(&s, index_s.dtype());
                Some(vec![s])
            }
            _ => None,
        };

        // `idx` now equals the number of distinct keys.
        (row_locations, idx as usize, row_index)
    } else {
        // General path: multiple index columns — rows are keyed on the
        // tuple of per-group first values across all index columns.
        let index_s = pivot_df.columns(index)?;
        // SAFETY: assumes the indices in `groups` are in bounds for this
        // dataframe — TODO confirm against the caller.
        let index_agg_physical = index_s
            .iter()
            .map(|s| unsafe { s.agg_first(groups).to_physical_repr().into_owned() })
            .collect::<Vec<_>>();
        // One iterator per index column, advanced in lockstep to form the
        // composite key for each group.
        let mut iters = index_agg_physical
            .iter()
            .map(|s| s.iter())
            .collect::<Vec<_>>();
        let mut row_to_idx =
            PlIndexMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());
        let mut idx = 0 as IdxSize;

        let mut row_locations = Vec::with_capacity(groups.len());
        loop {
            // `collect::<Option<Vec<_>>>` yields None as soon as any column
            // iterator is exhausted — all columns have equal length, so this
            // terminates the loop after the last group.
            match iters
                .iter_mut()
                .map(|it| it.next())
                .collect::<Option<Vec<_>>>()
            {
                None => break,
                Some(items) => {
                    // Same first-seen numbering as the single-column path,
                    // keyed on the whole tuple of values.
                    let idx = *row_to_idx.entry(items).or_insert_with(|| {
                        let old_idx = idx;
                        idx += 1;
                        old_idx
                    });
                    row_locations.push(idx)
                }
            }
        }
        let row_index = match count {
            0 => Some(
                // Rebuild one Series per index column by projecting the i-th
                // element out of every composite key (insertion order).
                index
                    .iter()
                    .enumerate()
                    .map(|(i, name)| {
                        let s = Series::new(
                            name,
                            row_to_idx
                                .iter()
                                .map(|(k, _)| {
                                    // SAFETY: every key `k` has one entry per
                                    // index column, so `i < k.len()` holds.
                                    debug_assert!(i < k.len());
                                    unsafe { k.get_unchecked(i).clone() }
                                })
                                .collect::<Vec<_>>(),
                        );
                        restore_logical_type(&s, index_s[i].dtype())
                    })
                    .collect::<Vec<_>>(),
            ),
            _ => None,
        };

        // `idx` now equals the number of distinct composite keys.
        (row_locations, idx as usize, row_index)
    };

    Ok((row_locations, n_rows, row_index))
}

fn pivot_impl(
pivot_df: &DataFrame,
// these columns will be aggregated in the nested groupby
Expand Down Expand Up @@ -287,8 +163,8 @@ fn pivot_impl(
};

let (col, row) = POOL.join(
|| compute_col_idx(pivot_df, column, &groups),
|| compute_row_idx(pivot_df, index, &groups, count),
|| positioning::compute_col_idx(pivot_df, column, &groups),
|| positioning::compute_row_idx(pivot_df, index, &groups, count),
);
let (col_locations, column_agg) = col?;
let (row_locations, n_rows, mut row_index) = row?;
Expand Down Expand Up @@ -322,39 +198,38 @@ fn pivot_impl(
let headers = column_agg.unique_stable()?.cast(&DataType::Utf8)?;
let headers = headers.utf8().unwrap();
let n_cols = headers.len();

let mut buf = vec![AnyValue::Null; n_rows * n_cols];

let value_agg_phys = value_agg.to_physical_repr();

for ((row_idx, col_idx), val) in row_locations
.iter()
.zip(&col_locations)
.zip(value_agg_phys.iter())
{
// Safety:
// in bounds
unsafe {
let idx = *row_idx as usize + *col_idx as usize * n_rows;
debug_assert!(idx < buf.len());
*buf.get_unchecked_mut(idx) = val;
let logical_type = value_agg.dtype();

debug_assert_eq!(row_locations.len(), col_locations.len());
debug_assert_eq!(value_agg_phys.len(), row_locations.len());

let mut cols = if value_agg_phys.dtype().is_numeric() {
macro_rules! dispatch {
($ca:expr) => {{
positioning::position_aggregates_numeric(
n_rows,
n_cols,
&row_locations,
&col_locations,
$ca,
logical_type,
headers,
)
}};
}
}

let headers_iter = headers.par_iter_indexed();

let mut cols = (0..n_cols)
.into_par_iter()
.zip(headers_iter)
.map(|(i, opt_name)| {
let offset = i * n_rows;
let avs = &buf[offset..offset + n_rows];
let name = opt_name.unwrap_or("null");
let mut out = Series::new(name, avs);
finish_logical_type(&mut out, value_agg.dtype());
out
})
.collect::<Vec<_>>();
downcast_as_macro_arg_physical!(value_agg_phys, dispatch)
} else {
positioning::position_aggregates(
n_rows,
n_cols,
&row_locations,
&col_locations,
&value_agg_phys,
logical_type,
headers,
)
};

if sort_columns {
cols.sort_unstable_by(|a, b| a.name().partial_cmp(b.name()).unwrap());
Expand Down

0 comments on commit f3823ea

Please sign in to comment.