perf: join by row-encoding (#15559)
ritchie46 committed Apr 10, 2024
1 parent b91dedb commit 8f40509
Showing 18 changed files with 532 additions and 992 deletions.
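In brief: rather than dispatching multi-column join and group-by keys through per-type hashing paths, this change row-encodes the key columns into a single binary column and lets the hashing, grouping, and joining machinery operate on that one key. A toy sketch of the idea (illustrative only; the real encoding in `polars-row` is order-preserving and null-aware):

```rust
use std::collections::HashMap;

fn main() {
    let a: Vec<i32> = vec![1, 2, 1, 2];
    let b: Vec<&str> = vec!["x", "y", "x", "z"];

    let mut groups: HashMap<Vec<u8>, Vec<usize>> = HashMap::new();
    for i in 0..a.len() {
        // One binary key stands in for the whole multi-column row. The real
        // encoding is order-preserving and null-aware; plain concatenation
        // here is only for illustration.
        let mut key = a[i].to_be_bytes().to_vec();
        key.extend_from_slice(b[i].as_bytes());
        groups.entry(key).or_default().push(i);
    }
    assert_eq!(groups.len(), 3); // rows 0 and 2 carry the same composite key
}
```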
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/utils.rs
@@ -30,6 +30,7 @@ pub fn combine_validities_and(opt_l: Option<&Bitmap>, opt_r: Option<&Bitmap>) ->
(None, None) => None,
}
}

pub fn combine_validities_or(opt_l: Option<&Bitmap>, opt_r: Option<&Bitmap>) -> Option<Bitmap> {
match (opt_l, opt_r) {
(Some(l), Some(r)) => Some(l.bitor(r)),
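For reference, a sketch of the validity-combining semantics used here, where `None` means an absent bitmap, i.e. all values valid (shown with plain `Vec<bool>` instead of `Bitmap`):

```rust
// Sketch only: `None` stands for an absent validity bitmap, i.e. all-valid.
fn combine_and(opt_l: Option<&[bool]>, opt_r: Option<&[bool]>) -> Option<Vec<bool>> {
    match (opt_l, opt_r) {
        (Some(l), Some(r)) => Some(l.iter().zip(r).map(|(a, b)| *a && *b).collect()),
        (Some(l), None) => Some(l.to_vec()),
        (None, Some(r)) => Some(r.to_vec()),
        (None, None) => None,
    }
}
```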
72 changes: 72 additions & 0 deletions crates/polars-core/src/chunked_array/binary.rs
@@ -0,0 +1,72 @@
use ahash::RandomState;
use polars_utils::hashing::BytesHash;
use rayon::prelude::*;

use crate::hashing::get_null_hash_value;
use crate::prelude::*;
use crate::utils::{_set_partition_size, _split_offsets};
use crate::POOL;

#[inline]
fn fill_bytes_hashes<'a, T>(ca: &'a ChunkedArray<T>, null_h: u64, hb: RandomState) -> Vec<BytesHash>
where
T: PolarsDataType,
<<T as PolarsDataType>::Array as StaticArray>::ValueT<'a>: AsRef<[u8]>,
{
let mut byte_hashes = Vec::with_capacity(ca.len());
for arr in ca.downcast_iter() {
for opt_b in arr.iter() {
let opt_b = opt_b.as_ref().map(|v| v.as_ref());
// SAFETY:
// the underlying data is tied to self
let opt_b = unsafe { std::mem::transmute::<Option<&[u8]>, Option<&'a [u8]>>(opt_b) };
let hash = match opt_b {
Some(s) => hb.hash_one(s),
None => null_h,
};
byte_hashes.push(BytesHash::new(opt_b, hash))
}
}
byte_hashes
}

impl<T> ChunkedArray<T>
where
T: PolarsDataType,
for<'a> <T::Array as StaticArray>::ValueT<'a>: AsRef<[u8]>,
{
#[allow(clippy::needless_lifetimes)]
pub fn to_bytes_hashes<'a>(
&'a self,
multithreaded: bool,
hb: RandomState,
) -> Vec<Vec<BytesHash<'a>>> {
let null_h = get_null_hash_value(&hb);

if multithreaded {
let n_partitions = _set_partition_size();

let split = _split_offsets(self.len(), n_partitions);

POOL.install(|| {
split
.into_par_iter()
.map(|(offset, len)| {
let ca = self.slice(offset as i64, len);
let byte_hashes = fill_bytes_hashes(&ca, null_h, hb.clone());

// SAFETY:
// the underlying data is tied to self
unsafe {
std::mem::transmute::<Vec<BytesHash<'_>>, Vec<BytesHash<'a>>>(
byte_hashes,
)
}
})
.collect::<Vec<_>>()
})
} else {
vec![fill_bytes_hashes(self, null_h, hb.clone())]
}
}
}
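The `BytesHash` values built above pair each key's bytes with a precomputed hash, so the hash tables downstream never rehash a key while probing. A hypothetical mirror of that pattern (illustrative names, not the `polars_utils` API):

```rust
use std::hash::{Hash, Hasher};

// Illustrative stand-in for polars_utils::hashing::BytesHash.
struct PrehashedBytes<'a> {
    bytes: Option<&'a [u8]>, // None encodes a null value
    hash: u64,               // computed once, up front
}

impl Hash for PrehashedBytes<'_> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Feed the stored hash; the bytes are only touched on equality checks.
        state.write_u64(self.hash);
    }
}

impl PartialEq for PrehashedBytes<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.hash == other.hash && self.bytes == other.bytes
    }
}
impl Eq for PrehashedBytes<'_> {}
```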
29 changes: 29 additions & 0 deletions crates/polars-core/src/chunked_array/float.rs
@@ -1,6 +1,7 @@
use arrow::legacy::kernels::float::*;
use arrow::legacy::kernels::set::set_at_nulls;
use num_traits::Float;
use polars_utils::total_ord::{canonical_f32, canonical_f64};

use crate::prelude::*;

@@ -31,3 +32,31 @@ where
ChunkedArray::from_chunk_iter(self.name(), chunks)
}
}

pub trait Canonical {
fn canonical(self) -> Self;
}

impl Canonical for f32 {
#[inline]
fn canonical(self) -> Self {
canonical_f32(self)
}
}

impl Canonical for f64 {
#[inline]
fn canonical(self) -> Self {
canonical_f64(self)
}
}

impl<T> ChunkedArray<T>
where
T: PolarsFloatType,
T::Native: Float + Canonical,
{
pub fn to_canonical(&self) -> Self {
self.apply_values_generic(|v| v.canonical())
}
}
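A short illustration of what `to_canonical` buys for hashing and row-encoding, under the assumption that canonicalization folds `-0.0` into `+0.0` and collapses every NaN bit pattern into one: after canonicalization, bitwise-equal implies semantically equal, so such keys encode and hash identically.

```rust
// Sketch of the assumed canonical form; not the polars_utils implementation.
fn canonical(x: f64) -> f64 {
    let x = x + 0.0; // IEEE 754: -0.0 + 0.0 == +0.0
    if x.is_nan() {
        f64::NAN // one canonical NaN payload
    } else {
        x
    }
}

fn main() {
    assert_ne!((-0.0f64).to_bits(), 0.0f64.to_bits());
    assert_eq!(canonical(-0.0).to_bits(), canonical(0.0).to_bits());

    let odd_nan = f64::from_bits(0x7ff8_0000_0000_0001); // NaN with a payload
    assert_eq!(canonical(odd_nan).to_bits(), canonical(f64::NAN).to_bits());
}
```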
1 change: 1 addition & 0 deletions crates/polars-core/src/chunked_array/mod.rs
@@ -24,6 +24,7 @@ pub(crate) mod ndarray;

#[cfg(feature = "dtype-array")]
pub(crate) mod array;
mod binary;
mod bitwise;
#[cfg(feature = "object")]
mod drop;
36 changes: 34 additions & 2 deletions crates/polars-core/src/chunked_array/ops/sort/arg_sort_multiple.rs
@@ -1,3 +1,4 @@
use arrow::compute::utils::combine_validities_and;
use compare_inner::NullOrderCmp;
use polars_row::{convert_columns, EncodingField, RowsEncoded};
use polars_utils::iter::EnumerateIdxTrait;
@@ -87,7 +88,26 @@ pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult<ArrayRef> {
Ok(out)
}

pub(crate) fn encode_rows_vertical_par_unordered(
pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);

let chunks = splits.into_par_iter().map(|(offset, len)| {
let sliced = by
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded_unordered(&sliced)?;
Ok(rows.into_array())
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

Ok(BinaryOffsetChunked::from_chunk_iter("", chunks?))
}

// Almost the same but broadcast nulls to the row-encoded array.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
@@ -100,7 +120,19 @@ pub(crate) fn encode_rows_vertical_par_unordered(
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded_unordered(&sliced)?;
Ok(rows.into_array())

let validity = sliced
.iter()
.flat_map(|s| {
let s = s.rechunk();
#[allow(clippy::unnecessary_to_owned)]
s.chunks()
.to_vec()
.into_iter()
.map(|arr| arr.validity().cloned())
})
.fold(None, |l, r| combine_validities_and(l.as_ref(), r.as_ref()));
Ok(rows.into_array().with_validity_typed(validity))
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

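For orientation, a sketch of the work-splitting these parallel encoders rely on. This assumes `_split_offsets` cuts `len` rows into contiguous `(offset, len)` chunks, one per thread, with the remainder folded into the last chunk; treat it as an assumption about the helper, not its source:

```rust
// Assumed behavior of _split_offsets; a sketch, not the polars source.
fn split_offsets(len: usize, n_chunks: usize) -> Vec<(usize, usize)> {
    if n_chunks == 1 {
        return vec![(0, len)];
    }
    let chunk_size = len / n_chunks;
    (0..n_chunks)
        .map(|i| {
            let offset = i * chunk_size;
            // The last chunk absorbs the division remainder.
            let this_len = if i == n_chunks - 1 { len - offset } else { chunk_size };
            (offset, this_len)
        })
        .collect()
}

fn main() {
    assert_eq!(split_offsets(10, 3), vec![(0, 3), (3, 3), (6, 4)]);
}
```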
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/group_by/hashing.rs
@@ -124,14 +124,14 @@
// have the code duplication
pub(crate) fn group_by_threaded_slice<T, IntoSlice>(
keys: Vec<IntoSlice>,
n_partitions: usize,
sorted: bool,
) -> GroupsProxy
where
T: TotalHash + TotalEq + ToTotalOrd,
<T as ToTotalOrd>::TotalOrdItem: Send + Hash + Eq + Sync + Copy + DirtyHash,
IntoSlice: AsRef<[T]> + Send + Sync,
{
let n_partitions = keys.len();
let init_size = get_init_size();

// We will create a hashtable in every thread.
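`group_by_threaded_slice` now derives the partition count from `keys.len()` rather than taking it as a separate argument, since callers always pass exactly one key slice per partition. A toy, sequential sketch of the hash-partitioned scheme this implies (hypothetical names; in polars each partition's table is built on its own thread in the rayon pool):

```rust
use std::collections::HashMap;

fn group_partitioned(keys: &[Vec<u64>]) -> Vec<HashMap<u64, Vec<(usize, usize)>>> {
    let n_partitions = keys.len(); // one partition per key slice, as in the new signature
    (0..n_partitions)
        .map(|p| {
            let mut table: HashMap<u64, Vec<(usize, usize)>> = HashMap::new();
            for (slice_idx, slice) in keys.iter().enumerate() {
                for (row, k) in slice.iter().enumerate() {
                    // Each partition only claims keys that map to it, so the
                    // tables can be built without locks.
                    if (*k as usize) % n_partitions == p {
                        table.entry(*k).or_default().push((slice_idx, row));
                    }
                }
            }
            table
        })
        .collect()
}

fn main() {
    let keys = vec![vec![0u64, 1, 2], vec![2, 3]];
    let tables = group_partitioned(&keys);
    assert_eq!(tables.len(), 2); // one table per partition == keys.len()
    assert_eq!(tables[0][&2], vec![(0, 2), (1, 0)]); // key 2 grouped across slices
}
```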
98 changes: 11 additions & 87 deletions crates/polars-core/src/frame/group_by/into_groups.rs
@@ -4,7 +4,6 @@ use polars_utils::total_ord::{ToTotalOrd, TotalHash};
use super::*;
use crate::config::verbose;
use crate::prelude::sort::arg_sort_multiple::_get_rows_encoded_ca_unordered;
use crate::utils::_split_offsets;
use crate::utils::flatten::flatten_par;

/// Used to create the tuples for a group_by operation.
@@ -37,7 +36,7 @@ where
.downcast_iter()
.map(|arr| arr.values().as_slice())
.collect::<Vec<_>>();
group_by_threaded_slice(keys, n_partitions, sorted)
group_by_threaded_slice(keys, sorted)
} else {
let keys = ca
.downcast_iter()
@@ -256,108 +255,33 @@ impl IntoGroupsProxy for StringChunked {
}
}

fn fill_bytes_hashes(ca: &BinaryChunked, null_h: u64, hb: RandomState) -> Vec<BytesHash> {
let mut byte_hashes = Vec::with_capacity(ca.len());
for arr in ca.downcast_iter() {
for opt_b in arr {
let hash = match opt_b {
Some(s) => hb.hash_one(s),
None => null_h,
};
byte_hashes.push(BytesHash::new(opt_b, hash))
}
}
byte_hashes
}

impl IntoGroupsProxy for BinaryChunked {
#[allow(clippy::needless_lifetimes)]
fn group_tuples<'a>(&'a self, multithreaded: bool, sorted: bool) -> PolarsResult<GroupsProxy> {
let hb = RandomState::default();
let null_h = get_null_hash_value(&hb);
let bh = self.to_bytes_hashes(multithreaded, Default::default());

let out = if multithreaded {
let n_partitions = _set_partition_size();

let split = _split_offsets(self.len(), n_partitions);

let byte_hashes = POOL.install(|| {
split
.into_par_iter()
.map(|(offset, len)| {
let ca = self.slice(offset as i64, len);
let byte_hashes = fill_bytes_hashes(&ca, null_h, hb.clone());

// SAFETY:
// the underlying data is tied to self
unsafe {
std::mem::transmute::<Vec<BytesHash<'_>>, Vec<BytesHash<'a>>>(
byte_hashes,
)
}
})
.collect::<Vec<_>>()
});
group_by_threaded_slice(byte_hashes, n_partitions, sorted)
// Take slices so that the vecs are not cloned.
let bh = bh.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
group_by_threaded_slice(bh, sorted)
} else {
let byte_hashes = fill_bytes_hashes(self, null_h, hb.clone());
group_by(byte_hashes.iter(), sorted)
group_by(bh[0].iter(), sorted)
};
Ok(out)
}
}

fn fill_bytes_offset_hashes(
ca: &BinaryOffsetChunked,
null_h: u64,
hb: RandomState,
) -> Vec<BytesHash> {
let mut byte_hashes = Vec::with_capacity(ca.len());
for arr in ca.downcast_iter() {
for opt_b in arr {
let hash = match opt_b {
Some(s) => hb.hash_one(s),
None => null_h,
};
byte_hashes.push(BytesHash::new(opt_b, hash))
}
}
byte_hashes
}

impl IntoGroupsProxy for BinaryOffsetChunked {
#[allow(clippy::needless_lifetimes)]
fn group_tuples<'a>(&'a self, multithreaded: bool, sorted: bool) -> PolarsResult<GroupsProxy> {
let hb = RandomState::default();
let null_h = get_null_hash_value(&hb);
let bh = self.to_bytes_hashes(multithreaded, Default::default());

let out = if multithreaded {
let n_partitions = _set_partition_size();

let split = _split_offsets(self.len(), n_partitions);

let byte_hashes = POOL.install(|| {
split
.into_par_iter()
.map(|(offset, len)| {
let ca = self.slice(offset as i64, len);
let byte_hashes = fill_bytes_offset_hashes(&ca, null_h, hb.clone());

// SAFETY:
// the underlying data is tied to self
unsafe {
std::mem::transmute::<Vec<BytesHash<'_>>, Vec<BytesHash<'a>>>(
byte_hashes,
)
}
})
.collect::<Vec<_>>()
});
let byte_hashes = byte_hashes.iter().collect::<Vec<_>>();
group_by_threaded_slice(byte_hashes, n_partitions, sorted)
// Take slices so that the vecs are not cloned.
let bh = bh.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
group_by_threaded_slice(bh, sorted)
} else {
let byte_hashes = fill_bytes_offset_hashes(self, null_h, hb.clone());
group_by(byte_hashes.iter(), sorted)
group_by(bh[0].iter(), sorted)
};
Ok(out)
}
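On the "Take slices so that the vecs are not cloned" comments above: `group_by_threaded_slice` is generic over `IntoSlice: AsRef<[T]>`, so borrowing each partition's `Vec<BytesHash>` as a `&[BytesHash]` hands the grouping code a view without cloning the hash buffers. A minimal illustration of the pattern:

```rust
// Both owned vecs and borrowed slices satisfy AsRef<[u64]>; passing slices
// avoids cloning the underlying buffers.
fn total<S: AsRef<[u64]>>(parts: &[S]) -> u64 {
    parts.iter().flat_map(|p| p.as_ref().iter().copied()).sum()
}

fn main() {
    let owned: Vec<Vec<u64>> = vec![vec![1, 2], vec![3]];
    let views: Vec<&[u64]> = owned.iter().map(|v| v.as_slice()).collect();
    assert_eq!(total(&owned), total(&views)); // both 6, no clone needed
}
```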
4 changes: 1 addition & 3 deletions crates/polars-core/src/frame/group_by/mod.rs
@@ -1,13 +1,11 @@
use std::fmt::{Debug, Display, Formatter};
use std::hash::Hash;

use ahash::RandomState;
use num_traits::NumCast;
use polars_utils::hashing::{BytesHash, DirtyHash};
use polars_utils::hashing::DirtyHash;
use rayon::prelude::*;

use self::hashing::*;
use crate::hashing::get_null_hash_value;
use crate::prelude::*;
use crate::utils::{_set_partition_size, accumulate_dataframes_vertical};
use crate::POOL;
15 changes: 0 additions & 15 deletions crates/polars-core/src/utils/series.rs
@@ -2,21 +2,6 @@ use crate::prelude::*;
use crate::series::unstable::UnstableSeries;
use crate::series::IsSorted;

/// Transform to physical type and coerce similar sized integer to a bit representation
/// to reduce compiler bloat
pub fn _to_physical_and_bit_repr(s: &[Series]) -> Vec<Series> {
s.iter()
.map(|s| {
let physical = s.to_physical_repr();
match physical.dtype() {
DataType::Int64 => physical.bit_repr_large().into_series(),
DataType::Int32 => physical.bit_repr_small().into_series(),
_ => physical.into_owned(),
}
})
.collect()
}

/// A utility that allocates an [`UnstableSeries`]. The applied function can then use that
/// series container to save heap allocations and swap arrow arrays.
pub fn with_unstable_series<F, T>(dtype: &DataType, f: F) -> T
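The helper deleted above reinterpreted same-width integers as one bit representation to curb monomorphization bloat; presumably the row-encoded key path no longer needs that dispatch layer. A sketch of the bit-repr idea it implemented (illustrative, not the removed source):

```rust
// Reinterpret signed integers as their unsigned bit pattern so a single
// u64 code path can serve several same-width logical types.
fn bit_repr_large(values: &[i64]) -> Vec<u64> {
    values.iter().map(|&v| v as u64).collect()
}

fn main() {
    assert_eq!(bit_repr_large(&[-1, 0, 1]), vec![u64::MAX, 0, 1]);
}
```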
