Skip to content

Commit

Permalink
perf(rust, python): vectorize integer vec-hash by using very simple, … (
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 21, 2022
1 parent 808d8ac commit d3d58a3
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 94 deletions.
1 change: 0 additions & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ bitflags.workspace = true
chrono = { version = "0.4", optional = true }
chrono-tz = { version = "0.6", optional = true }
comfy-table = { version = "6.1.1", optional = true }
fxhash = "0.2.1"
hashbrown.workspace = true
hex = { version = "0.4", optional = true }
indexmap = { version = "1", features = ["std"] }
Expand Down
187 changes: 119 additions & 68 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,93 +42,146 @@ pub(crate) fn get_null_hash_value(random_state: RandomState) -> u64 {
hasher.finish()
}

impl<T> VecHash for ChunkedArray<T>
// FxHash-style hash of an 8-bit value: reinterpret the value's bits as `u8`,
// widen to `u64`, and multiply by the (pre-seeded) FxHash constant `$k`.
// NOTE: the `transmute` makes this valid only for types exactly 1 byte wide;
// callers expand this inside an `unsafe` block (see the `vec_hash_int!` users).
macro_rules! fx_hash_8_bit {
    ($val: expr, $k: expr ) => {{
        let val = std::mem::transmute::<_, u8>($val);
        (val as u64).wrapping_mul($k)
    }};
}
// Same scheme as `fx_hash_8_bit`, for types exactly 2 bytes wide:
// bit-cast to `u16`, widen, and multiply by the seeded constant `$k`.
macro_rules! fx_hash_16_bit {
    ($val: expr, $k: expr ) => {{
        let val = std::mem::transmute::<_, u16>($val);
        (val as u64).wrapping_mul($k)
    }};
}
// Same scheme as `fx_hash_8_bit`, for types exactly 4 bytes wide:
// bit-cast to `u32`, widen, and multiply by the seeded constant `$k`.
macro_rules! fx_hash_32_bit {
    ($val: expr, $k: expr ) => {{
        let val = std::mem::transmute::<_, u32>($val);
        (val as u64).wrapping_mul($k)
    }};
}
// 64-bit variant: no transmute needed — a plain `as u64` cast suffices,
// then multiply by the seeded constant `$k`.
macro_rules! fx_hash_64_bit {
    ($val: expr, $k: expr ) => {{
        ($val as u64).wrapping_mul($k)
    }};
}

fn finish_vec_hash<T>(ca: &ChunkedArray<T>, random_state: RandomState, buf: &mut Vec<u64>)
where
T: PolarsIntegerType,
T::Native: Hash,
{
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
// Note that we don't use the no null branch! This can break in unexpected ways.
// for instance with threading we split an array in n_threads, this may lead to
// splits that have no nulls and splits that have nulls. Then one array is hashed with
// Option<T> and the other array with T.
// Meaning that they cannot be compared. By always hashing on Option<T> the random_state is
// the only deterministic seed.
buf.clear();
buf.reserve(self.len());
self.downcast_iter().for_each(|arr| {
buf.extend(
arr.values()
.as_slice()
.iter()
.map(|v| random_state.hash_single(v)),
);
});

let null_h = get_null_hash_value(random_state);
let hashes = buf.as_mut_slice();
let null_h = get_null_hash_value(random_state);
let hashes = buf.as_mut_slice();

let mut offset = 0;
ca.downcast_iter().for_each(|arr| {
if arr.null_count() > 0 {
let validity = arr.validity().unwrap();
let (slice, byte_offset, _) = validity.as_slice();
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
.zip(&mut hashes[offset..])
.for_each(|(valid, h)| {
*h = [null_h, *h][valid as usize];
})
}
offset += arr.len();
});
}

let mut offset = 0;
self.downcast_iter().for_each(|arr| {
if arr.null_count() > 0 {
fn integer_vec_hash_combine<T>(ca: &ChunkedArray<T>, random_state: RandomState, hashes: &mut [u64])
where
T: PolarsIntegerType,
T::Native: Hash,
{
let null_h = get_null_hash_value(random_state.clone());

let mut offset = 0;
ca.downcast_iter().for_each(|arr| {
match arr.null_count() {
0 => arr
.values()
.as_slice()
.iter()
.zip(&mut hashes[offset..])
.for_each(|(v, h)| {
let l = random_state.hash_single(v);
*h = _boost_hash_combine(l, *h)
}),
_ => {
let validity = arr.validity().unwrap();
let (slice, byte_offset, _) = validity.as_slice();
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
.zip(&mut hashes[offset..])
.for_each(|(valid, h)| {
*h = [null_h, *h][valid as usize];
})
.zip(arr.values().as_slice())
.for_each(|((valid, h), l)| {
*h = _boost_hash_combine(
[null_h, random_state.hash_single(l)][valid as usize],
*h,
)
});
}
offset += arr.len();
});
}
}
offset += arr.len();
});
}

fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
let null_h = get_null_hash_value(random_state.clone());
macro_rules! vec_hash_int {
($ca:ident, $fx_hash:ident) => {
impl VecHash for $ca {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
// Note that we don't use the no null branch! This can break in unexpected ways.
// for instance with threading we split an array in n_threads, this may lead to
// splits that have no nulls and splits that have nulls. Then one array is hashed with
// Option<T> and the other array with T.
// Meaning that they cannot be compared. By always hashing on Option<T> the random_state is
// the only deterministic seed.
buf.clear();
buf.reserve(self.len());

let k: u64 = 0x517cc1b727220a95;
let k = random_state.hash_one(k);

#[allow(unused_unsafe)]
#[allow(clippy::useless_transmute)]
self.downcast_iter().for_each(|arr| {
buf.extend(
arr.values()
.as_slice()
.iter()
.copied()
.map(|v| unsafe { $fx_hash!(v, k) }),
);
});
finish_vec_hash(self, random_state, buf)
}

let mut offset = 0;
self.downcast_iter().for_each(|arr| {
match arr.null_count() {
0 => arr
.values()
.as_slice()
.iter()
.zip(&mut hashes[offset..])
.for_each(|(v, h)| {
let l = random_state.hash_single(v);
*h = _boost_hash_combine(l, *h)
}),
_ => {
let validity = arr.validity().unwrap();
let (slice, byte_offset, _) = validity.as_slice();
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
.zip(&mut hashes[offset..])
.zip(arr.values().as_slice())
.for_each(|((valid, h), l)| {
*h = _boost_hash_combine(
[null_h, random_state.hash_single(l)][valid as usize],
*h,
)
});
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
integer_vec_hash_combine(self, random_state, hashes)
}
offset += arr.len();
});
}
}
};
}

// Implement `VecHash` for every integer ChunkedArray type, pairing each
// type with the fx-hash macro that matches its bit width (the per-width
// macros exist so the multiply vectorizes over the raw value slices).
vec_hash_int!(Int64Chunked, fx_hash_64_bit);
vec_hash_int!(Int32Chunked, fx_hash_32_bit);
vec_hash_int!(Int16Chunked, fx_hash_16_bit);
vec_hash_int!(Int8Chunked, fx_hash_8_bit);
vec_hash_int!(UInt64Chunked, fx_hash_64_bit);
vec_hash_int!(UInt32Chunked, fx_hash_32_bit);
vec_hash_int!(UInt16Chunked, fx_hash_16_bit);
vec_hash_int!(UInt8Chunked, fx_hash_8_bit);

impl VecHash for Utf8Chunked {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
buf.clear();
buf.reserve(self.len());
let null_h = get_null_hash_value(random_state.clone());
// for strings we use fxhash
let fxh = fxhash::FxBuildHasher::default();
self.downcast_iter().for_each(|arr| {
if arr.null_count() == 0 {
buf.extend(arr.values_iter().map(|v| fxh.hash_single(v)))
buf.extend(arr.values_iter().map(|v| random_state.hash_single(v)))
} else {
buf.extend(arr.into_iter().map(|opt_v| match opt_v {
Some(v) => random_state.hash_single(v),
Expand All @@ -139,13 +192,11 @@ impl VecHash for Utf8Chunked {
}

fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
let null_h = get_null_hash_value(random_state);
// for strings we use fxhash
let fxh = fxhash::FxBuildHasher::default();
let null_h = get_null_hash_value(random_state.clone());
self.apply_to_slice(
|opt_v, h| {
let l = match opt_v {
Some(v) => fxh.hash_single(v),
Some(v) => random_state.hash_single(v),
None => null_h,
};
_boost_hash_combine(l, *h)
Expand Down
1 change: 0 additions & 1 deletion polars/polars-lazy/polars-pipe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ description = "Lazy query engine for the Polars DataFrame library"

[dependencies]
enum_dispatch = "0.3"
fxhash = "0.2.1"
hashbrown.workspace = true
num.workspace = true
polars-arrow = { version = "0.25.1", path = "../../polars-arrow", default-features = false }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::any::Any;

use hashbrown::hash_map::RawEntryMut;
use num::NumCast;
use polars_core::export::ahash::RandomState;
use polars_core::frame::row::AnyValueBuffer;
use polars_core::prelude::*;
use polars_core::utils::{_set_partition_size, accumulate_dataframes_vertical_unchecked};
Expand Down Expand Up @@ -42,8 +43,7 @@ pub struct Utf8GroupbySink {
key_column: Arc<dyn PhysicalPipedExpr>,
// the columns that will be aggregated
aggregation_columns: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
hb: fxhash::FxBuildHasher,
// hb: RandomState,
hb: RandomState,
// Initializing Aggregation functions. If we aggregate by 2 columns
// this vec will have two functions. We will use these functions
// to populate the buffer where the hashmap points to
Expand All @@ -63,7 +63,7 @@ impl Utf8GroupbySink {
output_schema: SchemaRef,
slice: Option<(i64, usize)>,
) -> Self {
let hb = fxhash::FxBuildHasher::default();
let hb = Default::default();
let partitions = _set_partition_size();

let pre_agg = load_vec(partitions, || PlIdHashMap::with_capacity(HASHMAP_INIT_SIZE));
Expand Down
2 changes: 0 additions & 2 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions py-polars/polars/internals/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -6496,10 +6496,10 @@ def hash_rows(
shape: (4,)
Series: '' [u64]
[
1160655983065896620
8421503603771360652
4702262519505526977
5983473495725024293
12239174968153954787
17976148875586754089
10047419486152048166
13766281409932363907
]
"""
Expand Down
4 changes: 2 additions & 2 deletions py-polars/polars/internals/expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3630,9 +3630,9 @@ def hash(
│ --- ┆ --- │
│ u64 ┆ u64 │
╞══════════════════════╪══════════════════════╡
4629889412789719550 ┆ 6959506404929392568 │
9774092659964970114 ┆ 6959506404929392568 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
16386608652769605760 ┆ 11638928888656214026 │
1101441246220388612 ┆ 11638928888656214026 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 11638928888656214026 ┆ 11040941213715918520 │
└──────────────────────┴──────────────────────┘
Expand Down
6 changes: 3 additions & 3 deletions py-polars/polars/internals/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -4143,9 +4143,9 @@ def hash(
shape: (3,)
Series: 'a' [u64]
[
2374023516666777365
10386026231460783898
17796317186427479491
10734580197236529959
3022416320763508302
13756996518000038261
]
"""
Expand Down
12 changes: 2 additions & 10 deletions py-polars/tests/unit/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -1394,17 +1394,9 @@ def test_reproducible_hash_with_seeds() -> None:
# in the meantime, account for arm64 (mac) hash values to reduce noise
expected = pl.Series(
"s",
[
7179856081800753525,
15496313222292466864,
4963241831945886452,
]
[8823051245921001677, 988796329533502010, 7528667241828618484]
if platform.mac_ver()[-1] == "arm64"
else [
8823051245921001677,
988796329533502010,
7528667241828618484,
],
else [6629530352159708028, 988796329533502010, 6048298245521876612],
dtype=pl.UInt64,
)
result = df.hash_rows(*seeds)
Expand Down

0 comments on commit d3d58a3

Please sign in to comment.