Skip to content

Commit

Permalink
perf[rust]: don't allocate indices on rolling_groupby + by (#4917)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 21, 2022
1 parent 9012df6 commit 6cdc83e
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 47 deletions.
16 changes: 8 additions & 8 deletions polars/polars-core/src/frame/groupby/into_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ impl IntoGroupsProxy for Utf8Chunked {
// Safety:
// the underlying data is tied to self
unsafe {
std::mem::transmute::<StrHash<'_>, StrHash<'a>>(StrHash::new(
opt_s, hash,
))
std::mem::transmute::<BytesHash<'_>, BytesHash<'a>>(
BytesHash::new_from_str(opt_s, hash),
)
}
})
.collect::<Vec<_>>()
Expand All @@ -273,7 +273,7 @@ impl IntoGroupsProxy for Utf8Chunked {
Some(s) => str::get_hash(s, &hb),
None => null_h,
};
StrHash::new(opt_s, hash)
BytesHash::new_from_str(opt_s, hash)
})
.collect::<Vec<_>>();
groupby(str_hashes.iter(), sorted)
Expand Down Expand Up @@ -456,7 +456,7 @@ pub(super) fn pack_utf8_columns(
))
};
let hash = str::get_hash(str_val, &hb);
str_hashes.push(StrHash::new(Some(str_val), hash))
str_hashes.push(BytesHash::new(Some(str_val.as_bytes()), hash))
}
(None, Some(rhs)) => {
let start = values.len();
Expand All @@ -471,7 +471,7 @@ pub(super) fn pack_utf8_columns(
))
};
let hash = str::get_hash(str_val, &hb);
str_hashes.push(StrHash::new(Some(str_val), hash))
str_hashes.push(BytesHash::new(Some(str_val.as_bytes()), hash))
}
(Some(lhs), None) => {
let start = values.len();
Expand All @@ -486,9 +486,9 @@ pub(super) fn pack_utf8_columns(
))
};
let hash = str::get_hash(str_val, &hb);
str_hashes.push(StrHash::new(Some(str_val), hash))
str_hashes.push(BytesHash::new(Some(str_val.as_bytes()), hash))
}
(None, None) => str_hashes.push(StrHash::new(None, null_h)),
(None, None) => str_hashes.push(BytesHash::new(None, null_h)),
}
});
(str_hashes, values)
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::prelude::*;
#[cfg(feature = "groupby_list")]
use crate::utils::Wrap;
use crate::utils::{_split_offsets, accumulate_dataframes_vertical, set_partition_size};
use crate::vector_hasher::{get_null_hash_value, AsU64, StrHash};
use crate::vector_hasher::{get_null_hash_value, AsU64, BytesHash};
use crate::POOL;

pub mod aggregations;
Expand Down
4 changes: 1 addition & 3 deletions polars/polars-core/src/frame/groupby/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,7 @@ impl GroupsProxy {
match self {
GroupsProxy::Idx(groups) => groups,
GroupsProxy::Slice { groups, .. } => {
if std::env::var("POLARS_VERBOSE").is_ok() {
println!("had to reallocate groups, missed an optimization opportunity.")
}
eprintln!("Had to reallocate groups, missed an optimization opportunity. Please open an issue.");
groups
.iter()
.map(|&[first, len]| (first, (first..first + len).collect_trusted::<Vec<_>>()))
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::utils::series::to_physical_and_bit_repr;
use crate::utils::{set_partition_size, slice_slice, split_ca};
use crate::vector_hasher::{
create_hash_and_keys_threaded_vectorized, prepare_hashed_relation_threaded, this_partition,
AsU64, StrHash,
AsU64, BytesHash,
};
use crate::POOL;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ where
pub(crate) fn prepare_strs<'a>(
been_split: &'a [Utf8Chunked],
hb: &RandomState,
) -> Vec<Vec<StrHash<'a>>> {
) -> Vec<Vec<BytesHash<'a>>> {
POOL.install(|| {
been_split
.par_iter()
Expand All @@ -308,7 +308,7 @@ pub(crate) fn prepare_strs<'a>(
let mut state = hb.build_hasher();
opt_s.hash(&mut state);
let hash = state.finish();
StrHash::new(opt_s, hash)
BytesHash::new_from_str(opt_s, hash)
})
.collect::<Vec<_>>()
})
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
extern crate core;

#[macro_use]
pub mod utils;
pub mod chunked_array;
Expand Down
26 changes: 17 additions & 9 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,30 +408,38 @@ impl IdxHash {
/// During rehashes, we will rehash the hash instead of the string, that makes rehashing
/// cheap and allows cache coherent small hash tables.
#[derive(Eq, Copy, Clone, Debug)]
pub(crate) struct StrHash<'a> {
str: Option<&'a str>,
pub(crate) struct BytesHash<'a> {
payload: Option<&'a [u8]>,
hash: u64,
}

impl<'a> Hash for StrHash<'a> {
impl<'a> Hash for BytesHash<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.hash)
}
}

impl<'a> StrHash<'a> {
pub(crate) fn new(s: Option<&'a str>, hash: u64) -> Self {
Self { str: s, hash }
impl<'a> BytesHash<'a> {
#[inline]
pub(crate) fn new(s: Option<&'a [u8]>, hash: u64) -> Self {
Self { payload: s, hash }
}
#[inline]
pub(crate) fn new_from_str(s: Option<&'a str>, hash: u64) -> Self {
Self {
payload: s.map(|s| s.as_bytes()),
hash,
}
}
}

impl<'a> PartialEq for StrHash<'a> {
impl<'a> PartialEq for BytesHash<'a> {
fn eq(&self, other: &Self) -> bool {
(self.hash == other.hash) && (self.str == other.str)
(self.hash == other.hash) && (self.payload == other.payload)
}
}

impl<'a> AsU64 for StrHash<'a> {
impl<'a> AsU64 for BytesHash<'a> {
fn as_u64(self) -> u64 {
self.hash
}
Expand Down
81 changes: 58 additions & 23 deletions polars/polars-time/src/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl Wrap<&DataFrame> {
.into_series();

update_bounds(lower, upper);
update_subgroups(&sub_groups, base_g)
update_subgroups_idx(&sub_groups, base_g)
})
.collect();
GroupsProxy::Idx(groupsidx)
Expand All @@ -280,7 +280,7 @@ impl Wrap<&DataFrame> {
options.closed_window,
tu,
);
update_subgroups(&sub_groups, base_g)
update_subgroups_idx(&sub_groups, base_g)
})
.collect()
});
Expand Down Expand Up @@ -361,36 +361,71 @@ impl Wrap<&DataFrame> {
dt = unsafe { dt.agg_list(&groups).explode().unwrap() };

// continue determining the rolling indexes.
let groups = groups.into_idx();

let groupsidx = POOL.install(|| {
groups
.par_iter()
.flat_map(|base_g| {
let dt = unsafe { dt_local.take_unchecked(base_g.1.into()) };
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let sub_groups = groupby_values(
options.period,
options.offset,
ts,
options.closed_window,
tu,
);
POOL.install(|| match groups {
GroupsProxy::Idx(groups) => {
let idx = groups
.par_iter()
.flat_map(|base_g| {
let dt = unsafe { dt_local.take_unchecked(base_g.1.into()) };
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let sub_groups = groupby_values(
options.period,
options.offset,
ts,
options.closed_window,
tu,
);
update_subgroups_idx(&sub_groups, base_g)
})
.collect();

update_subgroups(&sub_groups, base_g)
})
.collect()
});
GroupsProxy::Idx(groupsidx)
GroupsProxy::Idx(idx)
}
GroupsProxy::Slice { groups, .. } => {
let slice_groups = groups
.par_iter()
.flat_map(|base_g| {
let dt = dt_local.slice(base_g[0] as i64, base_g[1] as usize);
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let sub_groups = groupby_values(
options.period,
options.offset,
ts,
options.closed_window,
tu,
);
update_subgroups_slice(&sub_groups, *base_g)
})
.collect();

GroupsProxy::Slice {
groups: slice_groups,
rolling: false,
}
}
})
};

let dt = dt.cast(time_type).unwrap();

Ok((dt, by, groups))
}
}

fn update_subgroups(
fn update_subgroups_slice(sub_groups: &[[IdxSize; 2]], base_g: [IdxSize; 2]) -> Vec<[IdxSize; 2]> {
sub_groups
.iter()
.map(|&[first, len]| {
let new_first = base_g[0] + first;
[new_first, len]
})
.collect_trusted::<Vec<_>>()
}

fn update_subgroups_idx(
sub_groups: &[[IdxSize; 2]],
base_g: (IdxSize, &Vec<IdxSize>),
) -> Vec<(IdxSize, Vec<IdxSize>)> {
Expand Down

0 comments on commit 6cdc83e

Please sign in to comment.