Skip to content

Commit

Permalink
refactor(rust): use IdHash for streaming groupby generic (#5435)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 7, 2022
1 parent 3450bc2 commit 41fb566
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::any::Any;
use std::hash::{Hash, Hasher};

use hashbrown::hash_map::RawEntryMut;
use num::NumCast;
Expand All @@ -21,7 +22,25 @@ use crate::expressions::PhysicalPipedExpr;
use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};

// This is the hash and the Index offset in the linear buffer
type Key = (u64, IdxSize);
#[derive(Copy, Clone)]
struct Key {
hash: u64,
idx: IdxSize,
}

impl Key {
#[inline]
fn new(hash: u64, idx: IdxSize) -> Self {
Self { hash, idx }
}
}

impl Hash for Key {
#[inline]
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.hash)
}
}

// we store a hashmap per partition (partitioned by hash)
// the hashmap contains indexes as keys and as values
Expand All @@ -32,7 +51,7 @@ pub struct GenericGroupbySink {
thread_no: usize,
// idx is the offset in the array with keys
// idx is the offset in the array with aggregators
pre_agg_partitions: Vec<PlHashMap<Key, IdxSize>>,
pre_agg_partitions: Vec<PlIdHashMap<Key, IdxSize>>,
// the aggregations/keys are all tightly packed
// the aggregation function of a group can be found
// by:
Expand Down Expand Up @@ -69,9 +88,7 @@ impl GenericGroupbySink {
let hb = RandomState::default();
let partitions = _set_partition_size();

let pre_agg = load_vec(partitions, || {
PlHashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, hb.clone())
});
let pre_agg = load_vec(partitions, || PlIdHashMap::with_capacity(HASHMAP_INIT_SIZE));
let keys = load_vec(partitions, || {
Vec::with_capacity(HASHMAP_INIT_SIZE * key_columns.len())
});
Expand Down Expand Up @@ -141,7 +158,7 @@ impl GenericGroupbySink {

agg_map.into_iter().skip(offset).take(slice_len).for_each(
|(k, &offset)| {
let keys_offset = k.1 as usize;
let keys_offset = k.idx as usize;
let keys = unsafe {
current_keys
.get_unchecked_release(keys_offset..keys_offset + n_keys)
Expand Down Expand Up @@ -237,7 +254,7 @@ impl Sink for GenericGroupbySink {
let current_key_values = unsafe { self.keys.get_unchecked_release_mut(partition) };

let entry = current_partition.raw_entry_mut().from_hash(h, |key| {
let idx = key.1 as usize;
let idx = key.idx as usize;
if self.keys_series.len() > 1 {
current_keys_buf.iter().enumerate().all(|(i, key)| unsafe {
current_key_values.get_unchecked_release(i + idx) == key
Expand All @@ -256,7 +273,7 @@ impl Sink for GenericGroupbySink {
NumCast::from(current_aggregators.len()).unwrap_unchecked_release()
};
let keys_offset = unsafe {
(
Key::new(
h,
NumCast::from(current_key_values.len()).unwrap_unchecked_release(),
)
Expand Down Expand Up @@ -309,7 +326,7 @@ impl Sink for GenericGroupbySink {
|(((map_self, map_other), aggregators_self), aggregators_other)| {
for (k_other, &agg_idx_other) in map_other.iter() {
// the hash value
let h = k_other.0;
let h = k_other.hash;
// the partition where all keys and maps are located
let partition = hash_to_partition(h, n_partitions);
// get the key buffers
Expand All @@ -319,15 +336,15 @@ impl Sink for GenericGroupbySink {
unsafe { other.keys.get_unchecked_release(partition) };

// the offset in the keys of other
let idx_other = k_other.1 as usize;
let idx_other = k_other.idx as usize;
// slice to the keys of other
let keys_other = unsafe {
keys_buffer_other.get_unchecked_release(idx_other..idx_other + n_keys)
};

let entry = map_self.raw_entry_mut().from_hash(h, |k_self| {
// the offset in the keys of self
let idx_self = k_self.1 as usize;
let idx_self = k_self.idx as usize;
// slice to the keys of self
// safety:
// in bounds
Expand All @@ -347,7 +364,7 @@ impl Sink for GenericGroupbySink {
};
// get the key, comprised of the hash and the current offset in the keys buffer
let key = unsafe {
(
Key::new(
h,
NumCast::from(keys_buffer_self.len())
.unwrap_unchecked_release(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use polars_core::prelude::PlHashMap;
use hashbrown::HashMap;
use polars_core::utils::slice_offsets;

pub(super) fn compute_slices<K, V>(
pre_agg_partitions: &[PlHashMap<K, V>],
pub(super) fn compute_slices<K, V, HB>(
pre_agg_partitions: &[HashMap<K, V, HB>],
slice: Option<(i64, usize)>,
) -> Vec<Option<(usize, usize)>> {
if let Some((offset, slice_len)) = slice {
Expand Down

0 comments on commit 41fb566

Please sign in to comment.