From 8f4050987bd65a9116a77ee0afec13e35344f57f Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 10 Apr 2024 07:50:42 +0200 Subject: [PATCH] perf: join by row-encoding (#15559) --- crates/polars-arrow/src/compute/utils.rs | 1 + .../polars-core/src/chunked_array/binary.rs | 72 ++ crates/polars-core/src/chunked_array/float.rs | 29 + crates/polars-core/src/chunked_array/mod.rs | 1 + .../ops/sort/arg_sort_multiple.rs | 36 +- .../polars-core/src/frame/group_by/hashing.rs | 2 +- .../src/frame/group_by/into_groups.rs | 98 +-- crates/polars-core/src/frame/group_by/mod.rs | 4 +- crates/polars-core/src/utils/series.rs | 15 - .../src/physical_plan/expressions/window.rs | 19 +- .../polars-ops/src/frame/join/asof/groups.rs | 102 ++- crates/polars-ops/src/frame/join/asof/mod.rs | 2 +- .../src/frame/join/hash_join/mod.rs | 13 +- .../src/frame/join/hash_join/multiple_keys.rs | 625 ------------------ .../join/hash_join/single_keys_dispatch.rs | 295 +++++---- .../src/frame/join/hash_join/sort_merge.rs | 8 +- crates/polars-ops/src/frame/join/mod.rs | 200 +++--- crates/polars-utils/src/total_ord.rs | 2 + 18 files changed, 532 insertions(+), 992 deletions(-) create mode 100644 crates/polars-core/src/chunked_array/binary.rs delete mode 100644 crates/polars-ops/src/frame/join/hash_join/multiple_keys.rs diff --git a/crates/polars-arrow/src/compute/utils.rs b/crates/polars-arrow/src/compute/utils.rs index 3e72d00f55d9..3198abd444ca 100644 --- a/crates/polars-arrow/src/compute/utils.rs +++ b/crates/polars-arrow/src/compute/utils.rs @@ -30,6 +30,7 @@ pub fn combine_validities_and(opt_l: Option<&Bitmap>, opt_r: Option<&Bitmap>) -> (None, None) => None, } } + pub fn combine_validities_or(opt_l: Option<&Bitmap>, opt_r: Option<&Bitmap>) -> Option { match (opt_l, opt_r) { (Some(l), Some(r)) => Some(l.bitor(r)), diff --git a/crates/polars-core/src/chunked_array/binary.rs b/crates/polars-core/src/chunked_array/binary.rs new file mode 100644 index 000000000000..bdbe0e18cbd6 --- /dev/null +++ b/crates/polars-core/src/chunked_array/binary.rs @@ -0,0 +1,72 @@ +use ahash::RandomState; +use polars_utils::hashing::BytesHash; +use rayon::prelude::*; + +use crate::hashing::get_null_hash_value; +use crate::prelude::*; +use crate::utils::{_set_partition_size, _split_offsets}; +use crate::POOL; + +#[inline] +fn fill_bytes_hashes<'a, T>(ca: &'a ChunkedArray, null_h: u64, hb: RandomState) -> Vec +where + T: PolarsDataType, + <::Array as StaticArray>::ValueT<'a>: AsRef<[u8]>, +{ + let mut byte_hashes = Vec::with_capacity(ca.len()); + for arr in ca.downcast_iter() { + for opt_b in arr.iter() { + let opt_b = opt_b.as_ref().map(|v| v.as_ref()); + // SAFETY: + // the underlying data is tied to self + let opt_b = unsafe { std::mem::transmute::, Option<&'a [u8]>>(opt_b) }; + let hash = match opt_b { + Some(s) => hb.hash_one(s), + None => null_h, + }; + byte_hashes.push(BytesHash::new(opt_b, hash)) + } + } + byte_hashes +} + +impl ChunkedArray +where + T: PolarsDataType, + for<'a> ::ValueT<'a>: AsRef<[u8]>, +{ + #[allow(clippy::needless_lifetimes)] + pub fn to_bytes_hashes<'a>( + &'a self, + multithreaded: bool, + hb: RandomState, + ) -> Vec>> { + let null_h = get_null_hash_value(&hb); + + if multithreaded { + let n_partitions = _set_partition_size(); + + let split = _split_offsets(self.len(), n_partitions); + + POOL.install(|| { + split + .into_par_iter() + .map(|(offset, len)| { + let ca = self.slice(offset as i64, len); + let byte_hashes = fill_bytes_hashes(&ca, null_h, hb.clone()); + + // SAFETY: + // the underlying data is tied to self 
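+                        // Note: `slice` only re-points into the same underlying buffers as
+                        // `self`, so widening the borrow from the local slice back to `'a`
+                        // cannot outlive the data it refers to.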
+ unsafe { + std::mem::transmute::>, Vec>>( + byte_hashes, + ) + } + }) + .collect::>() + }) + } else { + vec![fill_bytes_hashes(self, null_h, hb.clone())] + } + } +} diff --git a/crates/polars-core/src/chunked_array/float.rs b/crates/polars-core/src/chunked_array/float.rs index 54615f2a407f..e2c7d726774d 100644 --- a/crates/polars-core/src/chunked_array/float.rs +++ b/crates/polars-core/src/chunked_array/float.rs @@ -1,6 +1,7 @@ use arrow::legacy::kernels::float::*; use arrow::legacy::kernels::set::set_at_nulls; use num_traits::Float; +use polars_utils::total_ord::{canonical_f32, canonical_f64}; use crate::prelude::*; @@ -31,3 +32,31 @@ where ChunkedArray::from_chunk_iter(self.name(), chunks) } } + +pub trait Canonical { + fn canonical(self) -> Self; +} + +impl Canonical for f32 { + #[inline] + fn canonical(self) -> Self { + canonical_f32(self) + } +} + +impl Canonical for f64 { + #[inline] + fn canonical(self) -> Self { + canonical_f64(self) + } +} + +impl ChunkedArray +where + T: PolarsFloatType, + T::Native: Float + Canonical, +{ + pub fn to_canonical(&self) -> Self { + self.apply_values_generic(|v| v.canonical()) + } +} diff --git a/crates/polars-core/src/chunked_array/mod.rs b/crates/polars-core/src/chunked_array/mod.rs index 7394e87210e8..30515a6c11e1 100644 --- a/crates/polars-core/src/chunked_array/mod.rs +++ b/crates/polars-core/src/chunked_array/mod.rs @@ -24,6 +24,7 @@ pub(crate) mod ndarray; #[cfg(feature = "dtype-array")] pub(crate) mod array; +mod binary; mod bitwise; #[cfg(feature = "object")] mod drop; diff --git a/crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs b/crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs index 35e2d57decf3..ed972bdf48ec 100644 --- a/crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs +++ b/crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs @@ -1,3 +1,4 @@ +use arrow::compute::utils::combine_validities_and; use compare_inner::NullOrderCmp; use polars_row::{convert_columns, EncodingField, RowsEncoded}; use polars_utils::iter::EnumerateIdxTrait; @@ -87,7 +88,26 @@ pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult { Ok(out) } -pub(crate) fn encode_rows_vertical_par_unordered( +pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> PolarsResult { + let n_threads = POOL.current_num_threads(); + let len = by[0].len(); + let splits = _split_offsets(len, n_threads); + + let chunks = splits.into_par_iter().map(|(offset, len)| { + let sliced = by + .iter() + .map(|s| s.slice(offset as i64, len)) + .collect::>(); + let rows = _get_rows_encoded_unordered(&sliced)?; + Ok(rows.into_array()) + }); + let chunks = POOL.install(|| chunks.collect::>>()); + + Ok(BinaryOffsetChunked::from_chunk_iter("", chunks?)) +} + +// Almost the same but broadcast nulls to the row-encoded array. 
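+// A null in any of the `by` columns is AND-folded into the validity of the
+// row-encoded output, so a composite key becomes null as soon as one of its
+// components is null; `prepare_keys_multiple` relies on this when `join_nulls` is false.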
+pub fn encode_rows_vertical_par_unordered_broadcast_nulls( by: &[Series], ) -> PolarsResult { let n_threads = POOL.current_num_threads(); @@ -100,7 +120,19 @@ pub(crate) fn encode_rows_vertical_par_unordered( .map(|s| s.slice(offset as i64, len)) .collect::>(); let rows = _get_rows_encoded_unordered(&sliced)?; - Ok(rows.into_array()) + + let validity = sliced + .iter() + .flat_map(|s| { + let s = s.rechunk(); + #[allow(clippy::unnecessary_to_owned)] + s.chunks() + .to_vec() + .into_iter() + .map(|arr| arr.validity().cloned()) + }) + .fold(None, |l, r| combine_validities_and(l.as_ref(), r.as_ref())); + Ok(rows.into_array().with_validity_typed(validity)) }); let chunks = POOL.install(|| chunks.collect::>>()); diff --git a/crates/polars-core/src/frame/group_by/hashing.rs b/crates/polars-core/src/frame/group_by/hashing.rs index 418471abc388..0114478cfd5a 100644 --- a/crates/polars-core/src/frame/group_by/hashing.rs +++ b/crates/polars-core/src/frame/group_by/hashing.rs @@ -124,7 +124,6 @@ where // have the code duplication pub(crate) fn group_by_threaded_slice( keys: Vec, - n_partitions: usize, sorted: bool, ) -> GroupsProxy where @@ -132,6 +131,7 @@ where ::TotalOrdItem: Send + Hash + Eq + Sync + Copy + DirtyHash, IntoSlice: AsRef<[T]> + Send + Sync, { + let n_partitions = keys.len(); let init_size = get_init_size(); // We will create a hashtable in every thread. diff --git a/crates/polars-core/src/frame/group_by/into_groups.rs b/crates/polars-core/src/frame/group_by/into_groups.rs index 4c074c3b9cff..fb19d9578c8f 100644 --- a/crates/polars-core/src/frame/group_by/into_groups.rs +++ b/crates/polars-core/src/frame/group_by/into_groups.rs @@ -4,7 +4,6 @@ use polars_utils::total_ord::{ToTotalOrd, TotalHash}; use super::*; use crate::config::verbose; use crate::prelude::sort::arg_sort_multiple::_get_rows_encoded_ca_unordered; -use crate::utils::_split_offsets; use crate::utils::flatten::flatten_par; /// Used to create the tuples for a group_by operation. @@ -37,7 +36,7 @@ where .downcast_iter() .map(|arr| arr.values().as_slice()) .collect::>(); - group_by_threaded_slice(keys, n_partitions, sorted) + group_by_threaded_slice(keys, sorted) } else { let keys = ca .downcast_iter() @@ -256,108 +255,33 @@ impl IntoGroupsProxy for StringChunked { } } -fn fill_bytes_hashes(ca: &BinaryChunked, null_h: u64, hb: RandomState) -> Vec { - let mut byte_hashes = Vec::with_capacity(ca.len()); - for arr in ca.downcast_iter() { - for opt_b in arr { - let hash = match opt_b { - Some(s) => hb.hash_one(s), - None => null_h, - }; - byte_hashes.push(BytesHash::new(opt_b, hash)) - } - } - byte_hashes -} - impl IntoGroupsProxy for BinaryChunked { #[allow(clippy::needless_lifetimes)] fn group_tuples<'a>(&'a self, multithreaded: bool, sorted: bool) -> PolarsResult { - let hb = RandomState::default(); - let null_h = get_null_hash_value(&hb); + let bh = self.to_bytes_hashes(multithreaded, Default::default()); let out = if multithreaded { - let n_partitions = _set_partition_size(); - - let split = _split_offsets(self.len(), n_partitions); - - let byte_hashes = POOL.install(|| { - split - .into_par_iter() - .map(|(offset, len)| { - let ca = self.slice(offset as i64, len); - let byte_hashes = fill_bytes_hashes(&ca, null_h, hb.clone()); - - // SAFETY: - // the underlying data is tied to self - unsafe { - std::mem::transmute::>, Vec>>( - byte_hashes, - ) - } - }) - .collect::>() - }); - group_by_threaded_slice(byte_hashes, n_partitions, sorted) + // Take slices so that the vecs are not cloned. 
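+            // `group_by_threaded_slice` now derives the partition count from the number
+            // of slices it receives (one per thread), so no separate `n_partitions`
+            // argument is passed here.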
+ let bh = bh.iter().map(|v| v.as_slice()).collect::>(); + group_by_threaded_slice(bh, sorted) } else { - let byte_hashes = fill_bytes_hashes(self, null_h, hb.clone()); - group_by(byte_hashes.iter(), sorted) + group_by(bh[0].iter(), sorted) }; Ok(out) } } -fn fill_bytes_offset_hashes( - ca: &BinaryOffsetChunked, - null_h: u64, - hb: RandomState, -) -> Vec { - let mut byte_hashes = Vec::with_capacity(ca.len()); - for arr in ca.downcast_iter() { - for opt_b in arr { - let hash = match opt_b { - Some(s) => hb.hash_one(s), - None => null_h, - }; - byte_hashes.push(BytesHash::new(opt_b, hash)) - } - } - byte_hashes -} - impl IntoGroupsProxy for BinaryOffsetChunked { #[allow(clippy::needless_lifetimes)] fn group_tuples<'a>(&'a self, multithreaded: bool, sorted: bool) -> PolarsResult { - let hb = RandomState::default(); - let null_h = get_null_hash_value(&hb); + let bh = self.to_bytes_hashes(multithreaded, Default::default()); let out = if multithreaded { - let n_partitions = _set_partition_size(); - - let split = _split_offsets(self.len(), n_partitions); - - let byte_hashes = POOL.install(|| { - split - .into_par_iter() - .map(|(offset, len)| { - let ca = self.slice(offset as i64, len); - let byte_hashes = fill_bytes_offset_hashes(&ca, null_h, hb.clone()); - - // SAFETY: - // the underlying data is tied to self - unsafe { - std::mem::transmute::>, Vec>>( - byte_hashes, - ) - } - }) - .collect::>() - }); - let byte_hashes = byte_hashes.iter().collect::>(); - group_by_threaded_slice(byte_hashes, n_partitions, sorted) + // Take slices so that the vecs are not cloned. + let bh = bh.iter().map(|v| v.as_slice()).collect::>(); + group_by_threaded_slice(bh, sorted) } else { - let byte_hashes = fill_bytes_offset_hashes(self, null_h, hb.clone()); - group_by(byte_hashes.iter(), sorted) + group_by(bh[0].iter(), sorted) }; Ok(out) } diff --git a/crates/polars-core/src/frame/group_by/mod.rs b/crates/polars-core/src/frame/group_by/mod.rs index 014ae2c8c28d..b688e8a54656 100644 --- a/crates/polars-core/src/frame/group_by/mod.rs +++ b/crates/polars-core/src/frame/group_by/mod.rs @@ -1,13 +1,11 @@ use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; -use ahash::RandomState; use num_traits::NumCast; -use polars_utils::hashing::{BytesHash, DirtyHash}; +use polars_utils::hashing::DirtyHash; use rayon::prelude::*; use self::hashing::*; -use crate::hashing::get_null_hash_value; use crate::prelude::*; use crate::utils::{_set_partition_size, accumulate_dataframes_vertical}; use crate::POOL; diff --git a/crates/polars-core/src/utils/series.rs b/crates/polars-core/src/utils/series.rs index 9db543263f83..feeb20ed763d 100644 --- a/crates/polars-core/src/utils/series.rs +++ b/crates/polars-core/src/utils/series.rs @@ -2,21 +2,6 @@ use crate::prelude::*; use crate::series::unstable::UnstableSeries; use crate::series::IsSorted; -/// Transform to physical type and coerce similar sized integer to a bit representation -/// to reduce compiler bloat -pub fn _to_physical_and_bit_repr(s: &[Series]) -> Vec { - s.iter() - .map(|s| { - let physical = s.to_physical_repr(); - match physical.dtype() { - DataType::Int64 => physical.bit_repr_large().into_series(), - DataType::Int32 => physical.bit_repr_small().into_series(), - _ => physical.into_owned(), - } - }) - .collect() -} - /// A utility that allocates an [`UnstableSeries`]. The applied function can then use that /// series container to save heap allocations and swap arrow arrays. 
pub fn with_unstable_series(dtype: &DataType, f: F) -> T diff --git a/crates/polars-lazy/src/physical_plan/expressions/window.rs b/crates/polars-lazy/src/physical_plan/expressions/window.rs index e9f2429562c5..920016a70ba0 100644 --- a/crates/polars-lazy/src/physical_plan/expressions/window.rs +++ b/crates/polars-lazy/src/physical_plan/expressions/window.rs @@ -557,17 +557,16 @@ impl PhysicalExpr for WindowExpr { if group_by_columns.len() == 1 { // group key from right column let right = &keys[0]; - group_by_columns[0] - .hash_join_left(right, JoinValidation::ManyToMany, true) - .unwrap() - .1 + PolarsResult::Ok( + group_by_columns[0] + .hash_join_left(right, JoinValidation::ManyToMany, true) + .unwrap() + .1, + ) } else { let df_right = unsafe { DataFrame::new_no_checks(keys) }; let df_left = unsafe { DataFrame::new_no_checks(group_by_columns) }; - private_left_join_multiple_keys( - &df_left, &df_right, None, None, true, - ) - .1 + Ok(private_left_join_multiple_keys(&df_left, &df_right, true)?.1) } }; @@ -580,10 +579,10 @@ impl PhysicalExpr for WindowExpr { if let Some(opt_join_tuples) = jt_map.get_mut(&cache_key) { std::mem::replace(opt_join_tuples, default_join_ids()) } else { - get_join_tuples() + get_join_tuples()? } } else { - get_join_tuples() + get_join_tuples()? }; let mut out = materialize_column(&join_opt_ids, &out_column); diff --git a/crates/polars-ops/src/frame/join/asof/groups.rs b/crates/polars-ops/src/frame/join/asof/groups.rs index 3fae9258c0c7..c2c5c6e96235 100644 --- a/crates/polars-ops/src/frame/join/asof/groups.rs +++ b/crates/polars-ops/src/frame/join/asof/groups.rs @@ -1,20 +1,104 @@ use std::hash::Hash; use ahash::RandomState; +use hashbrown::HashMap; use num_traits::Zero; -use polars_core::hashing::{_df_rows_to_hashes_threaded_vertical, _HASHMAP_INIT_SIZE}; +use polars_core::hashing::{ + IdxHash, _df_rows_to_hashes_threaded_vertical, populate_multiple_key_hashmap, + _HASHMAP_INIT_SIZE, +}; +use polars_core::prelude::*; use polars_core::utils::flatten::flatten_nullable; -use polars_core::utils::{split_ca, split_df}; -use polars_core::{with_match_physical_float_polars_type, POOL}; +use polars_core::utils::{_set_partition_size, split_ca, split_df}; +use polars_core::{with_match_physical_float_polars_type, IdBuildHasher, POOL}; use polars_utils::abs_diff::AbsDiff; use polars_utils::hashing::{hash_to_partition, DirtyHash}; +use polars_utils::idx_vec::IdxVec; use polars_utils::nulls::IsNull; use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash}; +use polars_utils::unitvec; use rayon::prelude::*; use smartstring::alias::String as SmartString; use super::*; +/// Compare the rows of two [`DataFrame`]s +pub(crate) unsafe fn compare_df_rows2( + left: &DataFrame, + right: &DataFrame, + left_idx: usize, + right_idx: usize, + join_nulls: bool, +) -> bool { + for (l, r) in left.get_columns().iter().zip(right.get_columns()) { + let l = l.get_unchecked(left_idx); + let r = r.get_unchecked(right_idx); + if !l.eq_missing(&r, join_nulls) { + return false; + } + } + true +} + +pub(crate) fn create_probe_table( + hashes: &[UInt64Chunked], + keys: &DataFrame, +) -> Vec> { + let n_partitions = _set_partition_size(); + + // We will create a hashtable in every thread. + // We use the hash to partition the keys to the matching hashtable. + // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition. 
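+    // Each partition builds its own hashmap: `hash_to_partition` assigns every hash to
+    // exactly one partition, so the per-thread tables can be filled without locking.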
+ POOL.install(|| { + (0..n_partitions) + .into_par_iter() + .map(|part_no| { + let mut hash_tbl: HashMap = + HashMap::with_capacity_and_hasher(_HASHMAP_INIT_SIZE, Default::default()); + + let mut offset = 0; + for hashes in hashes { + for hashes in hashes.data_views() { + let len = hashes.len(); + let mut idx = 0; + hashes.iter().for_each(|h| { + // partition hashes by thread no. + // So only a part of the hashes go to this hashmap + if part_no == hash_to_partition(*h, n_partitions) { + let idx = idx + offset; + populate_multiple_key_hashmap( + &mut hash_tbl, + idx, + *h, + keys, + || unitvec![idx], + |v| v.push(idx), + ) + } + idx += 1; + }); + + offset += len as IdxSize; + } + } + hash_tbl + }) + .collect() + }) +} + +pub(crate) fn get_offsets(probe_hashes: &[UInt64Chunked]) -> Vec { + probe_hashes + .iter() + .map(|ph| ph.len()) + .scan(0, |state, val| { + let out = *state; + *state += val; + Some(out) + }) + .collect() +} + fn compute_len_offsets>(iter: I) -> Vec { let mut cumlen = 0; iter.into_iter() @@ -241,9 +325,9 @@ where let (probe_hashes, _) = _df_rows_to_hashes_threaded_vertical(&split_by_left, Some(random_state)).unwrap(); - let hash_tbls = mk::create_probe_table(&build_hashes, by_right); + let hash_tbls = create_probe_table(&build_hashes, by_right); drop(build_hashes); // Early drop to reduce memory pressure. - let offsets = mk::get_offsets(&probe_hashes); + let offsets = get_offsets(&probe_hashes); let n_tables = hash_tbls.len(); // Now we probe the right hand side for each left hand side. @@ -273,13 +357,7 @@ where let idx_right = idx_hash.idx; // SAFETY: indices in a join operation are always in bounds. unsafe { - mk::compare_df_rows2( - by_left, - by_right, - idx_left, - idx_right as usize, - false, - ) + compare_df_rows2(by_left, by_right, idx_left, idx_right as usize, false) } }); let Some((_, right_grp_idxs)) = entry else { diff --git a/crates/polars-ops/src/frame/join/asof/mod.rs b/crates/polars-ops/src/frame/join/asof/mod.rs index 7fd6c0a048d1..ed4f3c2b6db7 100644 --- a/crates/polars-ops/src/frame/join/asof/mod.rs +++ b/crates/polars-ops/src/frame/join/asof/mod.rs @@ -12,7 +12,7 @@ use smartstring::alias::String as SmartString; #[cfg(feature = "dtype-categorical")] use super::_check_categorical_src; -use super::{_finish_join, build_tables, multiple_keys as mk, prepare_bytes}; +use super::{_finish_join, build_tables, prepare_bytes}; use crate::frame::IntoDf; trait AsofJoinState: Default { diff --git a/crates/polars-ops/src/frame/join/hash_join/mod.rs b/crates/polars-ops/src/frame/join/hash_join/mod.rs index f07667130cc5..77baf7202cb7 100644 --- a/crates/polars-ops/src/frame/join/hash_join/mod.rs +++ b/crates/polars-ops/src/frame/join/hash_join/mod.rs @@ -1,4 +1,3 @@ -pub(super) mod multiple_keys; pub(super) mod single_keys; mod single_keys_dispatch; mod single_keys_inner; @@ -8,8 +7,6 @@ mod single_keys_outer; mod single_keys_semi_anti; pub(super) mod sort_merge; use arrow::array::ArrayRef; -pub use multiple_keys::private_left_join_multiple_keys; -pub(super) use multiple_keys::*; use polars_core::utils::{_set_partition_size, split_ca}; use polars_core::POOL; use polars_utils::index::ChunkId; @@ -22,7 +19,7 @@ use single_keys_left::*; use single_keys_outer::*; #[cfg(feature = "semi_anti_join")] use single_keys_semi_anti::*; -pub use sort_merge::*; +pub(crate) use sort_merge::*; pub use super::*; #[cfg(feature = "chunked_ids")] @@ -172,6 +169,7 @@ pub trait JoinDispatch: IntoDf { s_right: &Series, args: JoinArgs, verbose: bool, + drop_names: Option<&[&str]>, ) -> 
PolarsResult { let df_self = self.to_df(); #[cfg(feature = "dtype-categorical")] @@ -202,7 +200,12 @@ pub trait JoinDispatch: IntoDf { } let ids = sort_or_hash_left(&s_left, &s_right, verbose, args.validation, args.join_nulls)?; - left._finish_left_join(ids, &right.drop(s_right.name()).unwrap(), args) + let right = if let Some(drop_names) = drop_names { + right.drop_many(drop_names) + } else { + right.drop(s_right.name()).unwrap() + }; + left._finish_left_join(ids, &right, args) } #[cfg(feature = "semi_anti_join")] diff --git a/crates/polars-ops/src/frame/join/hash_join/multiple_keys.rs b/crates/polars-ops/src/frame/join/hash_join/multiple_keys.rs deleted file mode 100644 index 119973c9671e..000000000000 --- a/crates/polars-ops/src/frame/join/hash_join/multiple_keys.rs +++ /dev/null @@ -1,625 +0,0 @@ -use arrow::array::{MutablePrimitiveArray, PrimitiveArray}; -use hashbrown::HashMap; -use polars_core::hashing::{populate_multiple_key_hashmap, IdBuildHasher, IdxHash}; -use polars_core::utils::split_df; -use polars_utils::hashing::hash_to_partition; -use polars_utils::idx_vec::IdxVec; -use polars_utils::unitvec; - -use super::*; - -/// Compare the rows of two [`DataFrame`]s -pub(crate) unsafe fn compare_df_rows2( - left: &DataFrame, - right: &DataFrame, - left_idx: usize, - right_idx: usize, - join_nulls: bool, -) -> bool { - for (l, r) in left.get_columns().iter().zip(right.get_columns()) { - let l = l.get_unchecked(left_idx); - let r = r.get_unchecked(right_idx); - if !l.eq_missing(&r, join_nulls) { - return false; - } - } - true -} - -pub(crate) fn create_probe_table( - hashes: &[UInt64Chunked], - keys: &DataFrame, -) -> Vec> { - let n_partitions = _set_partition_size(); - - // We will create a hashtable in every thread. - // We use the hash to partition the keys to the matching hashtable. - // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition. - POOL.install(|| { - (0..n_partitions) - .into_par_iter() - .map(|part_no| { - let mut hash_tbl: HashMap = - HashMap::with_capacity_and_hasher(_HASHMAP_INIT_SIZE, Default::default()); - - let mut offset = 0; - for hashes in hashes { - for hashes in hashes.data_views() { - let len = hashes.len(); - let mut idx = 0; - hashes.iter().for_each(|h| { - // partition hashes by thread no. - // So only a part of the hashes go to this hashmap - if part_no == hash_to_partition(*h, n_partitions) { - let idx = idx + offset; - populate_multiple_key_hashmap( - &mut hash_tbl, - idx, - *h, - keys, - || unitvec![idx], - |v| v.push(idx), - ) - } - idx += 1; - }); - - offset += len as IdxSize; - } - } - hash_tbl - }) - .collect() - }) -} - -fn create_build_table_outer( - hashes: &[UInt64Chunked], - keys: &DataFrame, -) -> Vec> { - // Outer join equivalent of create_build_table() adds a bool in the hashmap values for tracking - // whether a value in the hash table has already been matched to a value in the probe hashes. - let n_partitions = _set_partition_size(); - - // We will create a hashtable in every thread. - // We use the hash to partition the keys to the matching hashtable. - // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition. 
- let par_iter = (0..n_partitions).into_par_iter().map(|part_no| { - let mut hash_tbl: HashMap = - HashMap::with_capacity_and_hasher(_HASHMAP_INIT_SIZE, Default::default()); - - let mut offset = 0; - for hashes in hashes { - for hashes in hashes.data_views() { - let len = hashes.len(); - let mut idx = 0; - hashes.iter().for_each(|h| { - // partition hashes by thread no. - // So only a part of the hashes go to this hashmap - if part_no == hash_to_partition(*h, n_partitions) { - let idx = idx + offset; - populate_multiple_key_hashmap( - &mut hash_tbl, - idx, - *h, - keys, - || (false, unitvec![idx]), - |v| v.1.push(idx), - ) - } - idx += 1; - }); - - offset += len as IdxSize; - } - } - hash_tbl - }); - - POOL.install(|| par_iter.collect()) -} - -/// Probe the build table and add tuples to the results (inner join) -#[allow(clippy::too_many_arguments)] -fn probe_inner( - probe_hashes: &UInt64Chunked, - hash_tbls: &[HashMap], - results: &mut Vec<(IdxSize, IdxSize)>, - local_offset: usize, - n_tables: usize, - a: &DataFrame, - b: &DataFrame, - swap_fn: F, - join_nulls: bool, -) where - F: Fn(IdxSize, IdxSize) -> (IdxSize, IdxSize), -{ - let mut idx_a = local_offset as IdxSize; - for probe_hashes in probe_hashes.data_views() { - for &h in probe_hashes { - // probe table that contains the hashed value - let current_probe_table = - unsafe { hash_tbls.get_unchecked(hash_to_partition(h, n_tables)) }; - - let entry = current_probe_table.raw_entry().from_hash(h, |idx_hash| { - let idx_b = idx_hash.idx; - // SAFETY: - // indices in a join operation are always in bounds. - unsafe { compare_df_rows2(a, b, idx_a as usize, idx_b as usize, join_nulls) } - }); - - if let Some((_, indexes_b)) = entry { - let tuples = indexes_b.iter().map(|&idx_b| swap_fn(idx_a, idx_b)); - results.extend(tuples); - } - idx_a += 1; - } - } -} - -pub(crate) fn get_offsets(probe_hashes: &[UInt64Chunked]) -> Vec { - probe_hashes - .iter() - .map(|ph| ph.len()) - .scan(0, |state, val| { - let out = *state; - *state += val; - Some(out) - }) - .collect() -} - -pub fn _inner_join_multiple_keys( - a: &mut DataFrame, - b: &mut DataFrame, - swap: bool, - join_nulls: bool, -) -> (Vec, Vec) { - // we assume that the b DataFrame is the shorter relation. - // b will be used for the build phase. - - let n_threads = POOL.current_num_threads(); - let dfs_a = split_df(a, n_threads).unwrap(); - let dfs_b = split_df(b, n_threads).unwrap(); - - let (build_hashes, random_state) = _df_rows_to_hashes_threaded_vertical(&dfs_b, None).unwrap(); - let (probe_hashes, _) = - _df_rows_to_hashes_threaded_vertical(&dfs_a, Some(random_state)).unwrap(); - - let hash_tbls = create_probe_table(&build_hashes, b); - // early drop to reduce memory pressure - drop(build_hashes); - - let n_tables = hash_tbls.len(); - let offsets = get_offsets(&probe_hashes); - // next we probe the other relation - // code duplication is because we want to only do the swap check once - POOL.install(|| { - probe_hashes - .into_par_iter() - .zip(offsets) - .flat_map(|(probe_hashes, offset)| { - // local reference - let hash_tbls = &hash_tbls; - let mut results = - Vec::with_capacity(probe_hashes.len() / POOL.current_num_threads()); - let local_offset = offset; - // code duplication is to hoist swap out of the inner loop. 
- if swap { - probe_inner( - &probe_hashes, - hash_tbls, - &mut results, - local_offset, - n_tables, - a, - b, - |idx_a, idx_b| (idx_b, idx_a), - join_nulls, - ) - } else { - probe_inner( - &probe_hashes, - hash_tbls, - &mut results, - local_offset, - n_tables, - a, - b, - |idx_a, idx_b| (idx_a, idx_b), - join_nulls, - ) - } - - results - }) - .unzip() - }) -} - -pub fn private_left_join_multiple_keys( - a: &DataFrame, - b: &DataFrame, - // map the global indices to [chunk_idx, array_idx] - // only needed if we have non contiguous memory - chunk_mapping_left: Option<&[ChunkId]>, - chunk_mapping_right: Option<&[ChunkId]>, - join_nulls: bool, -) -> LeftJoinIds { - let mut a = unsafe { DataFrame::new_no_checks(_to_physical_and_bit_repr(a.get_columns())) }; - let mut b = unsafe { DataFrame::new_no_checks(_to_physical_and_bit_repr(b.get_columns())) }; - _left_join_multiple_keys( - &mut a, - &mut b, - chunk_mapping_left, - chunk_mapping_right, - join_nulls, - ) -} - -pub fn _left_join_multiple_keys( - a: &mut DataFrame, - b: &mut DataFrame, - // map the global indices to [chunk_idx, array_idx] - // only needed if we have non contiguous memory - chunk_mapping_left: Option<&[ChunkId]>, - chunk_mapping_right: Option<&[ChunkId]>, - join_nulls: bool, -) -> LeftJoinIds { - // we should not join on logical types - debug_assert!(!a.iter().any(|s| s.dtype().is_logical())); - debug_assert!(!b.iter().any(|s| s.dtype().is_logical())); - - let n_threads = POOL.current_num_threads(); - let dfs_a = split_df(a, n_threads).unwrap(); - let dfs_b = split_df(b, n_threads).unwrap(); - - let (build_hashes, random_state) = _df_rows_to_hashes_threaded_vertical(&dfs_b, None).unwrap(); - let (probe_hashes, _) = - _df_rows_to_hashes_threaded_vertical(&dfs_a, Some(random_state)).unwrap(); - - let hash_tbls = create_probe_table(&build_hashes, b); - // early drop to reduce memory pressure - drop(build_hashes); - - let n_tables = hash_tbls.len(); - let offsets = get_offsets(&probe_hashes); - - // next we probe the other relation - // code duplication is because we want to only do the swap check once - let results = POOL.install(move || { - probe_hashes - .into_par_iter() - .zip(offsets) - .map(move |(probe_hashes, offset)| { - // local reference - let hash_tbls = &hash_tbls; - - let len = probe_hashes.len() / POOL.current_num_threads(); - let mut result_idx_left = Vec::with_capacity(len); - let mut result_idx_right = Vec::with_capacity(len); - let local_offset = offset; - - let mut idx_a = local_offset as IdxSize; - for probe_hashes in probe_hashes.data_views() { - for &h in probe_hashes { - // probe table that contains the hashed value - let current_probe_table = - unsafe { hash_tbls.get_unchecked(hash_to_partition(h, n_tables)) }; - - let entry = current_probe_table.raw_entry().from_hash(h, |idx_hash| { - let idx_b = idx_hash.idx; - // SAFETY: - // indices in a join operation are always in bounds. 
- unsafe { - compare_df_rows2(a, b, idx_a as usize, idx_b as usize, join_nulls) - } - }); - - match entry { - // left and right matches - Some((_, indexes_b)) => { - result_idx_left - .extend(std::iter::repeat(idx_a).take(indexes_b.len())); - let indexes_b = bytemuck::cast_slice(indexes_b); - result_idx_right.extend_from_slice(indexes_b); - }, - // only left values, right = null - None => { - result_idx_left.push(idx_a); - result_idx_right.push(NullableIdxSize::null()); - }, - } - idx_a += 1; - } - } - - finish_left_join_mappings( - result_idx_left, - result_idx_right, - chunk_mapping_left, - chunk_mapping_right, - ) - }) - .collect::>() - }); - flatten_left_join_ids(results) -} - -#[cfg(feature = "semi_anti_join")] -pub(crate) fn create_build_table_semi_anti( - hashes: &[UInt64Chunked], - keys: &DataFrame, -) -> Vec> { - let n_partitions = _set_partition_size(); - - // We will create a hashtable in every thread. - // We use the hash to partition the keys to the matching hashtable. - // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition. - let par_iter = (0..n_partitions).into_par_iter().map(|part_no| { - let mut hash_tbl: HashMap = - HashMap::with_capacity_and_hasher(_HASHMAP_INIT_SIZE, Default::default()); - - let mut offset = 0; - for hashes in hashes { - for hashes in hashes.data_views() { - let len = hashes.len(); - let mut idx = 0; - hashes.iter().for_each(|h| { - // partition hashes by thread no. - // So only a part of the hashes go to this hashmap - if part_no == hash_to_partition(*h, n_partitions) { - let idx = idx + offset; - populate_multiple_key_hashmap(&mut hash_tbl, idx, *h, keys, || (), |_| ()) - } - idx += 1; - }); - - offset += len as IdxSize; - } - } - hash_tbl - }); - - POOL.install(|| par_iter.collect()) -} - -#[cfg(feature = "semi_anti_join")] -pub(crate) fn semi_anti_join_multiple_keys_impl<'a>( - a: &'a mut DataFrame, - b: &'a mut DataFrame, - join_nulls: bool, -) -> impl ParallelIterator + 'a { - // we should not join on logical types - debug_assert!(!a.iter().any(|s| s.dtype().is_logical())); - debug_assert!(!b.iter().any(|s| s.dtype().is_logical())); - - let n_threads = POOL.current_num_threads(); - let dfs_a = split_df(a, n_threads).unwrap(); - let dfs_b = split_df(b, n_threads).unwrap(); - - let (build_hashes, random_state) = _df_rows_to_hashes_threaded_vertical(&dfs_b, None).unwrap(); - let (probe_hashes, _) = - _df_rows_to_hashes_threaded_vertical(&dfs_a, Some(random_state)).unwrap(); - - let hash_tbls = create_build_table_semi_anti(&build_hashes, b); - // early drop to reduce memory pressure - drop(build_hashes); - - let n_tables = hash_tbls.len(); - let offsets = get_offsets(&probe_hashes); - - // next we probe the other relation - // code duplication is because we want to only do the swap check once - probe_hashes - .into_par_iter() - .zip(offsets) - .flat_map(move |(probe_hashes, offset)| { - // local reference - let hash_tbls = &hash_tbls; - let mut results = Vec::with_capacity(probe_hashes.len() / POOL.current_num_threads()); - let local_offset = offset; - - let mut idx_a = local_offset as IdxSize; - for probe_hashes in probe_hashes.data_views() { - for &h in probe_hashes { - // probe table that contains the hashed value - let current_probe_table = - unsafe { hash_tbls.get_unchecked(hash_to_partition(h, n_tables)) }; - - let entry = current_probe_table.raw_entry().from_hash(h, |idx_hash| { - let idx_b = idx_hash.idx; - // SAFETY: - // indices in a join operation are always in bounds. 
- unsafe { - compare_df_rows2(a, b, idx_a as usize, idx_b as usize, join_nulls) - } - }); - - match entry { - // left and right matches - Some((_, _)) => results.push((idx_a, true)), - // only left values, right = null - None => results.push((idx_a, false)), - } - idx_a += 1; - } - } - - results - }) -} - -#[cfg(feature = "semi_anti_join")] -pub fn _left_anti_multiple_keys( - a: &mut DataFrame, - b: &mut DataFrame, - join_nulls: bool, -) -> Vec { - let par_iter = semi_anti_join_multiple_keys_impl(a, b, join_nulls) - .filter(|tpls| !tpls.1) - .map(|tpls| tpls.0); - POOL.install(|| par_iter.collect()) -} - -#[cfg(feature = "semi_anti_join")] -pub fn _left_semi_multiple_keys( - a: &mut DataFrame, - b: &mut DataFrame, - join_nulls: bool, -) -> Vec { - let par_iter = semi_anti_join_multiple_keys_impl(a, b, join_nulls) - .filter(|tpls| tpls.1) - .map(|tpls| tpls.0); - POOL.install(|| par_iter.collect()) -} - -/// Probe the build table and add tuples to the results (inner join) -#[allow(clippy::too_many_arguments)] -#[allow(clippy::type_complexity)] -fn probe_outer( - probe_hashes: &[UInt64Chunked], - hash_tbls: &mut [HashMap], - results: &mut ( - MutablePrimitiveArray, - MutablePrimitiveArray, - ), - n_tables: usize, - a: &DataFrame, - b: &DataFrame, - // Function that get index_a, index_b when there is a match and pushes to result - swap_fn_match: F, - // Function that get index_a when there is no match and pushes to result - swap_fn_no_match: G, - // Function that get index_b from the build table that did not match any in A and pushes to result - swap_fn_drain: H, - join_nulls: bool, -) where - // idx_a, idx_b -> ... - F: Fn(IdxSize, IdxSize) -> (Option, Option), - // idx_a -> ... - G: Fn(IdxSize) -> (Option, Option), - // idx_b -> ... - H: Fn(IdxSize) -> (Option, Option), -{ - let mut idx_a = 0; - - // vec - for probe_hashes in probe_hashes { - // ca - for probe_hashes in probe_hashes.data_views() { - // chunk slices - for &h in probe_hashes { - // probe table that contains the hashed value - let current_probe_table = - unsafe { hash_tbls.get_unchecked_mut(hash_to_partition(h, n_tables)) }; - - let entry = current_probe_table - .raw_entry_mut() - .from_hash(h, |idx_hash| { - let idx_b = idx_hash.idx; - // SAFETY: - // indices in a join operation are always in bounds. - unsafe { - compare_df_rows2(a, b, idx_a as usize, idx_b as usize, join_nulls) - } - }); - - match entry { - // match and remove - RawEntryMut::Occupied(mut occupied) => { - let (tracker, indexes_b) = occupied.get_mut(); - *tracker = true; - - for (l, r) in indexes_b.iter().map(|&idx_b| swap_fn_match(idx_a, idx_b)) { - results.0.push(l); - results.1.push(r); - } - }, - // no match - RawEntryMut::Vacant(_) => { - let (l, r) = swap_fn_no_match(idx_a); - results.0.push(l); - results.1.push(r); - }, - } - idx_a += 1; - } - } - } - - for hash_tbl in hash_tbls { - hash_tbl.iter().for_each(|(_k, (tracker, indexes_b))| { - // remaining unmatched joined values from the right table - if !*tracker { - for (l, r) in indexes_b.iter().map(|&idx_b| swap_fn_drain(idx_b)) { - results.0.push(l); - results.1.push(r); - } - } - }); - } -} - -pub fn _outer_join_multiple_keys( - a: &mut DataFrame, - b: &mut DataFrame, - swap: bool, - join_nulls: bool, -) -> (PrimitiveArray, PrimitiveArray) { - // we assume that the b DataFrame is the shorter relation. - // b will be used for the build phase. 
- - let size = a.height() + b.height(); - let mut results = ( - MutablePrimitiveArray::with_capacity(size), - MutablePrimitiveArray::with_capacity(size), - ); - - let n_threads = POOL.current_num_threads(); - let dfs_a = split_df(a, n_threads).unwrap(); - let dfs_b = split_df(b, n_threads).unwrap(); - - let (build_hashes, random_state) = _df_rows_to_hashes_threaded_vertical(&dfs_b, None).unwrap(); - let (probe_hashes, _) = - _df_rows_to_hashes_threaded_vertical(&dfs_a, Some(random_state)).unwrap(); - - let mut hash_tbls = create_build_table_outer(&build_hashes, b); - // early drop to reduce memory pressure - drop(build_hashes); - - let n_tables = hash_tbls.len(); - // probe the hash table. - // Note: indexes from b that are not matched will be None, Some(idx_b) - // Therefore we remove the matches and the remaining will be joined from the right - - // branch is because we want to only do the swap check once - if swap { - probe_outer( - &probe_hashes, - &mut hash_tbls, - &mut results, - n_tables, - a, - b, - |idx_a, idx_b| (Some(idx_b), Some(idx_a)), - |idx_a| (None, Some(idx_a)), - |idx_b| (Some(idx_b), None), - join_nulls, - ) - } else { - probe_outer( - &probe_hashes, - &mut hash_tbls, - &mut results, - n_tables, - a, - b, - |idx_a, idx_b| (Some(idx_a), Some(idx_b)), - |idx_a| (Some(idx_a), None), - |idx_b| (None, Some(idx_b)), - join_nulls, - ) - } - (results.0.into(), results.1.into()) -} diff --git a/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs b/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs index 9468ac483d3d..ebf75ceb985b 100644 --- a/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs +++ b/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs @@ -20,29 +20,44 @@ pub trait SeriesJoin: SeriesSealed + Sized { validate.validate_probe(&lhs, &rhs, false)?; use DataType::*; - if matches!(lhs.dtype(), String | Binary) { - let lhs = lhs.cast(&Binary).unwrap(); - let rhs = rhs.cast(&Binary).unwrap(); - let lhs = lhs.binary().unwrap(); - let rhs = rhs.binary().unwrap(); - let (lhs, rhs, _, _) = prepare_binary(lhs, rhs, false); - let lhs = lhs.iter().map(|v| v.as_slice()).collect::>(); - let rhs = rhs.iter().map(|v| v.as_slice()).collect::>(); - hash_join_tuples_left(lhs, rhs, None, None, validate, join_nulls) - } else if lhs.dtype().is_float() { - with_match_physical_float_polars_type!(lhs.dtype(), |$T| { - let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); - let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); - num_group_join_left(lhs, rhs, validate, join_nulls) - }) - } else if s_self.bit_repr_is_large() { - let lhs = lhs.bit_repr_large(); - let rhs = rhs.bit_repr_large(); - num_group_join_left(&lhs, &rhs, validate, join_nulls) - } else { - let lhs = lhs.bit_repr_small(); - let rhs = rhs.bit_repr_small(); - num_group_join_left(&lhs, &rhs, validate, join_nulls) + + match lhs.dtype() { + String | Binary => { + let lhs = lhs.cast(&Binary).unwrap(); + let rhs = rhs.cast(&Binary).unwrap(); + let lhs = lhs.binary().unwrap(); + let rhs = rhs.binary().unwrap(); + let (lhs, rhs, _, _) = prepare_binary::(lhs, rhs, false); + let lhs = lhs.iter().map(|v| v.as_slice()).collect::>(); + let rhs = rhs.iter().map(|v| v.as_slice()).collect::>(); + hash_join_tuples_left(lhs, rhs, None, None, validate, join_nulls) + }, + BinaryOffset => { + let lhs = lhs.binary_offset().unwrap(); + let rhs = rhs.binary_offset().unwrap(); + let (lhs, rhs, _, _) = prepare_binary::(lhs, rhs, false); + // Take slices so that vecs are not 
copied + let lhs = lhs.iter().map(|k| k.as_slice()).collect::>(); + let rhs = rhs.iter().map(|k| k.as_slice()).collect::>(); + hash_join_tuples_left(lhs, rhs, None, None, validate, join_nulls) + }, + _ => { + if lhs.dtype().is_float() { + with_match_physical_float_polars_type!(lhs.dtype(), |$T| { + let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); + let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); + num_group_join_left(lhs, rhs, validate, join_nulls) + }) + } else if s_self.bit_repr_is_large() { + let lhs = lhs.bit_repr_large(); + let rhs = rhs.bit_repr_large(); + num_group_join_left(&lhs, &rhs, validate, join_nulls) + } else { + let lhs = lhs.bit_repr_small(); + let rhs = rhs.bit_repr_small(); + num_group_join_left(&lhs, &rhs, validate, join_nulls) + } + }, } } @@ -52,33 +67,53 @@ pub trait SeriesJoin: SeriesSealed + Sized { let (lhs, rhs) = (s_self.to_physical_repr(), other.to_physical_repr()); use DataType::*; - if matches!(lhs.dtype(), String | Binary) { - let lhs = lhs.cast(&Binary).unwrap(); - let rhs = rhs.cast(&Binary).unwrap(); - let lhs = lhs.binary().unwrap(); - let rhs = rhs.binary().unwrap(); - let (lhs, rhs, _, _) = prepare_binary(lhs, rhs, false); - let lhs = lhs.iter().map(|v| v.as_slice()).collect::>(); - let rhs = rhs.iter().map(|v| v.as_slice()).collect::>(); - if anti { - hash_join_tuples_left_anti(lhs, rhs) - } else { - hash_join_tuples_left_semi(lhs, rhs) - } - } else if lhs.dtype().is_float() { - with_match_physical_float_polars_type!(lhs.dtype(), |$T| { - let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); - let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); - num_group_join_anti_semi(lhs, rhs, anti) - }) - } else if s_self.bit_repr_is_large() { - let lhs = lhs.bit_repr_large(); - let rhs = rhs.bit_repr_large(); - num_group_join_anti_semi(&lhs, &rhs, anti) - } else { - let lhs = lhs.bit_repr_small(); - let rhs = rhs.bit_repr_small(); - num_group_join_anti_semi(&lhs, &rhs, anti) + + match lhs.dtype() { + String | Binary => { + let lhs = lhs.cast(&Binary).unwrap(); + let rhs = rhs.cast(&Binary).unwrap(); + let lhs = lhs.binary().unwrap(); + let rhs = rhs.binary().unwrap(); + let (lhs, rhs, _, _) = prepare_binary::(lhs, rhs, false); + // Take slices so that vecs are not copied + let lhs = lhs.iter().map(|k| k.as_slice()).collect::>(); + let rhs = rhs.iter().map(|k| k.as_slice()).collect::>(); + if anti { + hash_join_tuples_left_anti(lhs, rhs) + } else { + hash_join_tuples_left_semi(lhs, rhs) + } + }, + BinaryOffset => { + let lhs = lhs.binary_offset().unwrap(); + let rhs = rhs.binary_offset().unwrap(); + let (lhs, rhs, _, _) = prepare_binary::(lhs, rhs, false); + // Take slices so that vecs are not copied + let lhs = lhs.iter().map(|k| k.as_slice()).collect::>(); + let rhs = rhs.iter().map(|k| k.as_slice()).collect::>(); + if anti { + hash_join_tuples_left_anti(lhs, rhs) + } else { + hash_join_tuples_left_semi(lhs, rhs) + } + }, + _ => { + if lhs.dtype().is_float() { + with_match_physical_float_polars_type!(lhs.dtype(), |$T| { + let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); + let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); + num_group_join_anti_semi(lhs, rhs, anti) + }) + } else if s_self.bit_repr_is_large() { + let lhs = lhs.bit_repr_large(); + let rhs = rhs.bit_repr_large(); + num_group_join_anti_semi(&lhs, &rhs, anti) + } else { + let lhs = lhs.bit_repr_small(); + let rhs = rhs.bit_repr_small(); + num_group_join_anti_semi(&lhs, &rhs, anti) + } + }, } } @@ -94,32 +129,50 @@ pub trait SeriesJoin: 
SeriesSealed + Sized { validate.validate_probe(&lhs, &rhs, true)?; use DataType::*; - if matches!(lhs.dtype(), String | Binary) { - let lhs = lhs.cast(&Binary).unwrap(); - let rhs = rhs.cast(&Binary).unwrap(); - let lhs = lhs.binary().unwrap(); - let rhs = rhs.binary().unwrap(); - let (lhs, rhs, swapped, _) = prepare_binary(lhs, rhs, true); - let lhs = lhs.iter().map(|v| v.as_slice()).collect::>(); - let rhs = rhs.iter().map(|v| v.as_slice()).collect::>(); - Ok(( - hash_join_tuples_inner(lhs, rhs, swapped, validate, join_nulls)?, - !swapped, - )) - } else if lhs.dtype().is_float() { - with_match_physical_float_polars_type!(lhs.dtype(), |$T| { - let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); - let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); - group_join_inner::<$T>(lhs, rhs, validate, join_nulls) - }) - } else if s_self.bit_repr_is_large() { - let lhs = s_self.bit_repr_large(); - let rhs = other.bit_repr_large(); - group_join_inner::(&lhs, &rhs, validate, join_nulls) - } else { - let lhs = s_self.bit_repr_small(); - let rhs = other.bit_repr_small(); - group_join_inner::(&lhs, &rhs, validate, join_nulls) + match lhs.dtype() { + String | Binary => { + let lhs = lhs.cast(&Binary).unwrap(); + let rhs = rhs.cast(&Binary).unwrap(); + let lhs = lhs.binary().unwrap(); + let rhs = rhs.binary().unwrap(); + let (lhs, rhs, swapped, _) = prepare_binary::(lhs, rhs, true); + // Take slices so that vecs are not copied + let lhs = lhs.iter().map(|k| k.as_slice()).collect::>(); + let rhs = rhs.iter().map(|k| k.as_slice()).collect::>(); + Ok(( + hash_join_tuples_inner(lhs, rhs, swapped, validate, join_nulls)?, + !swapped, + )) + }, + BinaryOffset => { + let lhs = lhs.binary_offset().unwrap(); + let rhs = rhs.binary_offset()?; + let (lhs, rhs, swapped, _) = prepare_binary::(lhs, rhs, true); + // Take slices so that vecs are not copied + let lhs = lhs.iter().map(|k| k.as_slice()).collect::>(); + let rhs = rhs.iter().map(|k| k.as_slice()).collect::>(); + Ok(( + hash_join_tuples_inner(lhs, rhs, swapped, validate, join_nulls)?, + !swapped, + )) + }, + _ => { + if lhs.dtype().is_float() { + with_match_physical_float_polars_type!(lhs.dtype(), |$T| { + let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); + let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); + group_join_inner::<$T>(lhs, rhs, validate, join_nulls) + }) + } else if s_self.bit_repr_is_large() { + let lhs = s_self.bit_repr_large(); + let rhs = other.bit_repr_large(); + group_join_inner::(&lhs, &rhs, validate, join_nulls) + } else { + let lhs = s_self.bit_repr_small(); + let rhs = other.bit_repr_small(); + group_join_inner::(&lhs, &rhs, validate, join_nulls) + } + }, } } @@ -134,29 +187,44 @@ pub trait SeriesJoin: SeriesSealed + Sized { validate.validate_probe(&lhs, &rhs, true)?; use DataType::*; - if matches!(lhs.dtype(), String | Binary) { - let lhs = lhs.cast(&Binary).unwrap(); - let rhs = rhs.cast(&Binary).unwrap(); - let lhs = lhs.binary().unwrap(); - let rhs = rhs.binary().unwrap(); - let (lhs, rhs, swapped, _) = prepare_binary(lhs, rhs, true); - let lhs = lhs.iter().collect::>(); - let rhs = rhs.iter().collect::>(); - hash_join_tuples_outer(lhs, rhs, swapped, validate, join_nulls) - } else if lhs.dtype().is_float() { - with_match_physical_float_polars_type!(lhs.dtype(), |$T| { - let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); - let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); - hash_join_outer(lhs, rhs, validate, join_nulls) - }) - } else if s_self.bit_repr_is_large() { - let lhs 
= s_self.bit_repr_large(); - let rhs = other.bit_repr_large(); - hash_join_outer(&lhs, &rhs, validate, join_nulls) - } else { - let lhs = s_self.bit_repr_small(); - let rhs = other.bit_repr_small(); - hash_join_outer(&lhs, &rhs, validate, join_nulls) + match lhs.dtype() { + String | Binary => { + let lhs = lhs.cast(&Binary).unwrap(); + let rhs = rhs.cast(&Binary).unwrap(); + let lhs = lhs.binary().unwrap(); + let rhs = rhs.binary().unwrap(); + let (lhs, rhs, swapped, _) = prepare_binary::(lhs, rhs, true); + // Take slices so that vecs are not copied + let lhs = lhs.iter().map(|k| k.as_slice()).collect::>(); + let rhs = rhs.iter().map(|k| k.as_slice()).collect::>(); + hash_join_tuples_outer(lhs, rhs, swapped, validate, join_nulls) + }, + BinaryOffset => { + let lhs = lhs.binary_offset().unwrap(); + let rhs = rhs.binary_offset()?; + let (lhs, rhs, swapped, _) = prepare_binary::(lhs, rhs, true); + // Take slices so that vecs are not copied + let lhs = lhs.iter().map(|k| k.as_slice()).collect::>(); + let rhs = rhs.iter().map(|k| k.as_slice()).collect::>(); + hash_join_tuples_outer(lhs, rhs, swapped, validate, join_nulls) + }, + _ => { + if lhs.dtype().is_float() { + with_match_physical_float_polars_type!(lhs.dtype(), |$T| { + let lhs: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); + let rhs: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); + hash_join_outer(lhs, rhs, validate, join_nulls) + }) + } else if s_self.bit_repr_is_large() { + let lhs = s_self.bit_repr_large(); + let rhs = other.bit_repr_large(); + hash_join_outer(&lhs, &rhs, validate, join_nulls) + } else { + let lhs = s_self.bit_repr_small(); + let rhs = other.bit_repr_small(); + hash_join_outer(&lhs, &rhs, validate, join_nulls) + } + }, } } } @@ -366,6 +434,7 @@ where } } +#[cfg(feature = "asof_join")] pub fn prepare_bytes<'a>( been_split: &'a [BinaryChunked], hb: &RandomState, @@ -385,9 +454,9 @@ pub fn prepare_bytes<'a>( }) } -fn prepare_binary<'a>( - ca: &'a BinaryChunked, - other: &'a BinaryChunked, +fn prepare_binary<'a, T>( + ca: &'a ChunkedArray, + other: &'a ChunkedArray, // In inner join and outer join, the shortest relation will be used to create a hash table. // In left join, always use the right side to create. build_shortest_table: bool, @@ -396,27 +465,21 @@ fn prepare_binary<'a>( Vec>>, bool, RandomState, -) { - let n_threads = POOL.current_num_threads(); - +) +where + T: PolarsDataType, + for<'b> ::ValueT<'b>: AsRef<[u8]>, +{ let (a, b, swapped) = if build_shortest_table { det_hash_prone_order!(ca, other) } else { (ca, other, false) }; - let hb = RandomState::default(); - let splitted_a = split_ca(a, n_threads).unwrap(); - let splitted_b = split_ca(b, n_threads).unwrap(); - let str_hashes_a = prepare_bytes(&splitted_a, &hb); - let str_hashes_b = prepare_bytes(&splitted_b, &hb); - - // SAFETY: - // Splitting a Ca keeps the same buffers, so the lifetime is still valid. 
- let str_hashes_a = unsafe { std::mem::transmute::<_, Vec>>>(str_hashes_a) }; - let str_hashes_b = unsafe { std::mem::transmute::<_, Vec>>>(str_hashes_b) }; + let bh_a = a.to_bytes_hashes(true, hb.clone()); + let bh_b = b.to_bytes_hashes(true, hb.clone()); - (str_hashes_a, str_hashes_b, swapped, hb) + (bh_a, bh_b, swapped, hb) } #[cfg(feature = "semi_anti_join")] diff --git a/crates/polars-ops/src/frame/join/hash_join/sort_merge.rs b/crates/polars-ops/src/frame/join/hash_join/sort_merge.rs index c19d6f5f9de9..fce2f2bf6cf0 100644 --- a/crates/polars-ops/src/frame/join/hash_join/sort_merge.rs +++ b/crates/polars-ops/src/frame/join/hash_join/sort_merge.rs @@ -172,7 +172,7 @@ fn create_reverse_map_from_arg_sort(mut arg_sort: IdxCa) -> Vec { } #[cfg(not(feature = "performant"))] -pub fn _sort_or_hash_inner( +pub(crate) fn _sort_or_hash_inner( s_left: &Series, s_right: &Series, _verbose: bool, @@ -183,7 +183,7 @@ pub fn _sort_or_hash_inner( } #[cfg(feature = "performant")] -pub fn _sort_or_hash_inner( +pub(crate) fn _sort_or_hash_inner( s_left: &Series, s_right: &Series, verbose: bool, @@ -273,7 +273,7 @@ pub fn _sort_or_hash_inner( } #[cfg(not(feature = "performant"))] -pub(super) fn sort_or_hash_left( +pub(crate) fn sort_or_hash_left( s_left: &Series, s_right: &Series, _verbose: bool, @@ -284,7 +284,7 @@ pub(super) fn sort_or_hash_left( } #[cfg(feature = "performant")] -pub(super) fn sort_or_hash_left( +pub(crate) fn sort_or_hash_left( s_left: &Series, s_right: &Series, verbose: bool, diff --git a/crates/polars-ops/src/frame/join/mod.rs b/crates/polars-ops/src/frame/join/mod.rs index ccc4c72184bd..442d9bdd172f 100644 --- a/crates/polars-ops/src/frame/join/mod.rs +++ b/crates/polars-ops/src/frame/join/mod.rs @@ -30,17 +30,23 @@ pub use hash_join::*; use hashbrown::hash_map::{Entry, RawEntryMut}; #[cfg(feature = "merge_sorted")] pub use merge_sorted::_merge_sorted_dfs; -use polars_core::hashing::{_df_rows_to_hashes_threaded_vertical, _HASHMAP_INIT_SIZE}; +use polars_core::hashing::_HASHMAP_INIT_SIZE; +#[allow(unused_imports)] +use polars_core::prelude::sort::arg_sort_multiple::{ + encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls, +}; use polars_core::prelude::*; pub(super) use polars_core::series::IsSorted; +use polars_core::utils::slice_offsets; #[allow(unused_imports)] use polars_core::utils::slice_slice; -use polars_core::utils::{_to_physical_and_bit_repr, slice_offsets}; use polars_core::POOL; use polars_utils::hashing::BytesHash; use rayon::prelude::*; use super::IntoDf; +const LHS_NAME: &str = "POLARS_K_L"; +const RHS_NAME: &str = "POLARS_K_R"; pub trait DataFrameJoinOps: IntoDf { /// Generic join method. Can be used to join on multiple columns. @@ -106,7 +112,7 @@ pub trait DataFrameJoinOps: IntoDf { other: &DataFrame, mut selected_left: Vec, mut selected_right: Vec, - args: JoinArgs, + mut args: JoinArgs, _check_rechunk: bool, _verbose: bool, ) -> PolarsResult { @@ -201,10 +207,10 @@ pub trait DataFrameJoinOps: IntoDf { let s_right = &selected_right[0]; return match args.how { JoinType::Inner => { - left_df._inner_join_from_series(other, s_left, s_right, args, _verbose) + left_df._inner_join_from_series(other, s_left, s_right, args, _verbose, None) }, JoinType::Left => { - left_df._left_join_from_series(other, s_left, s_right, args, _verbose) + left_df._left_join_from_series(other, s_left, s_right, args, _verbose, None) }, JoinType::Outer { .. 
} => { left_df._outer_join_from_series(other, s_left, s_right, args) @@ -254,121 +260,66 @@ pub trait DataFrameJoinOps: IntoDf { }; } - fn remove_selected(df: &DataFrame, selected: &[Series]) -> DataFrame { - let mut new = None; - for s in selected { - new = match new { - None => Some(df.drop(s.name()).unwrap()), - Some(new) => Some(new.drop(s.name()).unwrap()), - } - } - new.unwrap() - } - - // Make sure that we don't have logical types. - // We don't overwrite the original selected as that might be used to create a column in the new df. - let selected_left_physical = _to_physical_and_bit_repr(&selected_left); - let selected_right_physical = _to_physical_and_bit_repr(&selected_right); + let lhs_keys = prepare_keys_multiple(&selected_left, args.join_nulls)? + .into_series() + .with_name(LHS_NAME); + let rhs_keys = prepare_keys_multiple(&selected_right, args.join_nulls)? + .into_series() + .with_name(RHS_NAME); + let names_right = selected_right.iter().map(|s| s.name()).collect::>(); // Multiple keys. match args.how { - JoinType::Inner => { - let left = unsafe { DataFrame::new_no_checks(selected_left_physical) }; - let right = unsafe { DataFrame::new_no_checks(selected_right_physical) }; - let (mut left, mut right, swap) = det_hash_prone_order!(left, right); - let (join_idx_left, join_idx_right) = - _inner_join_multiple_keys(&mut left, &mut right, swap, args.join_nulls); - let mut join_idx_left = &*join_idx_left; - let mut join_idx_right = &*join_idx_right; - - if let Some((offset, len)) = args.slice { - join_idx_left = slice_slice(join_idx_left, offset, len); - join_idx_right = slice_slice(join_idx_right, offset, len); - } - - let (df_left, df_right) = POOL.join( - // SAFETY: join indices are known to be in bounds - || unsafe { left_df._create_left_df_from_slice(join_idx_left, false, !swap) }, - || unsafe { - // remove join columns - remove_selected(other, &selected_right) - ._take_unchecked_slice(join_idx_right, true) - }, - ); - _finish_join(df_left, df_right, args.suffix.as_deref()) - }, - JoinType::Left => { - let mut left = unsafe { DataFrame::new_no_checks(selected_left_physical) }; - let mut right = unsafe { DataFrame::new_no_checks(selected_right_physical) }; - - if let Some((offset, len)) = args.slice { - left = left.slice(offset, len); - } - let ids = - _left_join_multiple_keys(&mut left, &mut right, None, None, args.join_nulls); - left_df._finish_left_join(ids, &remove_selected(other, &selected_right), args) + #[cfg(feature = "asof_join")] + JoinType::AsOf(_) => polars_bail!( + ComputeError: "asof join not supported for join on multiple keys" + ), + JoinType::Cross => { + unreachable!() }, - JoinType::Outer { .. 
} => { - let df_left = unsafe { DataFrame::new_no_checks(selected_left_physical) }; - let df_right = unsafe { DataFrame::new_no_checks(selected_right_physical) }; - - let (mut left, mut right, swap) = det_hash_prone_order!(df_left, df_right); - let (mut join_idx_l, mut join_idx_r) = - _outer_join_multiple_keys(&mut left, &mut right, swap, args.join_nulls); - - if let Some((offset, len)) = args.slice { - let (offset, len) = slice_offsets(offset, len, join_idx_l.len()); - join_idx_l.slice(offset, len); - join_idx_r.slice(offset, len); - } - let idx_ca_l = IdxCa::with_chunk("", join_idx_l); - let idx_ca_r = IdxCa::with_chunk("", join_idx_r); - - // Take the left and right dataframes by join tuples - let (df_left, df_right) = POOL.join( - || unsafe { left_df.take_unchecked(&idx_ca_l) }, - || unsafe { other.take_unchecked(&idx_ca_r) }, - ); - - let JoinType::Outer { coalesce } = args.how else { - unreachable!() - }; + JoinType::Outer { coalesce } => { let names_left = selected_left.iter().map(|s| s.name()).collect::>(); - let names_right = selected_right.iter().map(|s| s.name()).collect::>(); - let out = _finish_join(df_left, df_right, args.suffix.as_deref()); + args.how = JoinType::Outer { coalesce: false }; + let suffix = args.suffix.clone(); + let out = left_df._outer_join_from_series(other, &lhs_keys, &rhs_keys, args); + if coalesce { Ok(_coalesce_outer_join( out?, &names_left, &names_right, - args.suffix.as_deref(), + suffix.as_deref(), left_df, )) } else { out } }, - #[cfg(feature = "asof_join")] - JoinType::AsOf(_) => polars_bail!( - ComputeError: "asof join not supported for join on multiple keys" + JoinType::Inner => left_df._inner_join_from_series( + other, + &lhs_keys, + &rhs_keys, + args, + _verbose, + Some(&names_right), + ), + JoinType::Left => left_df._left_join_from_series( + other, + &lhs_keys, + &rhs_keys, + args, + _verbose, + Some(&names_right), ), #[cfg(feature = "semi_anti_join")] - JoinType::Anti | JoinType::Semi => { - let mut left = unsafe { DataFrame::new_no_checks(selected_left_physical) }; - let mut right = unsafe { DataFrame::new_no_checks(selected_right_physical) }; - - let idx = if matches!(args.how, JoinType::Anti) { - _left_anti_multiple_keys(&mut left, &mut right, args.join_nulls) - } else { - _left_semi_multiple_keys(&mut left, &mut right, args.join_nulls) - }; - // SAFETY: - // indices are in bounds - Ok(unsafe { left_df._finish_anti_semi_join(&idx, args.slice) }) - }, - JoinType::Cross => { - unreachable!() - }, + JoinType::Anti | JoinType::Semi => self._join_impl( + other, + vec![lhs_keys], + vec![rhs_keys], + args, + _check_rechunk, + _verbose, + ), } } @@ -469,11 +420,6 @@ pub trait DataFrameJoinOps: IntoDf { } trait DataFrameJoinOpsPrivate: IntoDf { - // hack for a macro - fn len(&self) -> usize { - self.to_df().height() - } - fn _inner_join_from_series( &self, other: &DataFrame, @@ -481,6 +427,7 @@ trait DataFrameJoinOpsPrivate: IntoDf { s_right: &Series, args: JoinArgs, verbose: bool, + drop_names: Option<&[&str]>, ) -> PolarsResult { let left_df = self.to_df(); #[cfg(feature = "dtype-categorical")] @@ -500,10 +447,12 @@ trait DataFrameJoinOpsPrivate: IntoDf { // SAFETY: join indices are known to be in bounds || unsafe { left_df._create_left_df_from_slice(join_tuples_left, false, sorted) }, || unsafe { - other - .drop(s_right.name()) - .unwrap() - ._take_unchecked_slice(join_tuples_right, true) + if let Some(drop_names) = drop_names { + other.drop_many(drop_names) + } else { + other.drop(s_right.name()).unwrap() + } + 
._take_unchecked_slice(join_tuples_right, true) }, ); _finish_join(df_left, df_right, args.suffix.as_deref()) @@ -512,3 +461,32 @@ trait DataFrameJoinOpsPrivate: IntoDf { impl DataFrameJoinOps for DataFrame {} impl DataFrameJoinOpsPrivate for DataFrame {} + +fn prepare_keys_multiple(s: &[Series], join_nulls: bool) -> PolarsResult { + let keys = s + .iter() + .map(|s| { + let phys = s.to_physical_repr(); + match phys.dtype() { + DataType::Float32 => phys.f32().unwrap().to_canonical().into_series(), + DataType::Float64 => phys.f64().unwrap().to_canonical().into_series(), + _ => phys.into_owned(), + } + }) + .collect::>(); + + if join_nulls { + encode_rows_vertical_par_unordered(&keys) + } else { + encode_rows_vertical_par_unordered_broadcast_nulls(&keys) + } +} +pub fn private_left_join_multiple_keys( + a: &DataFrame, + b: &DataFrame, + join_nulls: bool, +) -> PolarsResult { + let a = prepare_keys_multiple(a.get_columns(), join_nulls)?.into_series(); + let b = prepare_keys_multiple(b.get_columns(), join_nulls)?.into_series(); + sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, join_nulls) +} diff --git a/crates/polars-utils/src/total_ord.rs b/crates/polars-utils/src/total_ord.rs index f5d7b22a95aa..80b21bec507a 100644 --- a/crates/polars-utils/src/total_ord.rs +++ b/crates/polars-utils/src/total_ord.rs @@ -8,6 +8,7 @@ use crate::nulls::IsNull; /// Converts an f32 into a canonical form, where -0 == 0 and all NaNs map to /// the same value. +#[inline] pub fn canonical_f32(x: f32) -> f32 { // -0.0 + 0.0 becomes 0.0. let convert_zero = x + 0.0; @@ -20,6 +21,7 @@ pub fn canonical_f32(x: f32) -> f32 { /// Converts an f64 into a canonical form, where -0 == 0 and all NaNs map to /// the same value. +#[inline] pub fn canonical_f64(x: f64) -> f64 { // -0.0 + 0.0 becomes 0.0. let convert_zero = x + 0.0;