Skip to content

Commit

Permalink
inner join unlimited multiple keys
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 3, 2021
1 parent 2dfb4b3 commit e9eeb0a
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 125 deletions.
32 changes: 21 additions & 11 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,13 @@ where
///
/// # Safety
/// Doesn't check any bounds
unsafe fn compare_fn(keys: &DataFrame, idx_a: u32, idx_b: u32) -> bool {
pub(crate) unsafe fn compare_df_rows(keys: &DataFrame, idx_a: u32, idx_b: u32) -> bool {
let idx_a = idx_a as usize;
let idx_b = idx_b as usize;
for s in keys.get_columns() {
if !(s.get_unchecked(idx_a as usize) == s.get_unchecked(idx_b as usize)) {
dbg!(idx_a, idx_b, s.get_unchecked(idx_a), s.get_unchecked(idx_b));

if !(s.get_unchecked(idx_a) == s.get_unchecked(idx_b)) {
return false;
}
}
Expand All @@ -252,7 +256,7 @@ unsafe fn compare_fn(keys: &DataFrame, idx_a: u32, idx_b: u32) -> bool {
/// To check if a row is equal the original DataFrame is also passed as ref.
/// When a hash collision occurs the indexes are ptrs to the rows and the rows are compared
/// on equality.
pub(crate) fn populate_multiple_key_hashmap<V, H, F>(
pub(crate) fn populate_multiple_key_hashmap<V, H, F, G>(
hash_tbl: &mut HashMap<IdxHash, V, H>,
// row index
idx: u32,
Expand All @@ -262,10 +266,11 @@ pub(crate) fn populate_multiple_key_hashmap<V, H, F>(
// the keys are needed for the equality check
keys: &DataFrame,
// value to insert
value: V,
vacant_fn: G,
// function that gets a mutable ref to the occupied value in the hash table
occupied_fn: F,
) where
G: Fn() -> V,
F: Fn(&mut V),
H: BuildHasher,
{
Expand All @@ -276,12 +281,12 @@ pub(crate) fn populate_multiple_key_hashmap<V, H, F>(
.from_hash(h, |idx_hash| {
let key_idx = idx_hash.idx;
// Safety:
// indices in a join operation are always in bounds.
unsafe { compare_fn(keys, key_idx, idx) }
// indices in a groupby operation are always in bounds.
unsafe { compare_df_rows(keys, key_idx, idx) }
});
match entry {
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(h, IdxHash::new(idx, h), value);
entry.insert_hashed_nocheck(h, IdxHash::new(idx, h), vacant_fn());
}
RawEntryMut::Occupied(mut entry) => {
let (_k, v) = entry.get_key_value_mut();
Expand All @@ -301,9 +306,14 @@ fn groupby_multiple_keys(keys: DataFrame) -> GroupTuples {
let mut idx = 0;
for hashes_chunk in hashes.data_views() {
for &h in hashes_chunk {
populate_multiple_key_hashmap(&mut hash_tbl, idx, h, &keys, (idx, vec![idx]), |v| {
v.1.push(idx)
});
populate_multiple_key_hashmap(
&mut hash_tbl,
idx,
h,
&keys,
|| (idx, vec![idx]),
|v| v.1.push(idx),
);
idx += 1;
}
}
Expand Down Expand Up @@ -349,7 +359,7 @@ fn groupby_threaded_multiple_keys_flat(keys: DataFrame, n_threads: usize) -> Gro
idx,
h,
&keys,
(idx, vec![idx]),
|| (idx, vec![idx]),
|v| v.1.push(idx),
);
}
Expand Down
63 changes: 21 additions & 42 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod multiple_keys;

use crate::frame::hash_join::multiple_keys::inner_join_multiple_keys;
use crate::frame::select::Selection;
use crate::prelude::*;
use crate::utils::{split_ca, NoNull};
Expand Down Expand Up @@ -43,14 +44,11 @@ pub enum JoinType {
Outer,
}

unsafe fn get_hash_tbl<T>(
unsafe fn get_hash_tbl_threaded_join<T, H>(
h: u64,
hash_tables: &[HashMap<T, Vec<u32>, RandomState>],
hash_tables: &[HashMap<T, Vec<u32>, H>],
len: u64,
) -> &HashMap<T, Vec<u32>, RandomState>
where
T: Send + Hash + Eq + Sync + Copy,
{
) -> &HashMap<T, Vec<u32>, H> {
let mut idx = 0;
for i in 0..len {
if (h + i) % len == 0 {
Expand Down Expand Up @@ -86,13 +84,13 @@ fn probe_inner<T, F>(
n_tables: u64,
swap_fn: F,
) where
T: Send + Hash + Eq + Sync + Copy + Debug,
T: Send + Hash + Eq + Sync + Copy,
F: Fn(u32, u32) -> (u32, u32),
{
probe_hashes.iter().enumerate().for_each(|(idx_a, (h, k))| {
let idx_a = (idx_a + local_offset) as u32;
// probe table that contains the hashed value
let current_probe_table = unsafe { get_hash_tbl(*h, hash_tbls, n_tables) };
let current_probe_table = unsafe { get_hash_tbl_threaded_join(*h, hash_tbls, n_tables) };

let entry = current_probe_table
.raw_entry()
Expand Down Expand Up @@ -217,7 +215,8 @@ where
probe_hashes.iter().enumerate().for_each(|(idx_a, (h, k))| {
let idx_a = (idx_a + offset) as u32;
// probe table that contains the hashed value
let current_probe_table = unsafe { get_hash_tbl(*h, hash_tbls, n_tables) };
let current_probe_table =
unsafe { get_hash_tbl_threaded_join(*h, hash_tbls, n_tables) };

// we already hashed, so we don't have to hash again.
let entry = current_probe_table
Expand Down Expand Up @@ -907,43 +906,20 @@ impl DataFrame {
new.unwrap()
}

impl DataFrame {
fn len(&self) -> usize {
self.height()
}
}

// This is still single threaded and can create very large keys that are inserted in the
// hashmap. TODO: implement same hashing technique as in grouping.
match how {
JoinType::Inner => {
let join_tuples = match selected_left.len() {
2 => {
let a = static_zip!(selected_left, 1);
let b = static_zip!(selected_right, 1);
let (a, b, swap) = det_hash_prone_order2!(a, b);
hash_join_tuples_inner_threaded(vec![a], vec![b], swap)
}
3 => {
let a = static_zip!(selected_left, 2);
let b = static_zip!(selected_right, 2);
let (a, b, swap) = det_hash_prone_order2!(a, b);
hash_join_tuples_inner_threaded(vec![a], vec![b], swap)
}
4 => {
let a = static_zip!(selected_left, 3);
let b = static_zip!(selected_right, 3);
let (a, b, swap) = det_hash_prone_order2!(a, b);
hash_join_tuples_inner_threaded(vec![a], vec![b], swap)
}
5 => {
let a = static_zip!(selected_left, 4);
let b = static_zip!(selected_right, 4);
let (a, b, swap) = det_hash_prone_order2!(a, b);
hash_join_tuples_inner_threaded(vec![a], vec![b], swap)
}
6 => {
let a = static_zip!(selected_left, 5);
let b = static_zip!(selected_right, 5);
let (a, b, swap) = det_hash_prone_order2!(a, b);
hash_join_tuples_inner_threaded(vec![a], vec![b], swap)
}
_ => todo!(),
};
let left = DataFrame::new_no_checks(selected_left);
let right = DataFrame::new_no_checks(selected_right.clone());
let (left, right, swap) = det_hash_prone_order!(left, right);
let join_tuples = inner_join_multiple_keys(&left, &right, swap);

let (df_left, df_right) = POOL.join(
|| self.create_left_df(&join_tuples, false),
Expand Down Expand Up @@ -1369,11 +1345,14 @@ mod test {
.join(&df_b, &["a", "b"], &["foo", "bar"], JoinType::Left)
.unwrap();
let ca = joined.column("ham").unwrap().utf8().unwrap();
dbg!(&df_a, &df_b);
assert_eq!(Vec::from(ca), correct_ham);
let joined_inner_hack = df_a.inner_join(&df_b, "dummy", "dummy").unwrap();
let joined_inner = df_a
.join(&df_b, &["a", "b"], &["foo", "bar"], JoinType::Inner)
.unwrap();

dbg!(&joined_inner_hack, &joined_inner);
assert!(joined_inner_hack
.column("ham")
.unwrap()
Expand Down

0 comments on commit e9eeb0a

Please sign in to comment.