Skip to content

Commit

Permalink
Add semi and anti joins. (#3149)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 15, 2022
1 parent eb81237 commit 2255fe6
Show file tree
Hide file tree
Showing 14 changed files with 557 additions and 210 deletions.
1 change: 1 addition & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ product = ["polars-core/product"]
unique_counts = ["polars-core/unique_counts", "polars-lazy/unique_counts"]
log = ["polars-core/log", "polars-lazy/log"]
partition_by = ["polars-core/partition_by"]
semi_anti_join = ["polars-core/semi_anti_join"]

series_from_anyvalue = ["polars-core/series_from_anyvalue"]

Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ product = []
unique_counts = []
log = []
partition_by = []
semi_anti_join = []

dynamic_groupby = ["dtype-datetime", "dtype-date"]

Expand Down
63 changes: 60 additions & 3 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use polars_arrow::utils::CustomIterTools;
use crate::frame::hash_join::multiple_keys::{
inner_join_multiple_keys, left_join_multiple_keys, outer_join_multiple_keys,
};

#[cfg(feature = "semi_anti_join")]
use crate::frame::hash_join::multiple_keys::{left_anti_multiple_keys, left_semi_multiple_keys};

use crate::prelude::*;
use crate::utils::{set_partition_size, slice_slice, split_ca};
use crate::vector_hasher::{
Expand Down Expand Up @@ -73,13 +77,17 @@ pub enum JoinType {
#[cfg(feature = "asof_join")]
AsOf(AsOfOptions),
Cross,
#[cfg(feature = "semi_anti_join")]
Semi,
#[cfg(feature = "semi_anti_join")]
Anti,
}

pub(crate) unsafe fn get_hash_tbl_threaded_join_partitioned<T, H>(
pub(crate) unsafe fn get_hash_tbl_threaded_join_partitioned<Item>(
h: u64,
hash_tables: &[HashMap<T, Vec<IdxSize>, H>],
hash_tables: &[Item],
len: u64,
) -> &HashMap<T, Vec<IdxSize>, H> {
) -> &Item {
let mut idx = 0;
for i in 0..len {
// can only be done for powers of two.
Expand Down Expand Up @@ -307,6 +315,10 @@ impl DataFrame {
JoinType::Outer => {
self.outer_join_from_series(other, s_left, s_right, suffix, slice)
}
#[cfg(feature = "semi_anti_join")]
JoinType::Anti => self.semi_anti_join_from_series(s_left, s_right, slice, true),
#[cfg(feature = "semi_anti_join")]
JoinType::Semi => self.semi_anti_join_from_series(s_left, s_right, slice, false),
#[cfg(feature = "asof_join")]
JoinType::AsOf(options) => {
let left_on = selected_left[0].name();
Expand Down Expand Up @@ -457,6 +469,20 @@ impl DataFrame {
JoinType::AsOf(_) => Err(PolarsError::ComputeError(
"asof join not supported for join on multiple keys".into(),
)),
#[cfg(feature = "semi_anti_join")]
JoinType::Anti | JoinType::Semi => {
let left = DataFrame::new_no_checks(selected_left_physical);
let right = DataFrame::new_no_checks(selected_right_physical);

let idx = if matches!(how, JoinType::Anti) {
left_anti_multiple_keys(&left, &right)
} else {
left_semi_multiple_keys(&left, &right)
};
// Safety:
// indices are in bounds
Ok(unsafe { self.finish_anti_semi_join(&idx, slice) })
}
JoinType::Cross => {
unreachable!()
}
Expand Down Expand Up @@ -638,6 +664,37 @@ impl DataFrame {
self.finish_join(df_left, df_right, suffix)
}

#[cfg(feature = "semi_anti_join")]
/// # Safety:
/// `idx` must be in bounds
unsafe fn finish_anti_semi_join(
&self,
mut idx: &[IdxSize],
slice: Option<(i64, usize)>,
) -> DataFrame {
if let Some((offset, len)) = slice {
idx = slice_slice(idx, offset, len);
}
self.take_unchecked_slice(idx)
}

#[cfg(feature = "semi_anti_join")]
pub(crate) fn semi_anti_join_from_series(
&self,
s_left: &Series,
s_right: &Series,
slice: Option<(i64, usize)>,
anti: bool,
) -> Result<DataFrame> {
#[cfg(feature = "dtype-categorical")]
check_categorical_src(s_left.dtype(), s_right.dtype())?;

let idx = s_left.hash_join_semi_anti(s_right, anti);
// Safety:
// indices are in bounds
Ok(unsafe { self.finish_anti_semi_join(&idx, slice) })
}

/// Perform an outer join on two DataFrames
/// # Example
///
Expand Down
144 changes: 137 additions & 7 deletions polars/polars-core/src/frame/hash_join/multiple_keys.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::frame::groupby::hashing::{populate_multiple_key_hashmap, HASHMAP_INIT_SIZE};
use crate::frame::hash_join::single_keys::on_match_left_join_extend;
use crate::frame::hash_join::{
get_hash_tbl_threaded_join_mut_partitioned, get_hash_tbl_threaded_join_partitioned,
};
use crate::prelude::hash_join::single_keys::LeftJoinTuples;
use crate::prelude::*;
use crate::utils::series::to_physical_and_bit_repr;
use crate::utils::{set_partition_size, split_df};
Expand Down Expand Up @@ -247,10 +249,7 @@ pub fn private_left_join_multiple_keys(
left_join_multiple_keys(&a, &b)
}

pub(crate) fn left_join_multiple_keys(
a: &DataFrame,
b: &DataFrame,
) -> Vec<(IdxSize, Option<IdxSize>)> {
pub(crate) fn left_join_multiple_keys(a: &DataFrame, b: &DataFrame) -> Vec<LeftJoinTuples> {
// we should not join on logical types
debug_assert!(!a.iter().any(|s| s.is_logical()));
debug_assert!(!b.iter().any(|s| s.is_logical()));
Expand All @@ -271,11 +270,11 @@ pub(crate) fn left_join_multiple_keys(

// next we probe the other relation
// code duplication is because we want to only do the swap check once
POOL.install(|| {
POOL.install(move || {
probe_hashes
.into_par_iter()
.zip(offsets)
.map(|(probe_hashes, offset)| {
.map(move |(probe_hashes, offset)| {
// local reference
let hash_tbls = &hash_tbls;
let mut results =
Expand All @@ -300,7 +299,7 @@ pub(crate) fn left_join_multiple_keys(
match entry {
// left and right matches
Some((_, indexes_b)) => {
results.extend(indexes_b.iter().map(|&idx_b| (idx_a, Some(idx_b))))
on_match_left_join_extend(&mut results, indexes_b, idx_a);
}
// only left values, right = null
None => results.push((idx_a, None)),
Expand All @@ -316,6 +315,137 @@ pub(crate) fn left_join_multiple_keys(
})
}

#[cfg(feature = "semi_anti_join")]
pub(crate) fn create_build_table_semi_anti(
hashes: &[UInt64Chunked],
keys: &DataFrame,
) -> Vec<HashMap<IdxHash, (), IdBuildHasher>> {
let n_partitions = set_partition_size();

// We will create a hashtable in every thread.
// We use the hash to partition the keys to the matching hashtable.
// Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
POOL.install(|| {
(0..n_partitions).into_par_iter().map(|part_no| {
let part_no = part_no as u64;
let mut hash_tbl: HashMap<IdxHash, (), IdBuildHasher> =
HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());

let n_partitions = n_partitions as u64;
let mut offset = 0;
for hashes in hashes {
for hashes in hashes.data_views() {
let len = hashes.len();
let mut idx = 0;
hashes.iter().for_each(|h| {
// partition hashes by thread no.
// So only a part of the hashes go to this hashmap
if this_partition(*h, part_no, n_partitions) {
let idx = idx + offset;
populate_multiple_key_hashmap(
&mut hash_tbl,
idx,
*h,
keys,
|| (),
|_| (),
)
}
idx += 1;
});

offset += len as IdxSize;
}
}
hash_tbl
})
})
.collect()
}

#[cfg(feature = "semi_anti_join")]
pub(crate) fn semi_anti_join_multiple_keys_impl<'a>(
a: &'a DataFrame,
b: &'a DataFrame,
) -> impl ParallelIterator<Item = (IdxSize, bool)> + 'a {
// we should not join on logical types
debug_assert!(!a.iter().any(|s| s.is_logical()));
debug_assert!(!b.iter().any(|s| s.is_logical()));

let n_threads = POOL.current_num_threads();
let dfs_a = split_df(a, n_threads).unwrap();
let dfs_b = split_df(b, n_threads).unwrap();

let (build_hashes, random_state) = df_rows_to_hashes_threaded(&dfs_b, None);
let (probe_hashes, _) = df_rows_to_hashes_threaded(&dfs_a, Some(random_state));

let hash_tbls = create_build_table_semi_anti(&build_hashes, b);
// early drop to reduce memory pressure
drop(build_hashes);

let n_tables = hash_tbls.len() as u64;
let offsets = get_offsets(&probe_hashes);

// next we probe the other relation
// code duplication is because we want to only do the swap check once
POOL.install(move || {
probe_hashes
.into_par_iter()
.zip(offsets)
.map(move |(probe_hashes, offset)| {
// local reference
let hash_tbls = &hash_tbls;
let mut results =
Vec::with_capacity(probe_hashes.len() / POOL.current_num_threads());
let local_offset = offset;

let mut idx_a = local_offset as IdxSize;
for probe_hashes in probe_hashes.data_views() {
for &h in probe_hashes {
// probe table that contains the hashed value
let current_probe_table = unsafe {
get_hash_tbl_threaded_join_partitioned(h, hash_tbls, n_tables)
};

let entry = current_probe_table.raw_entry().from_hash(h, |idx_hash| {
let idx_b = idx_hash.idx;
// Safety:
// indices in a join operation are always in bounds.
unsafe { compare_df_rows2(a, b, idx_a as usize, idx_b as usize) }
});

match entry {
// left and right matches
Some((_, _)) => results.push((idx_a, true)),
// only left values, right = null
None => results.push((idx_a, false)),
}
idx_a += 1;
}
}

results
})
.flatten()
})
}

#[cfg(feature = "semi_anti_join")]
pub(super) fn left_anti_multiple_keys(a: &DataFrame, b: &DataFrame) -> Vec<IdxSize> {
semi_anti_join_multiple_keys_impl(a, b)
.filter(|tpls| !tpls.1)
.map(|tpls| tpls.0)
.collect()
}

#[cfg(feature = "semi_anti_join")]
pub(super) fn left_semi_multiple_keys(a: &DataFrame, b: &DataFrame) -> Vec<IdxSize> {
semi_anti_join_multiple_keys_impl(a, b)
.filter(|tpls| tpls.1)
.map(|tpls| tpls.0)
.collect()
}

/// Probe the build table and add tuples to the results (inner join)
#[allow(clippy::too_many_arguments)]
#[allow(clippy::type_complexity)]
Expand Down

0 comments on commit 2255fe6

Please sign in to comment.