improve inner join performance (#3168)

ritchie46 committed Apr 17, 2022
1 parent 0a14848 commit ff20af7

Showing 6 changed files with 72 additions and 31 deletions.
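The core change below is representational: the inner-join kernels stop returning one `Vec<(IdxSize, IdxSize)>` of (left, right) row-index pairs and instead return two parallel `Vec<IdxSize>` buffers, so each side can be gathered from a plain contiguous slice. A minimal, self-contained sketch of that shape change (toy data, with `u32` standing in for `IdxSize`; this is not the Polars code):

```rust
fn main() {
    // Old shape: one vector of (left, right) row-index pairs.
    let tuples: Vec<(u32, u32)> = vec![(0, 7), (1, 5), (2, 9), (3, 6)];

    // New shape: two parallel index vectors, split once with `unzip`.
    let (left_idx, right_idx): (Vec<u32>, Vec<u32>) = tuples.iter().copied().unzip();

    // Positions still line up, so slicing both vectors with the same
    // (offset, len) window keeps rows paired exactly like slicing the tuples.
    let (offset, len) = (1usize, 2usize);
    let from_tuples: Vec<(u32, u32)> = tuples[offset..offset + len].to_vec();
    let from_slices: Vec<(u32, u32)> = left_idx[offset..offset + len]
        .iter()
        .copied()
        .zip(right_idx[offset..offset + len].iter().copied())
        .collect();
    assert_eq!(from_tuples, from_slices);
}
```

Gathering the left and right frames then only needs a `&[IdxSize]`, which is what the new `take_unchecked_slice` and `create_left_df_from_slice` paths in the diff exploit.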
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/asof_join/groups.rs
@@ -376,7 +376,7 @@ where
     let (build_hashes, random_state) = df_rows_to_hashes_threaded(&dfs_b, None);
     let (probe_hashes, _) = df_rows_to_hashes_threaded(&dfs_a, Some(random_state));

-    let hash_tbls = mk::create_build_table(&build_hashes, b);
+    let hash_tbls = mk::create_probe_table(&build_hashes, b);
     // early drop to reduce memory pressure
     drop(build_hashes);

60 changes: 42 additions & 18 deletions polars/polars-core/src/frame/hash_join/mod.rs
@@ -254,13 +254,31 @@ impl DataFrame {
         Ok(df_left)
     }

-    fn create_left_df<B: Sync>(&self, join_tuples: &[(IdxSize, B)], left_join: bool) -> DataFrame {
+    /// # Safety
+    /// Join tuples must be in bounds
+    unsafe fn create_left_df_from_slice(
+        &self,
+        join_tuples: &[IdxSize],
+        left_join: bool,
+    ) -> DataFrame {
         if left_join && join_tuples.len() == self.height() {
             self.clone()
         } else {
-            unsafe {
-                self.take_iter_unchecked(join_tuples.iter().map(|(left, _right)| *left as usize))
-            }
+            self.take_unchecked_slice(join_tuples)
         }
     }

+    /// # Safety
+    /// Join tuples must be in bounds
+    unsafe fn create_left_df<B: Sync>(
+        &self,
+        join_tuples: &[(IdxSize, B)],
+        left_join: bool,
+    ) -> DataFrame {
+        if left_join && join_tuples.len() == self.height() {
+            self.clone()
+        } else {
+            self.take_iter_unchecked(join_tuples.iter().map(|(left, _right)| *left as usize))
+        }
+    }
+
@@ -383,20 +401,21 @@ impl DataFrame {
         let left = DataFrame::new_no_checks(selected_left_physical);
         let right = DataFrame::new_no_checks(selected_right_physical);
         let (left, right, swap) = det_hash_prone_order!(left, right);
-        let join_tuples = inner_join_multiple_keys(&left, &right, swap);
-        let mut join_tuples = &*join_tuples;
+        let (join_idx_left, join_idx_right) = inner_join_multiple_keys(&left, &right, swap);
+        let mut join_idx_left = &*join_idx_left;
+        let mut join_idx_right = &*join_idx_right;

         if let Some((offset, len)) = slice {
-            join_tuples = slice_slice(join_tuples, offset, len);
+            join_idx_left = slice_slice(join_idx_left, offset, len);
+            join_idx_right = slice_slice(join_idx_right, offset, len);
         }

         let (df_left, df_right) = POOL.join(
-            || self.create_left_df(join_tuples, false),
+            // safety: join indices are known to be in bounds
+            || unsafe { self.create_left_df_from_slice(join_idx_left, false) },
             || unsafe {
                 // remove join columns
-                remove_selected(other, &selected_right).take_iter_unchecked(
-                    join_tuples.iter().map(|(_left, right)| *right as usize),
-                )
+                remove_selected(other, &selected_right).take_unchecked_slice(join_idx_right)
             },
         );
         self.finish_join(df_left, df_right, suffix)
@@ -412,7 +431,8 @@ impl DataFrame {
         }

         let (df_left, df_right) = POOL.join(
-            || self.create_left_df(join_tuples, true),
+            // safety: join indices are known to be in bounds
+            || unsafe { self.create_left_df(join_tuples, true) },
             || unsafe {
                 // remove join columns
                 remove_selected(other, &selected_right).take_opt_iter_unchecked(
@@ -572,20 +592,23 @@ impl DataFrame {
         #[cfg(feature = "dtype-categorical")]
         check_categorical_src(s_left.dtype(), s_right.dtype())?;

-        let join_tuples = s_left.hash_join_inner(s_right);
-        let mut join_tuples = &*join_tuples;
+        let (join_tuples_left, join_tuples_right) = s_left.hash_join_inner(s_right);
+        let mut join_tuples_left = &*join_tuples_left;
+        let mut join_tuples_right = &*join_tuples_right;

         if let Some((offset, len)) = slice {
-            join_tuples = slice_slice(join_tuples, offset, len);
+            join_tuples_left = slice_slice(join_tuples_left, offset, len);
+            join_tuples_right = slice_slice(join_tuples_right, offset, len);
         }

         let (df_left, df_right) = POOL.join(
-            || self.create_left_df(join_tuples, false),
+            // safety: join indices are known to be in bounds
+            || unsafe { self.create_left_df_from_slice(join_tuples_left, false) },
             || unsafe {
                 other
                     .drop(s_right.name())
                     .unwrap()
-                    .take_iter_unchecked(join_tuples.iter().map(|(_left, right)| *right as usize))
+                    .take_unchecked_slice(join_tuples_right)
             },
         );
         self.finish_join(df_left, df_right, suffix)
@@ -652,7 +675,8 @@ impl DataFrame {
         }

         let (df_left, df_right) = POOL.join(
-            || self.create_left_df(opt_join_tuples, true),
+            // safety: join indices are known to be in bounds
+            || unsafe { self.create_left_df(opt_join_tuples, true) },
             || unsafe {
                 other.drop(s_right.name()).unwrap().take_opt_iter_unchecked(
                     opt_join_tuples
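The rewritten helpers above also move the `unsafe` boundary outward: `create_left_df` and `create_left_df_from_slice` are now `unsafe fn`s with a documented invariant (join indices in bounds), and every call site carries a safety comment. A toy sketch of that pattern, gathering from a plain `&[u32]` index slice without bounds checks (hypothetical `gather_unchecked`, not a Polars API):

```rust
/// Gather `values[i]` for every `i` in `idx`, skipping bounds checks.
///
/// # Safety
/// Every index in `idx` must be in bounds for `values`.
unsafe fn gather_unchecked(values: &[f64], idx: &[u32]) -> Vec<f64> {
    idx.iter()
        .map(|&i| *values.get_unchecked(i as usize))
        .collect()
}

fn main() {
    let values = vec![1.0, 2.0, 3.0, 4.0];
    let join_idx = vec![3u32, 0, 2];
    // safety: the join indices were produced against `values` and are in bounds.
    let gathered = unsafe { gather_unchecked(&values, &join_idx) };
    assert_eq!(gathered, vec![4.0, 1.0, 3.0]);
}
```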
10 changes: 5 additions & 5 deletions polars/polars-core/src/frame/hash_join/multiple_keys.rs
@@ -29,7 +29,7 @@ pub(crate) unsafe fn compare_df_rows2(
     true
 }

-pub(crate) fn create_build_table(
+pub(crate) fn create_probe_table(
     hashes: &[UInt64Chunked],
     keys: &DataFrame,
 ) -> Vec<HashMap<IdxHash, Vec<IdxSize>, IdBuildHasher>> {
@@ -178,7 +178,7 @@ pub(crate) fn inner_join_multiple_keys(
     a: &DataFrame,
     b: &DataFrame,
     swap: bool,
-) -> Vec<(IdxSize, IdxSize)> {
+) -> (Vec<IdxSize>, Vec<IdxSize>) {
     // we assume that the b DataFrame is the shorter relation.
     // b will be used for the build phase.

@@ -189,7 +189,7 @@ pub(crate) fn inner_join_multiple_keys(
     let (build_hashes, random_state) = df_rows_to_hashes_threaded(&dfs_b, None);
     let (probe_hashes, _) = df_rows_to_hashes_threaded(&dfs_a, Some(random_state));

-    let hash_tbls = create_build_table(&build_hashes, b);
+    let hash_tbls = create_probe_table(&build_hashes, b);
     // early drop to reduce memory pressure
     drop(build_hashes);

@@ -235,7 +235,7 @@ pub(crate) fn inner_join_multiple_keys(
                 results
             })
             .flatten()
-            .collect()
+            .unzip()
     })
 }

@@ -261,7 +261,7 @@ pub(crate) fn left_join_multiple_keys(a: &DataFrame, b: &DataFrame) -> Vec<LeftJ
     let (build_hashes, random_state) = df_rows_to_hashes_threaded(&dfs_b, None);
     let (probe_hashes, _) = df_rows_to_hashes_threaded(&dfs_a, Some(random_state));

-    let hash_tbls = create_build_table(&build_hashes, b);
+    let hash_tbls = create_probe_table(&build_hashes, b);
     // early drop to reduce memory pressure
     drop(build_hashes);

4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/hash_join/single_keys.rs
@@ -135,7 +135,7 @@ pub(super) fn hash_join_tuples_inner<T, IntoSlice>(
     build: Vec<IntoSlice>,
     // Because b should be the shorter relation we could need to swap to keep left left and right right.
     swap: bool,
-) -> Vec<(IdxSize, IdxSize)>
+) -> (Vec<IdxSize>, Vec<IdxSize>)
 where
     IntoSlice: AsRef<[T]> + Send + Sync,
     T: Send + Hash + Eq + Sync + Copy + AsU64,
@@ -185,7 +185,7 @@ where
                 results
             })
             .flatten()
-            .collect()
+            .unzip()
     })
 }

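In both key-join kernels above, the parallel probe pipeline now ends in `.flatten().unzip()` instead of `.flatten().collect()`, so the per-thread `(left, right)` pairs are written straight into two index vectors rather than into one vector of tuples that callers would have to pull apart. A small stand-alone illustration of that rayon pattern (toy per-chunk results; assumes the `rayon` crate is available):

```rust
use rayon::prelude::*;

fn main() {
    // Each inner Vec mimics the `results` that one probe chunk produces.
    let per_chunk: Vec<Vec<(u32, u32)>> = vec![vec![(0, 3), (1, 4)], vec![(2, 5)]];

    // `flatten` merges the chunk results; `unzip` splits the pair stream
    // into separate left/right index vectors in a single pass.
    let (left, right): (Vec<u32>, Vec<u32>) = per_chunk.into_par_iter().flatten().unzip();

    assert_eq!(left.len(), right.len());
    println!("left: {left:?}\nright: {right:?}");
}
```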
@@ -59,7 +59,7 @@ impl Series {
         }
     }

-    pub(super) fn hash_join_inner(&self, other: &Series) -> Vec<(IdxSize, IdxSize)> {
+    pub(super) fn hash_join_inner(&self, other: &Series) -> (Vec<IdxSize>, Vec<IdxSize>) {
         let (lhs, rhs) = (self.to_physical_repr(), other.to_physical_repr());

         use DataType::*;
@@ -141,7 +141,7 @@ where
 fn num_group_join_inner<T>(
     left: &ChunkedArray<T>,
     right: &ChunkedArray<T>,
-) -> Vec<(IdxSize, IdxSize)>
+) -> (Vec<IdxSize>, Vec<IdxSize>)
 where
     T: PolarsIntegerType,
     T::Native: Hash + Eq + Send + AsU64 + Copy,
@@ -292,7 +292,7 @@ impl Utf8Chunked {
         (splitted_a, splitted_b, swap, hb)
     }

-    fn hash_join_inner(&self, other: &Utf8Chunked) -> Vec<(IdxSize, IdxSize)> {
+    fn hash_join_inner(&self, other: &Utf8Chunked) -> (Vec<IdxSize>, Vec<IdxSize>) {
         let (splitted_a, splitted_b, swap, hb) = self.prepare(other, true);
         let str_hashes_a = prepare_strs(&splitted_a, &hb);
         let str_hashes_b = prepare_strs(&splitted_b, &hb);
21 changes: 19 additions & 2 deletions polars/polars-core/src/frame/mod.rs
@@ -2950,9 +2950,26 @@ impl DataFrame {
             .reduce(|acc, b| get_supertype(&acc?, &b.unwrap()))
     }

-    #[cfg(any(feature = "partition_by", feature = "semi_anti_join"))]
     pub(crate) unsafe fn take_unchecked_slice(&self, idx: &[IdxSize]) -> Self {
-        self.take_iter_unchecked(idx.iter().map(|i| *i as usize))
+        let ptr = idx.as_ptr() as *mut IdxSize;
+        let len = idx.len();
+
+        // create a temporary vec. we will not drop it.
+        let mut ca = IdxCa::from_vec("", Vec::from_raw_parts(ptr, len, len));
+        let out = self.take_unchecked(&ca);
+
+        // ref count of buffers should be one because we dropped all allocations
+        let arr = {
+            let arr_ref = std::mem::take(&mut ca.chunks).pop().unwrap();
+            arr_ref
+                .as_any()
+                .downcast_ref::<PrimitiveArray<IdxSize>>()
+                .unwrap()
+                .clone()
+        };
+        // the only owned heap allocation is the `Vec` we created and must not be dropped
+        let _ = std::mem::ManuallyDrop::new(arr.into_mut().right().unwrap());
+        out
     }

     #[cfg(feature = "partition_by")]
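The new `take_unchecked_slice` above avoids copying the borrowed index slice: it temporarily wraps the slice's buffer in a `Vec` (and an `IdxCa`) so the existing `take_unchecked` kernel can be reused, then leaks that wrapper via `ManuallyDrop` so the borrowed memory is never freed. A stripped-down sketch of the same shape using only std types (hypothetical `sum_via_vec_view`; it leans on the same assumptions the commit's comments state — the view is read-only and must never be dropped or reallocated):

```rust
use std::mem::ManuallyDrop;

/// Read a borrowed `&[u32]` through a temporary "owned" `Vec<u32>` view.
///
/// # Safety
/// The `Vec` aliases memory it does not own: it must never be mutated,
/// grown, or dropped, which is why it is wrapped in `ManuallyDrop` at the end.
unsafe fn sum_via_vec_view(idx: &[u32]) -> u64 {
    let ptr = idx.as_ptr() as *mut u32;
    let len = idx.len();

    // Temporary view over the borrowed buffer (capacity == len, read-only use).
    let view = Vec::from_raw_parts(ptr, len, len);
    let total = view.iter().map(|&i| u64::from(i)).sum();

    // Leak the view so the borrowed buffer is not freed here.
    let _ = ManuallyDrop::new(view);
    total
}

fn main() {
    let idx = vec![4u32, 8, 15];
    let total = unsafe { sum_via_vec_view(&idx) };
    assert_eq!(total, 27);
}
```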