Skip to content

Commit

Permalink
sort near hashmap collect
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 28, 2022
1 parent e533df9 commit 0422873
Show file tree
Hide file tree
Showing 19 changed files with 117 additions and 107 deletions.
11 changes: 5 additions & 6 deletions polars/polars-core/src/chunked_array/ops/unique/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,11 @@ where
return ca.clone();
}
let mut groups = ca
.group_tuples(true)
.group_tuples(true, false)
.into_idx()
.into_iter()
.collect_trusted::<Vec<_>>();
// groups.sort_unstable_by_key(|k| k.1.len());
// TODO! sort by key
groups.sort_unstable_by_key(|k| k.1.len());
let first = &groups[0];

let max_occur = first.1.len();
Expand Down Expand Up @@ -174,7 +173,7 @@ macro_rules! arg_unique_ca {

macro_rules! impl_value_counts {
($self:expr) => {{
let group_tuples = $self.group_tuples(true).into_idx();
let group_tuples = $self.group_tuples(true, false).into_idx();
let values =
unsafe { $self.take_unchecked(group_tuples.iter().map(|t| t.0 as usize).into()) };
let mut counts: NoNull<UInt32Chunked> = group_tuples
Expand Down Expand Up @@ -357,7 +356,7 @@ fn sort_columns(mut columns: Vec<Series>) -> Vec<Series> {

impl ToDummies<Utf8Type> for Utf8Chunked {
fn to_dummies(&self) -> Result<DataFrame> {
let groups = self.group_tuples(true).into_idx();
let groups = self.group_tuples(true, false).into_idx();
let col_name = self.name();
let taker = self.take_rand();

Expand All @@ -383,7 +382,7 @@ where
ChunkedArray<T>: ChunkOps + ChunkCompare<T::Native> + ChunkUnique<T>,
{
fn to_dummies(&self) -> Result<DataFrame> {
let groups = self.group_tuples(true).into_idx();
let groups = self.group_tuples(true, false).into_idx();
let col_name = self.name();
let taker = self.take_rand();

Expand Down
37 changes: 28 additions & 9 deletions polars/polars-core/src/frame/groupby/hashing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::GroupsProxy;
use crate::frame::groupby::{GroupsIdx, IdxItem};
use crate::prelude::compare_inner::PartialEqInner;
use crate::prelude::*;
use crate::utils::CustomIterTools;
Expand All @@ -12,11 +13,21 @@ use hashbrown::{hash_map::RawEntryMut, HashMap};
use rayon::prelude::*;
use std::hash::{BuildHasher, Hash};

/// Assemble per-partition group vectors into a `GroupsProxy`.
///
/// * `out` - one `Vec<IdxItem>` per worker/partition, produced by a
///   threaded group-by.
/// * `sorted` - when `true`, the groups are flattened and reordered by the
///   first row index of every group so the result is deterministic and
///   follows the order in which groups appear in the data; when `false`,
///   the partitions are handed over as-is (cheaper, order unspecified).
fn finish_group_order(out: Vec<Vec<IdxItem>>, sorted: bool) -> GroupsProxy {
    if sorted {
        // Flatten the per-partition results, then restore a stable order by
        // each group's first index.
        let mut groups = out.into_iter().flatten().collect::<Vec<_>>();
        groups.sort_unstable_by_key(|g| g.0);
        // `from_iter` takes any `IntoIterator`; passing the Vec directly
        // avoids the redundant `.into_iter()` call.
        GroupsProxy::Idx(GroupsIdx::from_iter(groups))
    } else {
        GroupsProxy::Idx(GroupsIdx::from(out))
    }
}

// We must strike a balance between cache coherence and resizing costs.
// Overallocation seems a lot more expensive than resizing so we start reasonably small.
pub(crate) const HASHMAP_INIT_SIZE: usize = 512;

pub(crate) fn groupby<T>(a: impl Iterator<Item = T>) -> GroupsProxy
pub(crate) fn groupby<T>(a: impl Iterator<Item = T>, sorted: bool) -> GroupsProxy
where
T: Hash + Eq,
{
Expand All @@ -37,8 +48,16 @@ where
}
}
});

GroupsProxy::Idx(hash_tbl.into_iter().map(|(_k, tpl)| tpl).collect())
if sorted {
let mut groups = hash_tbl
.into_iter()
.map(|(_k, v)| v)
.collect_trusted::<Vec<_>>();
groups.sort_unstable_by_key(|g| g.0);
GroupsProxy::Idx(groups.into_iter().collect())
} else {
GroupsProxy::Idx(hash_tbl.into_iter().map(|(_k, v)| v).collect())
}
}

/// Determine group tuples over different threads. The hash of the key is used to determine the partitions.
Expand All @@ -50,6 +69,7 @@ pub(crate) fn groupby_threaded_num<T, IntoSlice>(
keys: Vec<IntoSlice>,
group_size_hint: usize,
n_partitions: u64,
sorted: bool,
) -> GroupsProxy
where
T: Send + Hash + Eq + Sync + Copy + AsU64 + CallHasher,
Expand Down Expand Up @@ -106,9 +126,8 @@ where
.collect_trusted::<Vec<_>>()
})
})
.flatten()
.collect();
GroupsProxy::Idx(out)
.collect::<Vec<_>>();
finish_group_order(out, sorted)
}

/// Utility function used as comparison function in the hashmap.
Expand Down Expand Up @@ -237,6 +256,7 @@ pub(crate) fn populate_multiple_key_hashmap2<'a, V, H, F, G>(
pub(crate) fn groupby_threaded_multiple_keys_flat(
keys: DataFrame,
n_partitions: usize,
sorted: bool,
) -> GroupsProxy {
let dfs = split_df(&keys, n_partitions).unwrap();
let (hashes, _random_state) = df_rows_to_hashes_threaded(&dfs, None);
Expand Down Expand Up @@ -289,7 +309,6 @@ pub(crate) fn groupby_threaded_multiple_keys_flat(
hash_tbl.into_iter().map(|(_k, v)| v).collect::<Vec<_>>()
})
})
.flatten()
.collect();
GroupsProxy::Idx(groups)
.collect::<Vec<_>>();
finish_group_order(groups, sorted)
}
76 changes: 39 additions & 37 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub trait IntoGroupsProxy {
/// Create the tuples need for a groupby operation.
/// * The first value in the tuple is the first index of the group.
/// * The second value in the tuple are the indexes of the groups, including the first value.
fn group_tuples(&self, _multithreaded: bool) -> GroupsProxy {
fn group_tuples(&self, _multithreaded: bool, _sorted: bool) -> GroupsProxy {
unimplemented!()
}
}
Expand All @@ -45,7 +45,7 @@ fn group_multithreaded<T>(ca: &ChunkedArray<T>) -> bool {
ca.len() > 1000
}

fn num_groups_proxy<T>(ca: &ChunkedArray<T>, multithreaded: bool) -> GroupsProxy
fn num_groups_proxy<T>(ca: &ChunkedArray<T>, multithreaded: bool, sorted: bool) -> GroupsProxy
where
T: PolarsIntegerType,
T::Native: Hash + Eq + Send + AsU64,
Expand All @@ -66,26 +66,26 @@ where
if ca.chunks.len() == 1 {
if !ca.has_validity() {
let keys = vec![ca.cont_slice().unwrap()];
groupby_threaded_num(keys, group_size_hint, n_partitions)
groupby_threaded_num(keys, group_size_hint, n_partitions, sorted)
} else {
let keys = ca
.downcast_iter()
.map(|arr| arr.into_iter().map(|x| x.copied()).collect::<Vec<_>>())
.collect::<Vec<_>>();
groupby_threaded_num(keys, group_size_hint, n_partitions)
groupby_threaded_num(keys, group_size_hint, n_partitions, sorted)
}
// use the polars-iterators
} else if !ca.has_validity() {
let keys = vec![ca.into_no_null_iter().collect::<Vec<_>>()];
groupby_threaded_num(keys, group_size_hint, n_partitions)
groupby_threaded_num(keys, group_size_hint, n_partitions, sorted)
} else {
let keys = vec![ca.into_iter().collect::<Vec<_>>()];
groupby_threaded_num(keys, group_size_hint, n_partitions)
groupby_threaded_num(keys, group_size_hint, n_partitions, sorted)
}
} else if !ca.has_validity() {
groupby(ca.into_no_null_iter())
groupby(ca.into_no_null_iter(), sorted)
} else {
groupby(ca.into_iter())
groupby(ca.into_iter(), sorted)
}
}

Expand All @@ -94,48 +94,48 @@ where
T: PolarsNumericType,
T::Native: NumCast,
{
fn group_tuples(&self, multithreaded: bool) -> GroupsProxy {
fn group_tuples(&self, multithreaded: bool, sorted: bool) -> GroupsProxy {
match self.dtype() {
DataType::UInt64 => {
// convince the compiler that we are this type.
let ca: &UInt64Chunked = unsafe {
&*(self as *const ChunkedArray<T> as *const ChunkedArray<UInt64Type>)
};
num_groups_proxy(ca, multithreaded)
num_groups_proxy(ca, multithreaded, sorted)
}
DataType::UInt32 => {
// convince the compiler that we are this type.
let ca: &UInt32Chunked = unsafe {
&*(self as *const ChunkedArray<T> as *const ChunkedArray<UInt32Type>)
};
num_groups_proxy(ca, multithreaded)
num_groups_proxy(ca, multithreaded, sorted)
}
DataType::Int64 | DataType::Float64 => {
let ca = self.bit_repr_large();
num_groups_proxy(&ca, multithreaded)
num_groups_proxy(&ca, multithreaded, sorted)
}
DataType::Int32 | DataType::Float32 => {
let ca = self.bit_repr_small();
num_groups_proxy(&ca, multithreaded)
num_groups_proxy(&ca, multithreaded, sorted)
}
_ => {
let ca = self.cast(&DataType::UInt32).unwrap();
let ca = ca.u32().unwrap();
num_groups_proxy(ca, multithreaded)
num_groups_proxy(ca, multithreaded, sorted)
}
}
}
}
impl IntoGroupsProxy for BooleanChunked {
fn group_tuples(&self, multithreaded: bool) -> GroupsProxy {
fn group_tuples(&self, multithreaded: bool, sorted: bool) -> GroupsProxy {
let ca = self.cast(&DataType::UInt32).unwrap();
let ca = ca.u32().unwrap();
ca.group_tuples(multithreaded)
ca.group_tuples(multithreaded, sorted)
}
}

impl IntoGroupsProxy for Utf8Chunked {
fn group_tuples(&self, multithreaded: bool) -> GroupsProxy {
fn group_tuples(&self, multithreaded: bool, sorted: bool) -> GroupsProxy {
let hb = RandomState::default();
let null_h = get_null_hash_value(hb.clone());

Expand All @@ -160,7 +160,7 @@ impl IntoGroupsProxy for Utf8Chunked {
})
.collect::<Vec<_>>()
});
groupby_threaded_num(str_hashes, 0, n_partitions as u64)
groupby_threaded_num(str_hashes, 0, n_partitions as u64, sorted)
} else {
let str_hashes = self
.into_iter()
Expand All @@ -172,22 +172,22 @@ impl IntoGroupsProxy for Utf8Chunked {
StrHash::new(opt_s, hash)
})
.collect::<Vec<_>>();
groupby(str_hashes.iter())
groupby(str_hashes.iter(), sorted)
}
}
}

#[cfg(feature = "dtype-categorical")]
impl IntoGroupsProxy for CategoricalChunked {
fn group_tuples(&self, multithreaded: bool) -> GroupsProxy {
self.deref().group_tuples(multithreaded)
fn group_tuples(&self, multithreaded: bool, sorted: bool) -> GroupsProxy {
self.deref().group_tuples(multithreaded, sorted)
}
}

impl IntoGroupsProxy for ListChunked {
#[cfg(feature = "groupby_list")]
fn group_tuples(&self, _multithreaded: bool) -> GroupsProxy {
groupby(self.into_iter().map(|opt_s| opt_s.map(Wrap)))
fn group_tuples(&self, _multithreaded: bool, sorted: bool) -> GroupsProxy {
groupby(self.into_iter().map(|opt_s| opt_s.map(Wrap)), sorted)
}
}

Expand All @@ -196,8 +196,8 @@ impl<T> IntoGroupsProxy for ObjectChunked<T>
where
T: PolarsObject,
{
fn group_tuples(&self, _multithreaded: bool) -> GroupsProxy {
groupby(self.into_iter())
fn group_tuples(&self, _multithreaded: bool, sorted: bool) -> GroupsProxy {
groupby(self.into_iter(), sorted)
}
}

Expand Down Expand Up @@ -307,7 +307,12 @@ fn pack_u32_u64_tuples(opt_l: Option<u32>, opt_r: Option<u64>) -> [u8; 13] {
}

impl DataFrame {
pub fn groupby_with_series(&self, by: Vec<Series>, multithreaded: bool) -> Result<GroupBy> {
pub fn groupby_with_series(
&self,
by: Vec<Series>,
multithreaded: bool,
sorted: bool,
) -> Result<GroupBy> {
macro_rules! finish_packed_bit_path {
($ca0:expr, $ca1:expr, $pack_fn:expr) => {{
let n_partitions = set_partition_size();
Expand Down Expand Up @@ -335,7 +340,7 @@ impl DataFrame {
return Ok(GroupBy::new(
self,
by,
groupby_threaded_num(keys, 0, n_partitions as u64),
groupby_threaded_num(keys, 0, n_partitions as u64, sorted),
None,
));
}};
Expand Down Expand Up @@ -373,7 +378,7 @@ impl DataFrame {
let groups = match by.len() {
1 => {
let series = &by[0];
series.group_tuples(multithreaded)
series.group_tuples(multithreaded, sorted)
}
_ => {
// multiple keys is always multi-threaded
Expand Down Expand Up @@ -411,7 +416,7 @@ impl DataFrame {
}

let n_partitions = set_partition_size();
groupby_threaded_multiple_keys_flat(keys_df, n_partitions)
groupby_threaded_multiple_keys_flat(keys_df, n_partitions, sorted)
}
};
Ok(GroupBy::new(self, by, groups, None))
Expand All @@ -435,7 +440,7 @@ impl DataFrame {
S: AsRef<str>,
{
let selected_keys = self.select_series(by)?;
self.groupby_with_series(selected_keys, true)
self.groupby_with_series(selected_keys, true, false)
}

/// Group DataFrame using a Series column.
Expand All @@ -445,9 +450,8 @@ impl DataFrame {
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut gb = self.groupby(by)?;
gb.groups.idx_mut().sort();
Ok(gb)
let selected_keys = self.select_series(by)?;
self.groupby_with_series(selected_keys, true, true)
}
}

Expand Down Expand Up @@ -1466,12 +1470,10 @@ mod test {
let ca = UInt32Chunked::new("", slice);
let split = split_ca(&ca, 4).unwrap();

let mut a = groupby(ca.into_iter()).into_idx();
a.sort();
let mut a = groupby(ca.into_iter(), true).into_idx();

let keys = split.iter().map(|ca| ca.cont_slice().unwrap()).collect();
let mut b = groupby_threaded_num(keys, 0, split.len() as u64).into_idx();
b.sort();
let mut b = groupby_threaded_num(keys, 0, split.len() as u64, true).into_idx();

assert_eq!(a, b);
}
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/groupby/pivot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl DataFrame {
// group tuples are in bounds
let sub_vals_and_cols = match indicator {
GroupsIndicator::Idx(g) => unsafe {
values_and_columns[i].take_unchecked_slice(&g.1)
values_and_columns[i].take_unchecked_slice(g.1)
},
GroupsIndicator::Slice([first, len]) => {
values_and_columns[i].slice(first as i64, len as usize)
Expand All @@ -190,7 +190,7 @@ impl DataFrame {

let s = sub_vals_and_cols.column(column).unwrap().clone();
let gb = sub_vals_and_cols
.groupby_with_series(vec![s], false)
.groupby_with_series(vec![s], false, false)
.unwrap();

use PivotAgg::*;
Expand Down

0 comments on commit 0422873

Please sign in to comment.