Skip to content

Commit

Permalink
reduce memory usage of multiple key hash
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 24, 2021
1 parent 0cb5f44 commit 9fb25c7
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 110 deletions.
4 changes: 4 additions & 0 deletions polars/polars-arrow/src/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ impl<T> AlignedVec<T> {
self.inner.as_mut_slice()
}

pub fn as_slice(&self) -> &[T] {
self.inner.as_slice()
}

pub fn capacity(&self) -> usize {
self.inner.capacity()
}
Expand Down
73 changes: 73 additions & 0 deletions polars/polars-core/src/chunked_array/ops/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,23 @@ where
.map(f)
.collect()
}
fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
where
F: Fn(Option<T::Native>, &V) -> V,
{
assert!(slice.len() >= self.len());

let mut idx = 0;
self.downcast_iter().for_each(|arr| {
arr.into_iter().for_each(|opt_val| {
// Safety:
// length asserted above
let item = unsafe { slice.get_unchecked_mut(idx) };
*item = f(opt_val, item);
idx += 1;
})
});
}
}

impl<'a> ChunkApply<'a, bool, bool> for BooleanChunked {
Expand Down Expand Up @@ -180,6 +197,24 @@ impl<'a> ChunkApply<'a, bool, bool> for BooleanChunked {
{
self.into_iter().enumerate().map(f).collect()
}

fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
where
F: Fn(Option<bool>, &T) -> T,
{
assert!(slice.len() >= self.len());

let mut idx = 0;
self.downcast_iter().for_each(|arr| {
arr.into_iter().for_each(|opt_val| {
// Safety:
// length asserted above
let item = unsafe { slice.get_unchecked_mut(idx) };
*item = f(opt_val, item);
idx += 1;
})
});
}
}

impl<'a> ChunkApply<'a, &'a str, Cow<'a, str>> for Utf8Chunked {
Expand Down Expand Up @@ -246,6 +281,24 @@ impl<'a> ChunkApply<'a, &'a str, Cow<'a, str>> for Utf8Chunked {
{
self.into_iter().enumerate().map(f).collect()
}

fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
where
F: Fn(Option<&'a str>, &T) -> T,
{
assert!(slice.len() >= self.len());

let mut idx = 0;
self.downcast_iter().for_each(|arr| {
arr.into_iter().for_each(|opt_val| {
// Safety:
// length asserted above
let item = unsafe { slice.get_unchecked_mut(idx) };
*item = f(opt_val, item);
idx += 1;
})
});
}
}

impl ChunkApplyKernel<BooleanArray> for BooleanChunked {
Expand Down Expand Up @@ -396,4 +449,24 @@ impl<'a> ChunkApply<'a, Series, Series> for ListChunked {
{
self.into_iter().enumerate().map(f).collect()
}

fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
where
F: Fn(Option<Series>, &T) -> T,
{
assert!(slice.len() >= self.len());

let mut idx = 0;
self.downcast_iter().for_each(|arr| {
arr.iter().for_each(|opt_val| {
let opt_val = opt_val.map(|arrayref| Series::try_from(("", arrayref)).unwrap());

// Safety:
// length asserted above
let item = unsafe { slice.get_unchecked_mut(idx) };
*item = f(opt_val, item);
idx += 1;
})
});
}
}
6 changes: 6 additions & 0 deletions polars/polars-core/src/chunked_array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,12 @@ pub trait ChunkApply<'a, A, B> {
fn apply_with_idx_on_opt<F>(&'a self, f: F) -> Self
where
F: Fn((usize, Option<A>)) -> Option<B> + Copy;

/// Apply a closure elementwise and write results to a mutable slice.
fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
// (value of chunkedarray, value of slice) -> value of slice
where
F: Fn(Option<A>, &T) -> T;
}

/// Aggregation operations
Expand Down
13 changes: 12 additions & 1 deletion polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,18 @@ impl DataFrame {
by.iter()
.map(|s| match s.dtype() {
DataType::Categorical => s.cast::<UInt32Type>().unwrap(),
_ => s.clone(),
DataType::Float32 => s.bit_repr_small().into_series(),
// otherwise we use the vec hash for float
#[cfg(feature = "dtype-u64")]
DataType::Float64 => s.bit_repr_large().into_series(),
_ => {
// is date like
if !s.is_numeric() && s.is_numeric_physical() {
s.to_physical_repr()
} else {
s.clone()
}
}
})
.collect(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/series/implementations/dates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ macro_rules! impl_dyn_series {
try_physical_dispatch!(self, zip_with_same_type, mask, other)
}

fn vec_hash(&self, random_state: RandomState) -> UInt64Chunked {
fn vec_hash(&self, random_state: RandomState) -> AlignedVec<u64> {
cast_and_apply!(self, vec_hash, random_state)
}

Expand Down
6 changes: 5 additions & 1 deletion polars/polars-core/src/series/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,14 @@ macro_rules! impl_dyn_series {
(&self.0).into_partial_eq_inner()
}

fn vec_hash(&self, random_state: RandomState) -> UInt64Chunked {
fn vec_hash(&self, random_state: RandomState) -> AlignedVec<u64> {
self.0.vec_hash(random_state)
}

fn vec_hash_combine(&self, build_hasher: RandomState, hashes: &mut [u64]) {
self.0.vec_hash_combine(build_hasher, hashes)
}

fn agg_mean(&self, groups: &[(u32, Vec<u32>)]) -> Option<Series> {
self.0.agg_mean(groups)
}
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ pub(crate) mod private {
fn into_partial_eq_inner<'a>(&'a self) -> Box<dyn PartialEqInner + 'a> {
unimplemented!()
}
fn vec_hash(&self, _build_hasher: RandomState) -> UInt64Chunked {
fn vec_hash(&self, _build_hasher: RandomState) -> AlignedVec<u64> {
unimplemented!()
}
fn vec_hash_combine(&self, _build_hasher: RandomState, _hashes: &mut [u64]) {
unimplemented!()
}
fn agg_mean(&self, _groups: &[(u32, Vec<u32>)]) -> Option<Series> {
Expand Down

0 comments on commit 9fb25c7

Please sign in to comment.