Skip to content

Commit

Permalink
use hashbrown for all maps and sets
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 6, 2021
1 parent f3f2ad1 commit ce960b5
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 126 deletions.
16 changes: 9 additions & 7 deletions polars/polars-core/src/chunked_array/builder/categorical.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::prelude::*;
use crate::use_string_cache;
use crate::utils::arrow::array::{Array, ArrayBuilder};
use ahash::AHashMap;
use crate::{
datatypes::PlHashMap,
use_string_cache,
utils::arrow::array::{Array, ArrayBuilder},
};
use arrow::array::{LargeStringArray, LargeStringBuilder};
use polars_arrow::builder::PrimitiveArrayBuilder;
use std::marker::PhantomData;

pub enum RevMappingBuilder {
Global(AHashMap<u32, u32>, LargeStringBuilder, u128),
Global(PlHashMap<u32, u32>, LargeStringBuilder, u128),
Local(LargeStringBuilder),
}

Expand Down Expand Up @@ -39,7 +41,7 @@ impl RevMappingBuilder {
}

pub enum RevMapping {
Global(AHashMap<u32, u32>, LargeStringArray, u128),
Global(PlHashMap<u32, u32>, LargeStringArray, u128),
Local(LargeStringArray),
}

Expand Down Expand Up @@ -81,7 +83,7 @@ impl CategoricalChunkedBuilder {
let builder = LargeStringBuilder::new(capacity / 10);
let reverse_mapping = if use_string_cache() {
let uuid = crate::STRING_CACHE.lock_map().uuid;
RevMappingBuilder::Global(AHashMap::default(), builder, uuid)
RevMappingBuilder::Global(PlHashMap::default(), builder, uuid)
} else {
RevMappingBuilder::Local(builder)
};
Expand Down Expand Up @@ -123,7 +125,7 @@ impl CategoricalChunkedBuilder {
}
}
} else {
let mut mapping = AHashMap::new();
let mut mapping = PlHashMap::new();
for opt_s in i {
match opt_s {
Some(s) => {
Expand Down
6 changes: 6 additions & 0 deletions polars/polars-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! are currently supported.
//!
use crate::prelude::*;
use ahash::RandomState;
pub use arrow::datatypes::DataType as ArrowDataType;
pub use arrow::datatypes::{
ArrowNumericType, ArrowPrimitiveType, BooleanType, Date32Type, Date64Type,
Expand Down Expand Up @@ -674,3 +675,8 @@ impl From<ArrowSchema> for Schema {
(&a_schema).into()
}
}

/// Crate-wide hash map alias: a `hashbrown::HashMap` whose hasher builder is `ahash::RandomState`.
// NOTE(review): this alias is gated behind the `private` feature, yet other modules import
// `datatypes::PlHashMap` without a visible feature gate — confirm the feature is always enabled.
#[cfg(feature = "private")]
pub type PlHashMap<K, V> = hashbrown::HashMap<K, V, RandomState>;
/// Crate-wide hash set alias: a `hashbrown::HashSet` whose hasher builder is `ahash::RandomState`.
#[cfg(feature = "private")]
pub type PlHashSet<V> = hashbrown::HashSet<V, RandomState>;
13 changes: 7 additions & 6 deletions polars/polars-core/src/frame/groupby/hashing.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use super::GroupTuples;
use crate::prelude::*;
use crate::utils::{is_power_of_2, split_df};
use crate::vector_hasher::{df_rows_to_hashes, df_rows_to_hashes_threaded, IdBuildHasher, IdxHash};
use crate::vector_hasher::{this_partition, AsU64};
use crate::POOL;
use ahash::RandomState;
use crate::{
datatypes::PlHashMap,
utils::{is_power_of_2, split_df},
};
use hashbrown::hash_map::Entry;
use hashbrown::{hash_map::RawEntryMut, HashMap};
use rayon::prelude::*;
Expand All @@ -18,8 +20,7 @@ pub(crate) fn groupby<T>(a: impl Iterator<Item = T>) -> GroupTuples
where
T: Hash + Eq,
{
let mut hash_tbl: HashMap<T, (u32, Vec<u32>), RandomState> =
HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());
let mut hash_tbl: PlHashMap<T, (u32, Vec<u32>)> = PlHashMap::with_capacity(HASHMAP_INIT_SIZE);
let mut cnt = 0;
a.for_each(|k| {
let idx = cnt;
Expand Down Expand Up @@ -63,8 +64,8 @@ where
(0..n_partitions).into_par_iter().map(|thread_no| {
let thread_no = thread_no as u64;

let mut hash_tbl: HashMap<T, (u32, Vec<u32>), RandomState> =
HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());
let mut hash_tbl: PlHashMap<T, (u32, Vec<u32>)> =
PlHashMap::with_capacity(HASHMAP_INIT_SIZE);

let mut offset = 0;
for keys in &keys {
Expand Down
13 changes: 5 additions & 8 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::vector_hasher::{
create_hash_and_keys_threaded_vectorized, prepare_hashed_relation_threaded, this_thread, AsU64,
StrHash,
};
use crate::POOL;
use crate::{datatypes::PlHashMap, POOL};
use ahash::RandomState;
use hashbrown::hash_map::{Entry, RawEntryMut};
use hashbrown::HashMap;
Expand Down Expand Up @@ -95,7 +95,7 @@ unsafe fn get_hash_tbl_threaded_join_mut<T, H>(
/// Probe the build table and add tuples to the results (inner join)
fn probe_inner<T, F>(
probe: &[T],
hash_tbls: &[HashMap<T, Vec<u32>, RandomState>],
hash_tbls: &[PlHashMap<T, Vec<u32>>],
results: &mut Vec<(u32, u32)>,
local_offset: usize,
n_tables: u64,
Expand All @@ -119,9 +119,7 @@ fn probe_inner<T, F>(
});
}

pub(crate) fn create_probe_table<T, IntoSlice>(
keys: Vec<IntoSlice>,
) -> Vec<HashMap<T, Vec<u32>, RandomState>>
pub(crate) fn create_probe_table<T, IntoSlice>(keys: Vec<IntoSlice>) -> Vec<PlHashMap<T, Vec<u32>>>
where
T: Send + Hash + Eq + Sync + Copy + AsU64,
IntoSlice: AsRef<[T]> + Send + Sync,
Expand All @@ -135,8 +133,7 @@ where
(0..n_threads).into_par_iter().map(|thread_no| {
let thread_no = thread_no as u64;

let mut hash_tbl: HashMap<T, Vec<u32>, RandomState> =
HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());
let mut hash_tbl: PlHashMap<T, Vec<u32>> = PlHashMap::with_capacity(HASHMAP_INIT_SIZE);

let n_threads = (n_threads as u64).into();
let mut offset = 0;
Expand Down Expand Up @@ -304,7 +301,7 @@ where
/// Probe the build table and add tuples to the results (outer join)
fn probe_outer<T, F, G, H>(
probe_hashes: &[Vec<(u64, T)>],
hash_tbls: &mut [HashMap<T, Vec<u32>, RandomState>],
hash_tbls: &mut [PlHashMap<T, Vec<u32>>],
results: &mut Vec<(Option<u32>, Option<u32>)>,
n_tables: u64,
// Function that gets index_a, index_b when there is a match and pushes to result
Expand Down
56 changes: 8 additions & 48 deletions polars/polars-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<T> DerefMut for NoNull<T> {
}
}

pub fn get_iter_capacity<T, I: Iterator<Item = T>>(iter: &I) -> usize {
pub(crate) fn get_iter_capacity<T, I: Iterator<Item = T>>(iter: &I) -> usize {
match iter.size_hint() {
(_lower, Some(upper)) => upper,
(0, None) => 1024,
Expand Down Expand Up @@ -152,14 +152,17 @@ macro_rules! split_array {
}};
}

/// Expands `split_array!` for a `ChunkedArray`, returning the resulting `Vec<ChunkedArray<T>>`
/// or an error.
///
/// NOTE(review): presumably `n` is the number of pieces to split `ca` into — confirm against
/// the `split_array!` macro definition.
#[cfg(feature = "private")]
pub fn split_ca<T>(ca: &ChunkedArray<T>, n: usize) -> Result<Vec<ChunkedArray<T>>> {
split_array!(ca, n, i64)
}

/// Expands `split_array!` for a `Series`, returning the resulting `Vec<Series>` or an error.
///
/// NOTE(review): presumably `n` is the number of pieces to split `s` into — confirm against
/// the `split_array!` macro definition.
#[cfg(feature = "private")]
pub fn split_series(s: &Series, n: usize) -> Result<Vec<Series>> {
split_array!(s, n, i64)
}

#[cfg(feature = "private")]
pub fn split_df(df: &DataFrame, n: usize) -> Result<Vec<DataFrame>> {
trait Len {
fn len(&self) -> usize;
Expand All @@ -173,6 +176,7 @@ pub fn split_df(df: &DataFrame, n: usize) -> Result<Vec<DataFrame>> {
}

#[inline]
#[cfg(feature = "private")]
pub fn slice_offsets(offset: i64, length: usize, array_len: usize) -> (usize, usize) {
let abs_offset = offset.abs() as usize;

Expand Down Expand Up @@ -204,6 +208,7 @@ impl Default for Node {
}

/// Generic container that keeps its items of type `T` in a single `Vec`.
///
/// NOTE(review): presumably items are addressed through `Node` handles — confirm against the
/// `impl` blocks of this type.
#[derive(Clone)]
#[cfg(feature = "private")]
pub struct Arena<T> {
// Backing storage for all items held by the arena.
items: Vec<T>,
}
Expand Down Expand Up @@ -292,47 +297,6 @@ impl<T: Default> Arena<T> {
}
}

/// Zips a number of same-typed iterators that is only known at runtime.
///
/// Unlike `std::iter::Zip`, whose item is a fixed-arity tuple, every call to `next`
/// yields a freshly allocated `Vec` holding one item per wrapped iterator, so the
/// yielded item is never `Copy`.
///
/// Iteration ends as soon as any wrapped iterator is exhausted. When no iterators
/// are wrapped at all, `next` keeps yielding an empty `Vec` and never returns `None`.
pub struct DynamicZip<I>
where
    I: Iterator,
{
    iterators: Vec<I>,
}

impl<I, T> Iterator for DynamicZip<I>
where
    I: Iterator<Item = T>,
{
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut row = Vec::with_capacity(self.iterators.len());
        for source in self.iterators.iter_mut() {
            // `?` stops at the first exhausted iterator; the ones after it are not advanced.
            row.push(source.next()?);
        }
        Some(row)
    }
}

/// Conversion of a collection of iterators into a [`DynamicZip`].
pub trait IntoDynamicZip<I>
where
    I: Iterator,
{
    /// Consumes `self` and wraps its iterators in a [`DynamicZip`].
    fn into_dynamic_zip(self) -> DynamicZip<I>;
}

impl<I> IntoDynamicZip<I> for Vec<I>
where
    I: Iterator,
{
    fn into_dynamic_zip(self) -> DynamicZip<I> {
        let iterators = self;
        DynamicZip { iterators }
    }
}

#[macro_export]
macro_rules! match_arrow_data_type_apply_macro {
($obj:expr, $macro:ident, $macro_utf8:ident, $macro_bool:ident $(, $opt_args:expr)*) => {{
Expand Down Expand Up @@ -533,6 +497,7 @@ macro_rules! df {
}

/// Given two datatypes, determine the supertype that both types can safely be cast to
#[cfg(feature = "private")]
pub fn get_supertype(l: &DataType, r: &DataType) -> Result<DataType> {
match _get_supertype(l, r) {
Some(dt) => Ok(dt),
Expand Down Expand Up @@ -736,14 +701,9 @@ pub fn accumulate_dataframes_horizontal(dfs: Vec<DataFrame>) -> Result<DataFrame
Ok(acc_df)
}

// Foreign declaration, only compiled on Linux targets.
#[cfg(target_os = "linux")]
extern "C" {
/// C `malloc_trim` binding taking a padding size and returning a C `int`.
///
/// NOTE(review): presumably the glibc function that asks the allocator to return free
/// memory to the OS — confirm which libc provides the symbol.
// Declared for optional use; nothing in view calls it, hence the `dead_code` allowance.
#[allow(dead_code)]
pub fn malloc_trim(__pad: usize) -> std::os::raw::c_int;
}

/// Simple wrapper to parallelize functions whose work can be divided over threads, aggregated
/// per thread, and finally aggregated in the main thread. This can be done for sum, min, max, etc.
#[cfg(feature = "private")]
pub fn parallel_op_series<F>(f: F, s: Series, n_threads: Option<usize>) -> Result<Series>
where
F: Fn(Series) -> Result<Series> + Send + Sync,
Expand Down
6 changes: 2 additions & 4 deletions polars/polars-io/src/csv_core/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::csv::CsvEncoding;
use crate::csv_core::parser::next_line_position;
use ahash::RandomState;
use lazy_static::lazy_static;
use polars_core::datatypes::PlHashSet;
use polars_core::prelude::*;
use regex::{Regex, RegexBuilder};
use std::borrow::Cow;
use std::collections::HashSet;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};

pub(crate) fn init_csv_reader<R: Read>(
Expand Down Expand Up @@ -141,8 +140,7 @@ pub fn infer_file_schema<R: Read + Seek>(
};

// keep track of inferred field types
let mut column_types: Vec<HashSet<DataType, RandomState>> =
vec![HashSet::with_hasher(RandomState::new()); header_length];
let mut column_types: Vec<PlHashSet<DataType>> = vec![PlHashSet::new(); header_length];
// keep track of columns with nulls
let mut nulls: Vec<bool> = vec![false; header_length];

Expand Down
10 changes: 3 additions & 7 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
//! Lazy variant of a [DataFrame](polars_core::frame::DataFrame).
#[cfg(any(feature = "parquet", feature = "csv-file"))]
use std::collections::HashMap;
use std::sync::Arc;

#[cfg(any(feature = "parquet", feature = "csv-file"))]
use ahash::RandomState;

use polars_core::datatypes::PlHashMap;
use polars_core::frame::hash_join::JoinType;
use polars_core::prelude::*;
use polars_core::toggle_string_cache;
use std::sync::Arc;

use crate::logical_plan::optimizer::aggregate_pushdown::AggregatePushdown;
#[cfg(any(feature = "parquet", feature = "csv-file"))]
Expand Down Expand Up @@ -512,7 +508,7 @@ impl LazyFrame {
if agg_scan_projection {
// scan the LP to aggregate all the columns used in scans
// these columns will be added to the state of the AggScanProjection rule
let mut columns = HashMap::with_capacity_and_hasher(32, RandomState::default());
let mut columns = PlHashMap::with_capacity(32);
agg_projection(lp_top, &mut columns, lp_arena);

let opt = AggScanProjection { columns };
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,27 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use ahash::RandomState;

use crate::logical_plan::optimizer::stack_opt::OptimizationRule;
use crate::logical_plan::ALogicalPlanBuilder;
use crate::prelude::*;
use polars_core::datatypes::{PlHashMap, PlHashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;

fn process_with_columns(
path: &Path,
with_columns: &Option<Vec<String>>,
columns: &mut HashMap<PathBuf, HashSet<String, RandomState>, RandomState>,
columns: &mut PlHashMap<PathBuf, PlHashSet<String>>,
) {
if let Some(with_columns) = &with_columns {
let cols = columns
.entry(path.to_owned())
.or_insert_with(|| HashSet::with_hasher(RandomState::default()));
.or_insert_with(PlHashSet::new);
cols.extend(with_columns.iter().cloned());
}
}

/// Aggregate all the projections in an LP
pub(crate) fn agg_projection(
root: Node,
columns: &mut HashMap<PathBuf, HashSet<String, RandomState>, RandomState>,
columns: &mut PlHashMap<PathBuf, PlHashSet<String>>,
lp_arena: &Arena<ALogicalPlan>,
) {
use ALogicalPlan::*;
Expand Down Expand Up @@ -54,7 +51,7 @@ pub(crate) fn agg_projection(
/// Due to self joins there can be multiple Scans of the same file in a LP. We already cache the scans
/// in the PhysicalPlan, but we need to make sure that the first scan has all the columns needed.
pub struct AggScanProjection {
pub columns: HashMap<PathBuf, HashSet<String, RandomState>, RandomState>,
pub columns: PlHashMap<PathBuf, PlHashSet<String>>,
}

impl AggScanProjection {
Expand Down
10 changes: 4 additions & 6 deletions polars/polars-lazy/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::prelude::*;
use ahash::RandomState;
use polars_core::prelude::*;
use std::collections::HashMap;
use polars_core::{datatypes::PlHashMap, prelude::*};

pub(crate) mod aggregate_pushdown;
#[cfg(any(feature = "parquet", feature = "csv-file"))]
Expand All @@ -19,8 +17,8 @@ pub trait Optimize {
}

// arbitrary constant to reduce reallocation.
const HASHMAP_SIZE: usize = 32;
const HASHMAP_SIZE: usize = 16;

pub(crate) fn init_hashmap<K, V>() -> HashMap<K, V, RandomState> {
HashMap::with_capacity_and_hasher(HASHMAP_SIZE, RandomState::new())
pub(crate) fn init_hashmap<K, V>() -> PlHashMap<K, V> {
PlHashMap::with_capacity(HASHMAP_SIZE)
}

0 comments on commit ce960b5

Please sign in to comment.