Skip to content

Commit

Permalink
fix bug in outer_join functions, add tests (#1498)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcvanheerden committed Oct 8, 2021
1 parent bf2f56a commit 163457f
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 20 deletions.
131 changes: 124 additions & 7 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ unsafe fn get_hash_tbl_threaded_join_partitioned<T, H>(
hash_tables.get_unchecked(idx)
}

#[allow(clippy::type_complexity)]
unsafe fn get_hash_tbl_threaded_join_mut_partitioned<T, H>(
h: u64,
hash_tables: &mut [HashMap<T, Vec<u32>, H>],
hash_tables: &mut [HashMap<T, (bool, Vec<u32>), H>],
len: u64,
) -> &mut HashMap<T, Vec<u32>, H> {
) -> &mut HashMap<T, (bool, Vec<u32>), H> {
let mut idx = 0;
for i in 0..len {
// can only be done for powers of two.
Expand Down Expand Up @@ -315,7 +316,7 @@ where
/// Probe the build table and add tuples to the results (outer join)
fn probe_outer<T, F, G, H>(
probe_hashes: &[Vec<(u64, T)>],
hash_tbls: &mut [PlHashMap<T, Vec<u32>>],
hash_tbls: &mut [PlHashMap<T, (bool, Vec<u32>)>],
results: &mut Vec<(Option<u32>, Option<u32>)>,
n_tables: u64,
// Function that get index_a, index_b when there is a match and pushes to result
Expand Down Expand Up @@ -349,8 +350,9 @@ fn probe_outer<T, F, G, H>(

match entry {
// match and remove
RawEntryMut::Occupied(occupied) => {
let indexes_b = occupied.remove();
RawEntryMut::Occupied(mut occupied) => {
let (tracker, indexes_b) = occupied.get_mut();
*tracker = true;
results.extend(indexes_b.iter().map(|&idx_b| swap_fn_match(idx_a, idx_b)))
}
// no match
Expand All @@ -361,9 +363,11 @@ fn probe_outer<T, F, G, H>(
}

for hash_tbl in hash_tbls {
hash_tbl.iter().for_each(|(_k, indexes_b)| {
hash_tbl.iter().for_each(|(_k, (tracker, indexes_b))| {
// remaining joined values from the right table
results.extend(indexes_b.iter().map(|&idx_b| swap_fn_drain(idx_b)))
if !*tracker {
results.extend(indexes_b.iter().map(|&idx_b| swap_fn_drain(idx_b)))
}
});
}
}
Expand Down Expand Up @@ -1667,6 +1671,119 @@ mod test {

#[test]
#[cfg_attr(miri, ignore)]
fn test_joins_with_duplicates() -> Result<()> {
    // Joins where the key column holds duplicates on both sides; every
    // pairing of equal keys must show up in the result.
    let left = df![
        "col1" => [1, 1, 2],
        "int_col" => [1, 2, 3]
    ]
    .unwrap();

    let right = df![
        "join_col1" => [1, 1, 1, 1, 1, 3],
        "dbl_col" => [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
    ]
    .unwrap();

    // Inner: 2 left rows x 5 right rows sharing key 1 -> 10 rows, no nulls.
    let inner = left.inner_join(&right, "col1", "join_col1").unwrap();
    assert_eq!(inner.height(), 10);
    assert_eq!(inner.column("col1")?.null_count(), 0);
    assert_eq!(inner.column("int_col")?.null_count(), 0);
    assert_eq!(inner.column("dbl_col")?.null_count(), 0);

    // Left: the 10 matches plus the unmatched left row (key 2).
    let left_joined = left.left_join(&right, "col1", "join_col1").unwrap();
    assert_eq!(left_joined.height(), 11);
    assert_eq!(left_joined.column("col1")?.null_count(), 0);
    assert_eq!(left_joined.column("int_col")?.null_count(), 0);
    assert_eq!(left_joined.column("dbl_col")?.null_count(), 1);

    // Outer: additionally keeps the unmatched right row (key 3).
    let outer = left.outer_join(&right, "col1", "join_col1").unwrap();
    assert_eq!(outer.height(), 12);
    assert_eq!(outer.column("col1")?.null_count(), 0);
    assert_eq!(outer.column("int_col")?.null_count(), 1);
    assert_eq!(outer.column("dbl_col")?.null_count(), 1);

    Ok(())
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_multi_joins_with_duplicates() -> Result<()> {
    // Multi-key joins with duplicate key combinations on both sides.
    let left = df![
        "col1" => [1, 1, 1],
        "join_col2" => ["a", "a", "b"],
        "int_col" => [1, 2, 3]
    ]
    .unwrap();

    let right = df![
        "join_col1" => [1, 1, 1, 1, 1, 2],
        "col2" => ["a", "a", "a", "a", "a", "c"],
        "dbl_col" => [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
    ]
    .unwrap();

    // Helper: join on the two key columns with the given strategy.
    let join_with = |how: JoinType| {
        left.join(
            &right,
            &["col1", "join_col2"],
            &["join_col1", "col2"],
            how,
            None,
        )
        .unwrap()
    };

    // Inner: 2 left rows x 5 right rows match on (1, "a") -> 10 rows.
    let inner = join_with(JoinType::Inner);
    assert_eq!(inner.height(), 10);
    assert_eq!(inner.column("col1")?.null_count(), 0);
    assert_eq!(inner.column("join_col2")?.null_count(), 0);
    assert_eq!(inner.column("int_col")?.null_count(), 0);
    assert_eq!(inner.column("dbl_col")?.null_count(), 0);

    // Left: the 10 matches plus the unmatched left row (1, "b").
    let left_joined = join_with(JoinType::Left);
    assert_eq!(left_joined.height(), 11);
    assert_eq!(left_joined.column("col1")?.null_count(), 0);
    assert_eq!(left_joined.column("join_col2")?.null_count(), 0);
    assert_eq!(left_joined.column("int_col")?.null_count(), 0);
    assert_eq!(left_joined.column("dbl_col")?.null_count(), 1);

    // Outer: additionally keeps the unmatched right row (2, "c").
    let outer = join_with(JoinType::Outer);
    assert_eq!(outer.height(), 12);
    assert_eq!(outer.column("col1")?.null_count(), 0);
    assert_eq!(outer.column("join_col2")?.null_count(), 0);
    assert_eq!(outer.column("int_col")?.null_count(), 1);
    assert_eq!(outer.column("dbl_col")?.null_count(), 1);

    Ok(())
}

#[test]
#[cfg_attr(miri, ignore)]
#[cfg(feature = "dtype-u64")]
fn test_join_floats() -> Result<()> {
let df_a = df! {
"a" => &[1.0, 2.0, 1.0, 1.0],
Expand Down
67 changes: 60 additions & 7 deletions polars/polars-core/src/frame/hash_join/multiple_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,55 @@ fn create_build_table(
.collect()
}

/// Build per-partition hash tables for the build (right) side of an outer join.
///
/// Outer-join equivalent of `create_build_table()`: the hash-map value gains a
/// `bool` that records whether the entry was matched during probing, so the
/// still-unmatched build-side rows can be emitted at the end of the outer join.
fn create_build_table_outer(
    hashes: &[UInt64Chunked],
    keys: &DataFrame,
) -> Vec<HashMap<IdxHash, (bool, Vec<u32>), IdBuildHasher>> {
    // Outer join equivalent of create_build_table() adds a bool in the hashmap values for tracking
    // whether a value in the hash table has already been matched to a value in the probe hashes.
    let n_partitions = set_partition_size();

    // We will create a hashtable in every thread.
    // We use the hash to partition the keys to the matching hashtable.
    // Every thread traverses all keys/hashes and ignores the ones that don't fall in that partition.
    POOL.install(|| {
        (0..n_partitions).into_par_iter().map(|part_no| {
            let part_no = part_no as u64;
            // Each partition gets its own table; every entry starts as unmatched (`false`).
            let mut hash_tbl: HashMap<IdxHash, (bool, Vec<u32>), IdBuildHasher> =
                HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());

            let n_partitions = n_partitions as u64;
            // `offset` converts per-chunk positions into global row indices
            // (advanced by the chunk length after each data view).
            let mut offset = 0;
            for hashes in hashes {
                for hashes in hashes.data_views() {
                    let len = hashes.len();
                    let mut idx = 0;
                    hashes.iter().for_each(|h| {
                        // partition hashes by thread no.
                        // So only a part of the hashes go to this hashmap
                        if this_partition(*h, part_no, n_partitions) {
                            // shadow the chunk-local `idx` with the global row index
                            let idx = idx + offset;
                            populate_multiple_key_hashmap(
                                &mut hash_tbl,
                                idx,
                                *h,
                                keys,
                                // new key: unmatched, with this single row index
                                || (false, vec![idx]),
                                // existing key: append this row index
                                |v| v.1.push(idx),
                            )
                        }
                        idx += 1;
                    });

                    offset += len as u32;
                }
            }
            hash_tbl
        })
    })
    .collect()
}

/// Probe the build table and add tuples to the results (inner join)
#[allow(clippy::too_many_arguments)]
fn probe_inner<F>(
Expand Down Expand Up @@ -259,9 +308,10 @@ pub(crate) fn left_join_multiple_keys(a: &DataFrame, b: &DataFrame) -> Vec<(u32,

/// Probe the build table and add tuples to the results (outer join)
#[allow(clippy::too_many_arguments)]
#[allow(clippy::type_complexity)]
fn probe_outer<F, G, H>(
probe_hashes: &[UInt64Chunked],
hash_tbls: &mut [HashMap<IdxHash, Vec<u32>, IdBuildHasher>],
hash_tbls: &mut [HashMap<IdxHash, (bool, Vec<u32>), IdBuildHasher>],
results: &mut Vec<(Option<u32>, Option<u32>)>,
n_tables: u64,
a: &DataFrame,
Expand Down Expand Up @@ -303,8 +353,9 @@ fn probe_outer<F, G, H>(

match entry {
// match and remove
RawEntryMut::Occupied(occupied) => {
let indexes_b = occupied.remove();
RawEntryMut::Occupied(mut occupied) => {
let (tracker, indexes_b) = occupied.get_mut();
*tracker = true;
results.extend(indexes_b.iter().map(|&idx_b| swap_fn_match(idx_a, idx_b)))
}
// no match
Expand All @@ -316,9 +367,11 @@ fn probe_outer<F, G, H>(
}

for hash_tbl in hash_tbls {
hash_tbl.iter().for_each(|(_k, indexes_b)| {
// remaining joined values from the right table
results.extend(indexes_b.iter().map(|&idx_b| swap_fn_drain(idx_b)))
hash_tbl.iter().for_each(|(_k, (tracker, indexes_b))| {
// remaining unmatched joined values from the right table
if !*tracker {
results.extend(indexes_b.iter().map(|&idx_b| swap_fn_drain(idx_b)))
}
});
}
}
Expand All @@ -341,7 +394,7 @@ pub(crate) fn outer_join_multiple_keys(
let (build_hashes, random_state) = df_rows_to_hashes_threaded(&dfs_b, None);
let (probe_hashes, _) = df_rows_to_hashes_threaded(&dfs_a, Some(random_state));

let mut hash_tbls = create_build_table(&build_hashes, b);
let mut hash_tbls = create_build_table_outer(&build_hashes, b);
// early drop to reduce memory pressure
drop(build_hashes);

Expand Down
8 changes: 4 additions & 4 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ pub(crate) fn this_partition(h: u64, thread_no: u64, n_partitions: u64) -> bool

pub(crate) fn prepare_hashed_relation_threaded<T, I>(
iters: Vec<I>,
) -> Vec<HashMap<T, Vec<u32>, RandomState>>
) -> Vec<HashMap<T, (bool, Vec<u32>), RandomState>>
where
I: Iterator<Item = T> + Send + TrustedLen,
T: Send + Hash + Eq + Sync + Copy,
Expand All @@ -401,7 +401,7 @@ where
let build_hasher = build_hasher.clone();
let hashes_and_keys = &hashes_and_keys;
let partition_no = partition_no as u64;
let mut hash_tbl: HashMap<T, Vec<u32>, RandomState> =
let mut hash_tbl: HashMap<T, (bool, Vec<u32>), RandomState> =
HashMap::with_hasher(build_hasher);

let n_threads = n_partitions as u64;
Expand All @@ -424,11 +424,11 @@ where

match entry {
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(*h, *k, vec![idx]);
entry.insert_hashed_nocheck(*h, *k, (false, vec![idx]));
}
RawEntryMut::Occupied(mut entry) => {
let (_k, v) = entry.get_key_value_mut();
v.push(idx);
v.1.push(idx);
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,10 @@ def test_join():
assert joined["b"].series_equal(pl.Series("", [1, 3, 2, 2, 4]))
joined = df_left.join(df_right, left_on="a", right_on="a", how="outer").sort("a")
assert joined["c_right"].null_count() == 1
assert joined["c"].null_count() == 2
assert joined["b"].null_count() == 2
assert joined["c"].null_count() == 1
assert joined["b"].null_count() == 1
assert joined["k"].null_count() == 1
assert joined["a"].null_count() == 0

df_a = pl.DataFrame({"a": [1, 2, 1, 1], "b": ["a", "b", "c", "c"]})
df_b = pl.DataFrame(
Expand Down

0 comments on commit 163457f

Please sign in to comment.