improve hashing performance w/ specialized hashers
ritchie46 committed Sep 21, 2021
1 parent 50adcbd commit 2124095
Showing 4 changed files with 51 additions and 42 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/benchmark.yaml
@@ -35,6 +35,8 @@ jobs:
           cd tests/db-benchmark
           Rscript -e 'install.packages("data.table", repos="https://Rdatatable.github.io/data.table")'
           Rscript groupby-datagen.R 1e7 1e2 5 0
+          echo "ON STRINGS"
+          python main.py on_strings
           echo "ON CATEGORICALS"
           python main.py
15 changes: 10 additions & 5 deletions polars/polars-core/src/frame/groupby/hashing.rs
@@ -6,6 +6,7 @@ use crate::vector_hasher::{df_rows_to_hashes_threaded, IdBuildHasher, IdxHash};
 use crate::vector_hasher::{this_partition, AsU64};
 use crate::POOL;
 use crate::{datatypes::PlHashMap, utils::split_df};
+use ahash::CallHasher;
 use hashbrown::hash_map::Entry;
 use hashbrown::{hash_map::RawEntryMut, HashMap};
 use rayon::prelude::*;
@@ -56,7 +57,7 @@ pub(crate) fn groupby_threaded_num<T, IntoSlice>(
     n_partitions: u64,
 ) -> GroupTuples
 where
-    T: Send + Hash + Eq + Sync + Copy + AsU64,
+    T: Send + Hash + Eq + Sync + Copy + AsU64 + CallHasher,
     IntoSlice: AsRef<[T]> + Send + Sync,
 {
     assert!(n_partitions.is_power_of_two());
@@ -75,22 +76,26 @@ where
             for keys in &keys {
                 let keys = keys.as_ref();
                 let len = keys.len() as u32;
+                let hasher = hash_tbl.hasher().clone();
 
                 let mut cnt = 0;
                 keys.iter().for_each(|k| {
                     let idx = cnt + offset;
                     cnt += 1;
 
                     if this_partition(k.as_u64(), thread_no, n_partitions) {
-                        let entry = hash_tbl.entry(*k);
+                        let hash = T::get_hash(k, &hasher);
+                        let entry = hash_tbl.raw_entry_mut().from_key_hashed_nocheck(hash, k);
 
                         match entry {
-                            Entry::Vacant(entry) => {
+                            RawEntryMut::Vacant(entry) => {
                                 let mut tuples = Vec::with_capacity(group_size_hint);
                                 tuples.push(idx);
-                                entry.insert((idx, tuples));
+                                entry.insert_with_hasher(hash, *k, (idx, tuples), |k| {
+                                    T::get_hash(k, &hasher)
+                                });
                             }
-                            Entry::Occupied(mut entry) => {
+                            RawEntryMut::Occupied(mut entry) => {
                                 let v = entry.get_mut();
                                 v.1.push(idx);
                             }
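The heart of this change: instead of `HashMap::entry`, which hashes the key internally on every lookup and again on insert, each key is hashed once with ahash's `CallHasher::get_hash` (which dispatches to a specialized, non-streaming hasher for primitive types), and that hash is reused through hashbrown's raw-entry API. A minimal standalone sketch of the same pattern, counting `u64` keys instead of building group tuples (assumes `hashbrown = "0.11"` and `ahash = "0.7"` as dependencies; `count_groups` is an illustrative name, not Polars code):

```rust
use ahash::{CallHasher, RandomState};
use hashbrown::{hash_map::RawEntryMut, HashMap};

fn count_groups(keys: &[u64]) -> HashMap<u64, u32, RandomState> {
    let mut tbl: HashMap<u64, u32, RandomState> = HashMap::default();
    let hasher = tbl.hasher().clone();
    for k in keys {
        // Hash once, via ahash's specialized integer path.
        let hash = u64::get_hash(k, &hasher);
        // Reuse that hash for the lookup...
        match tbl.raw_entry_mut().from_key_hashed_nocheck(hash, k) {
            RawEntryMut::Vacant(entry) => {
                // ...and again for the insert, instead of rehashing the key.
                entry.insert_with_hasher(hash, *k, 1, |k| u64::get_hash(k, &hasher));
            }
            RawEntryMut::Occupied(mut entry) => {
                *entry.get_mut() += 1;
            }
        }
    }
    tbl
}
```

`from_key_hashed_nocheck` and `insert_with_hasher` both take the precomputed hash, so each key is hashed exactly once per iteration, while the supplied closure still lets the table rehash keys later when it grows.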
22 changes: 13 additions & 9 deletions polars/polars-core/src/frame/groupby/mod.rs
@@ -7,14 +7,14 @@ use crate::utils::Wrap;
 use crate::utils::{
     accumulate_dataframes_vertical, copy_from_slice_unchecked, set_partition_size, split_ca, NoNull,
 };
-use crate::vector_hasher::{AsU64, StrHash};
+use crate::vector_hasher::{get_null_hash_value, AsU64, StrHash};
 use crate::POOL;
-use ahash::RandomState;
+use ahash::{CallHasher, RandomState};
 use hashbrown::HashMap;
 use num::NumCast;
 use rayon::prelude::*;
 use std::fmt::Debug;
-use std::hash::{BuildHasher, Hash, Hasher};
+use std::hash::Hash;
 
 pub mod aggregations;
 pub(crate) mod hashing;
@@ -132,6 +132,8 @@ impl IntoGroupTuples for BooleanChunked {
 impl IntoGroupTuples for Utf8Chunked {
     fn group_tuples(&self, multithreaded: bool) -> GroupTuples {
         let hb = RandomState::default();
+        let null_h = get_null_hash_value(hb.clone());
+
         if multithreaded {
             let n_partitions = set_partition_size();
 
@@ -143,9 +145,10 @@ impl IntoGroupTuples for Utf8Chunked {
                 .map(|ca| {
                     ca.into_iter()
                         .map(|opt_s| {
-                            let mut state = hb.build_hasher();
-                            opt_s.hash(&mut state);
-                            let hash = state.finish();
+                            let hash = match opt_s {
+                                Some(s) => str::get_hash(s, &hb),
+                                None => null_h,
+                            };
                             StrHash::new(opt_s, hash)
                         })
                         .collect::<Vec<_>>()
@@ -157,9 +160,10 @@
             let str_hashes = self
                 .into_iter()
                 .map(|opt_s| {
-                    let mut state = hb.build_hasher();
-                    opt_s.hash(&mut state);
-                    let hash = state.finish();
+                    let hash = match opt_s {
+                        Some(s) => str::get_hash(s, &hb),
+                        None => null_h,
+                    };
                     StrHash::new(opt_s, hash)
                 })
                 .collect::<Vec<_>>();
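Both branches replace the per-value `build_hasher`/`hash`/`finish` sequence with a single specialized call, and nulls skip hashing entirely via the precomputed `null_h`. Side by side, the old and new per-value paths look roughly like this (a sketch assuming `ahash = "0.7"`; the function names are illustrative, not Polars API):

```rust
use ahash::{CallHasher, RandomState};
use std::hash::{BuildHasher, Hash, Hasher};

// Old path: a fresh streaming Hasher per value; `Option::hash` also
// feeds the Some/None discriminant through the hasher.
fn hash_opt_str_old(opt_s: Option<&str>, hb: &RandomState) -> u64 {
    let mut state = hb.build_hasher();
    opt_s.hash(&mut state);
    state.finish()
}

// New path: one specialized call for Some, a precomputed constant for None.
fn hash_opt_str_new(opt_s: Option<&str>, hb: &RandomState, null_h: u64) -> u64 {
    match opt_s {
        Some(s) => str::get_hash(s, hb),
        None => null_h,
    }
}
```

The two paths yield different hash values, which is fine: the hashes only need to be consistent within a single group-by, not across hashing schemes.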
54 changes: 26 additions & 28 deletions polars/polars-core/src/vector_hasher.rs
@@ -2,7 +2,7 @@ use crate::datatypes::UInt64Chunked;
 use crate::prelude::*;
 use crate::utils::arrow::array::Array;
 use crate::POOL;
-use ahash::RandomState;
+use ahash::{CallHasher, RandomState};
 use arrow::bitmap::utils::get_bit_unchecked;
 use hashbrown::{hash_map::RawEntryMut, HashMap};
 use polars_arrow::utils::CustomIterTools;
@@ -27,7 +27,7 @@ pub trait VecHash {
     }
 }
 
-fn get_null_hash_value(random_state: RandomState) -> u64 {
+pub(crate) fn get_null_hash_value(random_state: RandomState) -> u64 {
     // we just start with a large prime number and hash that twice
     // to get a constant hash value for null/None
     let mut hasher = random_state.build_hasher();
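The function body is truncated above. Per the comment, it hashes a fixed large prime and then hashes the result again, so every null maps to one stable value for a given `RandomState`. A hedged reconstruction (the exact prime used by Polars may differ):

```rust
use ahash::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

pub(crate) fn get_null_hash_value(random_state: RandomState) -> u64 {
    // Hash a fixed large prime...
    let mut hasher = random_state.build_hasher();
    3188347919usize.hash(&mut hasher);
    let first = hasher.finish();
    // ...then hash that result once more.
    let mut hasher = random_state.build_hasher();
    first.hash(&mut hasher);
    hasher.finish()
}
```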
@@ -41,7 +41,7 @@ fn get_null_hash_value(random_state: RandomState) -> u64 {
 impl<T> VecHash for ChunkedArray<T>
 where
     T: PolarsIntegerType,
-    T::Native: Hash,
+    T::Native: Hash + CallHasher,
 {
     fn vec_hash(&self, random_state: RandomState) -> AlignedVec<u64> {
         // Note that we don't use the no null branch! This can break in unexpected ways.
@@ -53,11 +53,12 @@ where
         let mut av = AlignedVec::with_capacity(self.len());
 
         self.downcast_iter().for_each(|arr| {
-            av.extend(arr.values().as_slice().iter().map(|v| {
-                let mut hasher = random_state.build_hasher();
-                v.hash(&mut hasher);
-                hasher.finish()
-            }));
+            av.extend(
+                arr.values()
+                    .as_slice()
+                    .iter()
+                    .map(|v| T::Native::get_hash(v, &random_state)),
+            );
         });
 
         let null_h = get_null_hash_value(random_state);
@@ -94,18 +95,16 @@
                     .iter()
                     .zip(&mut hashes[offset..])
                     .for_each(|(v, h)| {
-                        let mut hasher = random_state.build_hasher();
-                        v.hash(&mut hasher);
-                        *h = boost_hash_combine(hasher.finish(), *h)
+                        let l = T::Native::get_hash(v, &random_state);
+                        *h = boost_hash_combine(l, *h)
                     }),
                 _ => arr
                     .iter()
                     .zip(&mut hashes[offset..])
                     .for_each(|(opt_v, h)| match opt_v {
                         Some(v) => {
-                            let mut hasher = random_state.build_hasher();
-                            v.hash(&mut hasher);
-                            *h = boost_hash_combine(hasher.finish(), *h)
+                            let l = T::Native::get_hash(v, &random_state);
+                            *h = boost_hash_combine(l, *h)
                         }
                         None => {
                             *h = boost_hash_combine(null_h, *h);
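`boost_hash_combine` is not shown in this diff; it follows Boost's classic `hash_combine` recipe, `seed ^= hash + 0x9e3779b9 + (seed << 6) + (seed >> 2)`, lifted to `u64`. A sketch (not necessarily Polars' exact form):

```rust
// Fold a new hash `r` into an accumulated row hash `l`; wrapping ops
// keep the arithmetic panic-free in debug builds.
fn boost_hash_combine(l: u64, r: u64) -> u64 {
    l ^ r.wrapping_add(0x9e3779b9)
        .wrapping_add(l << 6)
        .wrapping_add(l >> 2)
}
```

Polars uses it in `vec_hash_combine` to fold each column's hash into the running row hash, with `null_h` standing in for missing values.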
@@ … @@
 
 impl VecHash for Utf8Chunked {
     fn vec_hash(&self, random_state: RandomState) -> AlignedVec<u64> {
+        let null_h = get_null_hash_value(random_state.clone());
         let mut av = AlignedVec::with_capacity(self.len());
         self.downcast_iter().for_each(|arr| {
-            av.extend(arr.into_iter().map(|opt_v| {
-                let mut hasher = random_state.build_hasher();
-                opt_v.hash(&mut hasher);
-                hasher.finish()
+            av.extend(arr.into_iter().map(|opt_v| match opt_v {
+                Some(v) => str::get_hash(v, &random_state),
+                None => null_h,
             }))
         });
         av
     }
 
     fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
+        let null_h = get_null_hash_value(random_state.clone());
         self.apply_to_slice(
             |opt_v, h| {
-                let mut hasher = random_state.build_hasher();
-                opt_v.hash(&mut hasher);
-                boost_hash_combine(hasher.finish(), *h)
+                let l = match opt_v {
+                    Some(v) => str::get_hash(v, &random_state),
+                    None => null_h,
+                };
+                boost_hash_combine(l, *h)
             },
             hashes,
         )
@@ -283,17 +285,13 @@ impl AsU64 for [u8; 9] {
 const BUILD_HASHER: RandomState = RandomState::with_seeds(0, 0, 0, 0);
 impl AsU64 for [u8; 17] {
     fn as_u64(self) -> u64 {
-        let mut h = BUILD_HASHER.build_hasher();
-        self.hash(&mut h);
-        h.finish()
+        <[u8]>::get_hash(&self, &BUILD_HASHER)
     }
 }
 
 impl AsU64 for [u8; 13] {
     fn as_u64(self) -> u64 {
-        let mut h = BUILD_HASHER.build_hasher();
-        self.hash(&mut h);
-        h.finish()
+        <[u8]>::get_hash(&self, &BUILD_HASHER)
     }
 }

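Because `BUILD_HASHER` is constructed with fixed seeds, these `as_u64` values are deterministic, so all threads agree on where a composite key lands. A quick demonstration of the idea (assumes `ahash = "0.7"`):

```rust
use ahash::{CallHasher, RandomState};

// Fixed seeds: the same bytes always hash to the same u64.
const BUILD_HASHER: RandomState = RandomState::with_seeds(0, 0, 0, 0);

fn main() {
    let key = [1u8; 17];
    // &[u8; 17] coerces to &[u8], hitting ahash's slice-specialized path.
    let a = <[u8]>::get_hash(&key, &BUILD_HASHER);
    let b = <[u8]>::get_hash(&key, &BUILD_HASHER);
    assert_eq!(a, b);
}
```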
@@ -395,7 +393,7 @@ impl<'a> AsU64 for StrHash<'a> {
 /// For partitions that are a power of 2 we can use a bitshift instead of a modulo.
 pub(crate) fn this_partition(h: u64, thread_no: u64, n_partitions: u64) -> bool {
     // n % 2^i = n & (2^i - 1)
-    (h + thread_no) & (n_partitions - 1) == 0
+    (h.wrapping_add(thread_no)) & n_partitions.wrapping_sub(1) == 0
 }
 
 pub(crate) fn prepare_hashed_relation_threaded<T, I>(
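The comment above gives the trick: when `n_partitions` is a power of two, `h % n == h & (n - 1)`, so partition selection is a single mask instead of a division; switching to `wrapping_add`/`wrapping_sub` merely makes the expression panic-free in debug builds when `h + thread_no` overflows. A small self-contained check:

```rust
fn this_partition(h: u64, thread_no: u64, n_partitions: u64) -> bool {
    // n % 2^i == n & (2^i - 1)
    h.wrapping_add(thread_no) & n_partitions.wrapping_sub(1) == 0
}

fn main() {
    let n: u64 = 8; // must be a power of two
    for h in [0u64, 7, 8, 123_456_789] {
        assert_eq!(h % n, h & (n - 1));
    }
    // A near-overflow hash still lands in exactly one thread's partition.
    let h = u64::MAX;
    let owners = (0..n).filter(|t| this_partition(h, *t, n)).count();
    assert_eq!(owners, 1);
}
```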
