Change hashtable allocation
On primitives, it's faster to grow and reallocate the hashtable than to allocate the needed amount up front. Cache coherence beats less work.
ritchie46 committed May 24, 2021
1 parent 900036e commit 1db1457
Showing 5 changed files with 77 additions and 45 deletions.
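
A note on the claim in the commit message: growing a table rehashes its contents several times, but each rehash works on memory that is mostly cache-resident, whereas a table pre-allocated at full size starts cold. The stand-alone sketch below (not part of this commit; sizes are illustrative, and a real comparison would use a benchmark harness such as criterion) measures the two strategies on primitive keys.

// Illustrative micro-benchmark: grow-and-rehash vs. pre-allocate for
// primitive keys. Single Instant-based timings are rough; they only
// demonstrate the trade-off described in the commit message.
use std::collections::HashSet;
use std::time::Instant;

fn main() {
    // Pseudo-random primitive keys (multiplicative scrambling of a counter).
    let keys: Vec<u64> = (0..5_000_000u64)
        .map(|i| i.wrapping_mul(0x9E37_79B9_7F4A_7C15))
        .collect();

    let t = Instant::now();
    let mut grown: HashSet<u64> = HashSet::new(); // tiny table, rehashes as it fills
    grown.extend(keys.iter().copied());
    println!("grow-and-rehash: {:?}", t.elapsed());

    let t = Instant::now();
    let mut prealloc: HashSet<u64> = HashSet::with_capacity(keys.len()); // one big, cold allocation
    prealloc.extend(keys.iter().copied());
    println!("pre-allocated:   {:?}", t.elapsed());

    assert_eq!(grown.len(), prealloc.len());
}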
14 changes: 6 additions & 8 deletions polars/polars-core/src/chunked_array/ops/unique.rs
@@ -93,11 +93,11 @@ impl<T> ChunkUnique<ObjectType<T>> for ObjectChunked<T> {
}
}

- fn fill_set<A>(a: impl Iterator<Item = A>, capacity: usize) -> HashSet<A, RandomState>
+ fn fill_set<A>(a: impl Iterator<Item = A>) -> HashSet<A, RandomState>
where
A: Hash + Eq,
{
- let mut set = HashSet::with_capacity_and_hasher(capacity, RandomState::new());
+ let mut set = HashSet::with_hasher(RandomState::new());

for val in a {
set.insert(val);
@@ -110,7 +110,7 @@ fn arg_unique<T>(a: impl Iterator<Item = T>, capacity: usize) -> AlignedVec<u32>
where
T: Hash + Eq,
{
- let mut set = HashSet::with_capacity_and_hasher(capacity, RandomState::new());
+ let mut set = HashSet::with_hasher(RandomState::new());
let mut unique = AlignedVec::with_capacity_aligned(capacity);
a.enumerate().for_each(|(idx, val)| {
if set.insert(val) {
@@ -152,7 +152,7 @@ where
ChunkedArray<T>: ChunkOps + IntoSeries,
{
fn unique(&self) -> Result<Self> {
- let set = fill_set(self.into_iter(), self.len());
+ let set = fill_set(self.into_iter());
Ok(Self::new_from_opt_iter(self.name(), set.iter().copied()))
}

@@ -178,7 +178,7 @@ where

impl ChunkUnique<Utf8Type> for Utf8Chunked {
fn unique(&self) -> Result<Self> {
- let set = fill_set(self.into_iter(), self.len());
+ let set = fill_set(self.into_iter());
Ok(Utf8Chunked::new_from_opt_iter(
self.name(),
set.iter().copied(),
@@ -206,7 +206,7 @@ impl ChunkUnique<Utf8Type> for Utf8Chunked {

impl ChunkUnique<CategoricalType> for CategoricalChunked {
fn unique(&self) -> Result<Self> {
- let set = fill_set(self.into_iter(), self.len());
+ let set = fill_set(self.into_iter());
let mut ca = UInt32Chunked::new_from_opt_iter(self.name(), set.iter().copied());
ca.categorical_map = self.categorical_map.clone();
ca.cast()
@@ -363,12 +363,10 @@ where
0 => fill_set(
ca.into_no_null_iter()
.map(|v| Some(integer_decode_f64(v.to_f64().unwrap()))),
- ca.len(),
),
_ => fill_set(
ca.into_iter()
.map(|opt_v| opt_v.map(|v| integer_decode_f64(v.to_f64().unwrap()))),
- ca.len(),
),
};
ChunkedArray::new_from_opt_iter(
83 changes: 56 additions & 27 deletions polars/polars-core/src/frame/groupby/mod.rs
@@ -23,11 +23,15 @@ pub mod resample;
pub type GroupTuples = Vec<(u32, Vec<u32>)>;
pub type GroupedMap<T> = HashMap<T, Vec<u32>, RandomState>;

- fn groupby<T>(a: impl Iterator<Item = T>) -> GroupTuples
+ fn groupby<T>(
+ a: impl Iterator<Item = T>,
+ b: impl Iterator<Item = T>,
+ preallocate: bool,
+ ) -> GroupTuples
where
T: Hash + Eq,
{
- let hash_tbl = prepare_hashed_relation(a);
+ let hash_tbl = prepare_hashed_relation(a, b, preallocate);

hash_tbl
.into_iter()
@@ -38,20 +42,28 @@ where
.collect()
}

- fn groupby_threaded_flat<I, T>(iters: Vec<I>, group_size_hint: usize) -> GroupTuples
+ fn groupby_threaded_flat<I, T>(
+ iters: Vec<I>,
+ group_size_hint: usize,
+ preallocate: bool,
+ ) -> GroupTuples
where
I: IntoIterator<Item = T> + Send,
T: Send + Hash + Eq + Sync + Copy,
{
- groupby_threaded(iters, group_size_hint)
+ groupby_threaded(iters, group_size_hint, preallocate)
.into_iter()
.flatten()
.collect()
}

/// Determine groupby tuples from an iterator. The group_size_hint is used to pre-allocate the group vectors.
/// When the grouping column is a categorical type we already have a good indication of the avg size of the groups.
- fn groupby_threaded<I, T>(iters: Vec<I>, group_size_hint: usize) -> Vec<GroupTuples>
+ fn groupby_threaded<I, T>(
+ iters: Vec<I>,
+ group_size_hint: usize,
+ preallocate: bool,
+ ) -> Vec<GroupTuples>
where
I: IntoIterator<Item = T> + Send,
T: Send + Hash + Eq + Sync + Copy,
@@ -69,8 +81,11 @@ where
let hashes_and_keys = &hashes_and_keys;
let thread_no = thread_no as u64;

- let mut hash_tbl: HashMap<T, (u32, Vec<u32>), RandomState> =
- HashMap::with_capacity_and_hasher(size / n_threads, random_state);
+ let mut hash_tbl: HashMap<T, (u32, Vec<u32>), RandomState> = if preallocate {
+ HashMap::with_capacity_and_hasher(size / n_threads, random_state)
+ } else {
+ HashMap::with_hasher(random_state)
+ };

let n_threads = n_threads as u64;
let mut offset = 0;
@@ -177,6 +192,8 @@ fn groupby_multiple_keys(keys: DataFrame) -> GroupTuples {
let (hashes, _) = df_rows_to_hashes(&keys, None);
let size = hashes.len();
// rather over allocate because rehashing is expensive
+ // it's a complicated trade-off: rehashing is often cheaper than
+ // overallocation because of cache coherence.
let mut hash_tbl: HashMap<IdxHash, (u32, Vec<u32>), IdBuildHasher> =
HashMap::with_capacity_and_hasher(size, IdBuildHasher::default());

@@ -270,7 +287,7 @@ fn group_multithreaded<T>(ca: &ChunkedArray<T>) -> bool {
}

macro_rules! group_tuples {
- ($ca: expr, $multithreaded: expr) => {{
+ ($ca: expr, $multithreaded: expr, $preallocate: expr) => {{
// TODO! choose a splitting len
if $multithreaded && group_multithreaded($ca) {
let n_threads = num_cpus::get();
@@ -281,16 +298,18 @@ macro_rules! group_tuples {
.iter()
.map(|ca| ca.into_no_null_iter())
.collect_vec();
- groupby_threaded_flat(iters, 0)
+ groupby_threaded_flat(iters, 0, $preallocate)
} else {
let iters = splitted.iter().map(|ca| ca.into_iter()).collect_vec();
- groupby_threaded_flat(iters, 0)
+ groupby_threaded_flat(iters, 0, $preallocate)
}
} else {
if $ca.null_count() == 0 {
- groupby($ca.into_no_null_iter())
+ let iter = || $ca.into_no_null_iter();
+ groupby(iter(), iter(), $preallocate)
} else {
- groupby($ca.into_iter())
+ let iter = || $ca.into_iter();
+ groupby(iter(), iter(), $preallocate)
}
}
}};
@@ -319,42 +338,42 @@ where
.map(|ca| ca.downcast_iter().map(|array| array.values()))
.flatten()
.collect_vec();
- groupby_threaded_flat(iters, group_size_hint)
+ groupby_threaded_flat(iters, group_size_hint, false)
} else {
let iters = splitted
.iter()
.map(|ca| ca.downcast_iter())
.flatten()
.collect_vec();
- groupby_threaded_flat(iters, group_size_hint)
+ groupby_threaded_flat(iters, group_size_hint, false)
}
// use the polars-iterators
} else if self.null_count() == 0 {
let iters = splitted
.iter()
.map(|ca| ca.into_no_null_iter())
.collect_vec();
- groupby_threaded_flat(iters, group_size_hint)
+ groupby_threaded_flat(iters, group_size_hint, false)
} else {
let iters = splitted.iter().map(|ca| ca.into_iter()).collect_vec();
- groupby_threaded_flat(iters, group_size_hint)
+ groupby_threaded_flat(iters, group_size_hint, false)
}
} else if self.null_count() == 0 {
- groupby(self.into_no_null_iter())
+ groupby(self.into_no_null_iter(), self.into_no_null_iter(), false)
} else {
- groupby(self.into_iter())
+ groupby(self.into_iter(), self.into_iter(), false)
}
}
}
impl IntoGroupTuples for BooleanChunked {
fn group_tuples(&self, multithreaded: bool) -> GroupTuples {
- group_tuples!(self, multithreaded)
+ group_tuples!(self, multithreaded, false)
}
}

impl IntoGroupTuples for Utf8Chunked {
fn group_tuples(&self, multithreaded: bool) -> GroupTuples {
- group_tuples!(self, multithreaded)
+ group_tuples!(self, multithreaded, true)
}
}

@@ -377,20 +396,26 @@ macro_rules! impl_into_group_tpls_float {
.iter()
.map(|ca| ca.into_no_null_iter().map(|v| v.to_bits()))
.collect_vec();
- groupby_threaded_flat(iters, 0)
+ groupby_threaded_flat(iters, 0, false)
}
_ => {
let iters = splitted
.iter()
.map(|ca| ca.into_iter().map(|opt_v| opt_v.map(|v| v.to_bits())))
.collect_vec();
- groupby_threaded_flat(iters, 0)
+ groupby_threaded_flat(iters, 0, false)
}
}
} else {
match $self.null_count() {
- 0 => groupby($self.into_no_null_iter().map(|v| v.to_bits())),
- _ => groupby($self.into_iter().map(|opt_v| opt_v.map(|v| v.to_bits()))),
+ 0 => {
+ let iter = || $self.into_no_null_iter().map(|v| v.to_bits());
+ groupby(iter(), iter(), false)
+ }
+ _ => {
+ let iter = || $self.into_iter().map(|opt_v| opt_v.map(|v| v.to_bits()));
+ groupby(iter(), iter(), false)
+ }
}
}
};
@@ -1077,7 +1102,7 @@ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
Column: AsRef<str>,
{
// create a mapping from columns to aggregations on that column
- let mut map = HashMap::with_capacity_and_hasher(column_to_agg.len(), RandomState::new());
+ let mut map = HashMap::with_hasher(RandomState::new());
column_to_agg.iter().for_each(|(column, aggregations)| {
map.insert(column.as_ref(), aggregations.as_ref());
});
Expand Down Expand Up @@ -1514,11 +1539,15 @@ mod test {
let ca = UInt8Chunked::new_from_slice("", &slice);
let splitted = split_ca(&ca, 4).unwrap();

- let a = groupby(ca.into_iter()).into_iter().sorted().collect_vec();
- let b = groupby_threaded_flat(splitted.iter().map(|ca| ca.into_iter()).collect(), 0)
+ let a = groupby(ca.into_iter(), ca.into_iter(), false)
+ .into_iter()
+ .sorted()
+ .collect_vec();
+ let b =
+ groupby_threaded_flat(splitted.iter().map(|ca| ca.into_iter()).collect(), 0, false)
.into_iter()
.sorted()
.collect_vec();

assert_eq!(a, b);
}
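
The preallocate flag threaded through this file encodes a per-type policy: the integer, boolean, and float implementations pass false (start small and grow), while Utf8Chunked passes true, presumably because recomputing string hashes on every rehash is far costlier than recomputing integer hashes. A minimal sketch of that policy, assuming a hypothetical helper (new_table is illustrative, not a polars API):

// Illustrative helper mirroring the allocation policy in this diff: callers
// with cheap-to-rehash keys pass preallocate = false, string-keyed callers
// pass true. Not a polars API.
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::Hash;

fn new_table<K: Hash + Eq, V>(preallocate: bool, n_keys: usize) -> HashMap<K, V, RandomState> {
    if preallocate {
        // One large allocation: no rehashing, but early inserts touch cold memory.
        HashMap::with_capacity_and_hasher(n_keys, RandomState::new())
    } else {
        // Start small and grow: extra rehash work, but the table stays
        // cache-resident while it is small.
        HashMap::with_hasher(RandomState::new())
    }
}

fn main() {
    let ints: HashMap<u64, Vec<u32>, _> = new_table(false, 1_000_000); // primitives: grow
    let strs: HashMap<String, Vec<u32>, _> = new_table(true, 1_000_000); // strings: pre-allocate
    assert!(ints.capacity() < strs.capacity());
}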
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/mod.rs
@@ -80,7 +80,7 @@ impl DataFrame {
pub fn new<S: IntoSeries>(columns: Vec<S>) -> Result<Self> {
let mut first_len = None;
let mut series_cols = Vec::with_capacity(columns.len());
- let mut names = HashSet::with_capacity_and_hasher(columns.len(), RandomState::default());
+ let mut names = HashSet::with_hasher(RandomState::default());

// check for series length equality and convert into series in one pass
for s in columns {
21 changes: 13 additions & 8 deletions polars/polars-core/src/vector_hasher.rs
@@ -154,15 +154,17 @@ pub(crate) fn this_thread(h: u64, thread_no: u64, n_threads: u64) -> bool {
}

fn finish_table_from_key_hashes<T>(
- hashes_nd_keys: Vec<(u64, T)>,
+ hashes: Vec<u64>,
+ keys: impl Iterator<Item = T>,
mut hash_tbl: HashMap<T, Vec<u32>, RandomState>,
offset: usize,
) -> HashMap<T, Vec<u32>, RandomState>
where
T: Hash + Eq,
{
- hashes_nd_keys
+ hashes
.into_iter()
+ .zip(keys)
.enumerate()
.for_each(|(idx, (h, t))| {
let idx = (idx + offset) as u32;
@@ -186,25 +188,29 @@ where
}

pub(crate) fn prepare_hashed_relation<T>(
+ a: impl Iterator<Item = T>,
b: impl Iterator<Item = T>,
+ preallocate: bool,
) -> HashMap<T, Vec<u32>, RandomState>
where
T: Hash + Eq,
{
let build_hasher = RandomState::default();

- let hashes_nd_keys = b
+ let hashes = a
.map(|val| {
let mut hasher = build_hasher.build_hasher();
val.hash(&mut hasher);
- (hasher.finish(), val)
+ hasher.finish()
})
.collect::<Vec<_>>();

+ let size = if preallocate { hashes.len() } else { 4096 };

let hash_tbl: HashMap<T, Vec<u32>, RandomState> =
- HashMap::with_capacity_and_hasher(hashes_nd_keys.len(), build_hasher);
+ HashMap::with_capacity_and_hasher(size, build_hasher);

- finish_table_from_key_hashes(hashes_nd_keys, hash_tbl, 0)
+ finish_table_from_key_hashes(hashes, b, hash_tbl, 0)
}

pub(crate) fn prepare_hashed_relation_threaded<T, I>(
@@ -216,7 +222,6 @@ where
{
let n_threads = iters.len();
let (hashes_and_keys, build_hasher) = create_hash_and_keys_threaded_vectorized(iters, None);
- let size = hashes_and_keys.iter().fold(0, |acc, v| acc + v.len());

// We will create a hashtable in every thread.
// We use the hash to partition the keys to the matching hashtable.
Expand All @@ -227,7 +232,7 @@ where
let hashes_and_keys = &hashes_and_keys;
let thread_no = thread_no as u64;
let mut hash_tbl: HashMap<T, Vec<u32>, RandomState> =
- HashMap::with_capacity_and_hasher(size / (5 * n_threads), build_hasher);
+ HashMap::with_hasher(build_hasher);

let n_threads = n_threads as u64;
let mut offset = 0;
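
This reworked prepare_hashed_relation is why groupby now takes the same data as two iterators: the first pass (a) collects only the hashes into a Vec<u64>, and the second pass (b) walks the values again so each key is inserted next to its precomputed hash, instead of materializing a Vec<(u64, T)> of hash-key pairs. A simplified sketch of that two-pass shape (hashed_relation is illustrative; note that std's entry API recomputes the hash on insert, where the real helper can reuse the precomputed one):

// Two-pass sketch: hash first without storing the keys, then re-walk the
// data to fill the table. The 4096 starting capacity mirrors the diff above.
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash, Hasher};

fn hashed_relation<T: Hash + Eq>(
    a: impl Iterator<Item = T>,
    b: impl Iterator<Item = T>,
    preallocate: bool,
) -> HashMap<T, Vec<u32>, RandomState> {
    let build_hasher = RandomState::new();

    // Pass 1: hashes only, so we store a Vec<u64> instead of a Vec<(u64, T)>.
    let hashes: Vec<u64> = a
        .map(|val| {
            let mut h = build_hasher.build_hasher();
            val.hash(&mut h);
            h.finish()
        })
        .collect();

    let size = if preallocate { hashes.len() } else { 4096 };
    let mut tbl: HashMap<T, Vec<u32>, RandomState> =
        HashMap::with_capacity_and_hasher(size, build_hasher);

    // Pass 2: zip the keys back with their hashes and record row indices.
    for (idx, (_hash, key)) in hashes.into_iter().zip(b).enumerate() {
        tbl.entry(key).or_insert_with(Vec::new).push(idx as u32);
    }
    tbl
}

fn main() {
    let v = vec![1u32, 2, 1, 3, 2, 1];
    let rel = hashed_relation(v.iter().copied(), v.iter().copied(), false);
    assert_eq!(rel[&1], vec![0, 2, 5]);
}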
@@ -16,7 +16,7 @@ fn process_with_columns(
if let Some(with_columns) = &with_columns {
let cols = columns
.entry(path.to_owned())
- .or_insert_with(|| HashSet::with_capacity_and_hasher(256, RandomState::default()));
+ .or_insert_with(|| HashSet::with_hasher(RandomState::default()));
cols.extend(with_columns.iter().cloned());
}
}
