Skip to content

Commit

Permalink
reduce contention of global string cache: >4x performance improveme…
Browse files Browse the repository at this point in the history
…nt (#4078)
  • Loading branch information
ritchie46 committed Jul 19, 2022
1 parent bc1a498 commit efd1ee3
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 70 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ description = "Arrow interfaces for Polars DataFrame library"
[dependencies]
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "98e49133b2e56e51e30335830485b3cf768eb5a2", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "improve_filter", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "default_utf8", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
num = "^0.4"
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ package = "arrow2"
git = "https://github.com/ritchie46/arrow2"
# rev = "98e49133b2e56e51e30335830485b3cf768eb5a2"
# path = "../../../arrow2"
branch = "improve_filter"
branch = "default_utf8"
# version = "0.12"
default-features = false
features = [
Expand Down
217 changes: 152 additions & 65 deletions polars/polars-core/src/chunked_array/logical/categorical/builder.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::frame::groupby::hashing::HASHMAP_INIT_SIZE;
use crate::prelude::*;
use crate::{datatypes::PlHashMap, use_string_cache, StrHashGlobal};
use crate::{datatypes::PlHashMap, use_string_cache, StrHashGlobal, StringCache, POOL};
use ahash::CallHasher;
use arrow::array::*;
use hashbrown::hash_map::RawEntryMut;
use polars_arrow::trusted_len::PushUnchecked;
use std::hash::{Hash, Hasher};

pub enum RevMappingBuilder {
Expand All @@ -15,15 +16,19 @@ pub enum RevMappingBuilder {
}

impl RevMappingBuilder {
fn insert(&mut self, idx: u32, value: &str) {
fn insert(&mut self, value: &str) {
use RevMappingBuilder::*;
match self {
Local(builder) => builder.push(Some(value)),
Global(map, builder, _) => {
if !map.contains_key(&idx) {
builder.push(Some(value));
let new_idx = builder.len() as u32 - 1;
map.insert(idx, new_idx);
Global(_, _, _) => {
#[cfg(debug_assertions)]
{
unreachable!()
}
#[cfg(not(debug_assertions))]
{
use std::hint::unreachable_unchecked;
unsafe { unreachable_unchecked() }
}
}
};
Expand Down Expand Up @@ -152,7 +157,7 @@ impl<'a> PartialEq for StrHashLocal<'a> {
}

pub struct CategoricalChunkedBuilder {
array_builder: UInt32Vec,
cat_builder: UInt32Vec,
name: String,
reverse_mapping: RevMappingBuilder,
}
Expand All @@ -168,88 +173,170 @@ impl CategoricalChunkedBuilder {
};

Self {
array_builder: UInt32Vec::with_capacity(capacity),
cat_builder: UInt32Vec::with_capacity(capacity),
name: name.to_string(),
reverse_mapping,
}
}
}
impl CategoricalChunkedBuilder {
/// Appends all the values in a single lock of the global string cache.
pub fn drain_iter<'a, I>(&mut self, i: I)
///
/// `store_hashes` is not needed by the local builder, only for the global builder under contention
/// The hashes have the same order as the `Utf8Array` values.
fn build_local_map<'a, I>(&mut self, i: I, store_hashes: bool) -> Vec<u64>
where
I: IntoIterator<Item = Option<&'a str>>,
{
if use_string_cache() {
let mut cache = crate::STRING_CACHE.lock_map();
let mapping = &mut cache.map;
let hb = mapping.hasher().clone();

for opt_s in i {
match opt_s {
Some(s) => {
let h = str::get_hash(s, &hb);
let mut idx = mapping.len() as u32;
// Note that we don't create the StrHashGlobal to search the key in the hashmap
// as StrHashGlobal may allocate a string
let entry = mapping
.raw_entry_mut()
.from_hash(h, |val| (val.hash == h) && val.str == s);

match entry {
RawEntryMut::Occupied(entry) => idx = *entry.get(),
RawEntryMut::Vacant(entry) => {
// only just now we allocate the string
let key = StrHashGlobal::new(s.into(), h);
entry.insert_with_hasher(h, key, idx, |s| s.hash);
let mut iter = i.into_iter();
let mut hashes = if store_hashes {
Vec::with_capacity(iter.size_hint().0 / 10)
} else {
vec![]
};
// It is important that we use the same hash builder as the global `StringCache` does.
let mut mapping =
PlHashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, StringCache::get_hash_builder());
let hb = mapping.hasher().clone();
for opt_s in &mut iter {
match opt_s {
Some(s) => {
let h = str::get_hash(s, &hb);
let key = StrHashLocal::new(s, h);
let mut idx = mapping.len() as u32;

let entry = mapping.raw_entry_mut().from_key_hashed_nocheck(h, &key);

match entry {
RawEntryMut::Occupied(entry) => idx = *entry.get(),
RawEntryMut::Vacant(entry) => {
if store_hashes {
hashes.push(h)
}
entry.insert_with_hasher(h, key, idx, |s| s.hash);
self.reverse_mapping.insert(s);
}
// we still need to check if the idx is already stored in our map
self.reverse_mapping.insert(idx, s);
self.array_builder.push(Some(idx));
}
None => {
self.array_builder.push(None);
}
};
self.cat_builder.push(Some(idx));
}
None => {
self.cat_builder.push(None);
}
}
}

if mapping.len() > u32::MAX as usize {
panic!("not more than {} categories supported", u32::MAX)
};
hashes
}

fn build_global_map_contention<'a, I>(&mut self, i: I)
where
I: IntoIterator<Item = Option<&'a str>>,
{
let mut global_to_local;
// locally we don't need a hashmap because we all categories are 1 integer apart
// so the index is local, and the values is global
let mut local_to_global: Vec<u32>;
let id;

// make sure that we use the local rev_map builder
if let RevMappingBuilder::Global(map, values, id_val) = &mut self.reverse_mapping {
global_to_local = std::mem::take(map);
id = *id_val;
self.reverse_mapping = RevMappingBuilder::Local(std::mem::take(values));
} else {
let mut mapping = PlHashMap::with_capacity(HASHMAP_INIT_SIZE);
let hb = mapping.hasher().clone();
for opt_s in i {
match opt_s {
Some(s) => {
let h = str::get_hash(s, &hb);
let key = StrHashLocal::new(s, h);
let mut idx = mapping.len() as u32;

let entry = mapping.raw_entry_mut().from_key_hashed_nocheck(h, &key);

match entry {
RawEntryMut::Occupied(entry) => idx = *entry.get(),
RawEntryMut::Vacant(entry) => {
entry.insert_with_hasher(h, key, idx, |s| s.hash);
self.reverse_mapping.insert(idx, s);
}
};
self.array_builder.push(Some(idx));
}
None => {
self.array_builder.push(None);
unreachable!();
}

// first build the values: `Utf8Array`
// we can use a local hashmap for that
let hashes = self.build_local_map(i, true);

// now we have to lock the global string cache.
// we will create a mapping from our local categoricals to global categoricals
// and a mapping from global categoricals to our local categoricals

let values: Utf8Array<_> =
if let RevMappingBuilder::Local(values) = &mut self.reverse_mapping {
// resize local now that we know the size of the mapping.
local_to_global = Vec::with_capacity(values.len());
std::mem::take(values).into()
} else {
unreachable!()
};

// in a separate scope so that we drop the global cache as soon as we are finished
{
let cache = &mut crate::STRING_CACHE.lock_map();
let global_mapping = &mut cache.map;

for (s, h) in values.values_iter().zip(hashes.into_iter()) {
let mut global_idx = global_mapping.len() as u32;
// Note that we don't create the StrHashGlobal to search the key in the hashmap
// as StrHashGlobal may allocate a string
let entry = global_mapping
.raw_entry_mut()
.from_hash(h, |val| (val.hash == h) && val.str == s);

match entry {
RawEntryMut::Occupied(entry) => global_idx = *entry.get(),
RawEntryMut::Vacant(entry) => {
// only just now we allocate the string
let key = StrHashGlobal::new(s.into(), h);
entry.insert_with_hasher(h, key, global_idx, |s| s.hash);
}
}
// safety:
// we allocated enough
unsafe { local_to_global.push_unchecked(global_idx) }
}

if mapping.len() > u32::MAX as usize {
if global_mapping.len() > u32::MAX as usize {
panic!("not more than {} categories supported", u32::MAX)
};
}

let fill_global_to_local = || {
let mut local_idx = 0;
#[allow(clippy::explicit_counter_loop)]
for global_idx in &local_to_global {
// we know the keys are unique so this is much faster
global_to_local.insert_unique_unchecked(*global_idx, local_idx);
local_idx += 1;
}
};

let update_cats = || {
self.cat_builder.apply_values(|cats| {
for cat in cats {
debug_assert!((*cat as usize) < local_to_global.len());
*cat = *unsafe { local_to_global.get_unchecked(*cat as usize) };
}
});
};

POOL.join(fill_global_to_local, update_cats);

self.reverse_mapping =
RevMappingBuilder::Global(global_to_local, values.into_mut().right().unwrap(), id)
}

/// Appends all the values in a single lock of the global string cache.
pub fn drain_iter<'a, I>(&mut self, i: I)
where
I: IntoIterator<Item = Option<&'a str>>,
{
if use_string_cache() {
self.build_global_map_contention(i)
} else {
let _ = self.build_local_map(i, false);
}
}

pub fn finish(mut self) -> CategoricalChunked {
CategoricalChunked::from_chunks_original(
&self.name,
vec![self.array_builder.as_box()],
vec![self.cat_builder.as_box()],
self.reverse_mapping.finish(),
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::frame::groupby::hashing::HASHMAP_INIT_SIZE;
use crate::prelude::PlHashMap;
use ahash::RandomState;
use once_cell::sync::Lazy;
use smartstring::{LazyCompact, SmartString};
use std::borrow::Borrow;
Expand Down Expand Up @@ -46,7 +48,10 @@ pub(crate) struct SCacheInner {
impl Default for SCacheInner {
fn default() -> Self {
Self {
map: Default::default(),
map: PlHashMap::with_capacity_and_hasher(
HASHMAP_INIT_SIZE,
StringCache::get_hash_builder(),
),
uuid: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
Expand All @@ -62,6 +67,13 @@ impl Default for SCacheInner {
pub(crate) struct StringCache(pub(crate) Mutex<SCacheInner>);

impl StringCache {
/// The global `StringCache` will always use a predictable seed. This allows local builders to mimic
/// the hashes in case of contention.
pub(crate) fn get_hash_builder() -> RandomState {
RandomState::with_seed(0)
}

/// Lock the string cache
pub(crate) fn lock_map(&self) -> MutexGuard<SCacheInner> {
self.0.lock().unwrap()
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private = ["polars-time/private"]
ahash = "0.7"
anyhow = "1.0"
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "98e49133b2e56e51e30335830485b3cf768eb5a2", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "improve_filter", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "default_utf8", default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", default-features = false }
csv-core = { version = "0.1.10", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit efd1ee3

Please sign in to comment.