Skip to content

Commit

Permalink
tightly packed grouptuples
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 28, 2022
1 parent f6e1155 commit e533df9
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 68 deletions.
9 changes: 7 additions & 2 deletions polars/polars-core/src/chunked_array/ops/unique/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,13 @@ where
if ca.is_empty() {
return ca.clone();
}
let mut groups = ca.group_tuples(true).into_idx();
groups.sort_unstable_by_key(|k| k.1.len());
let mut groups = ca
.group_tuples(true)
.into_idx()
.into_iter()
.collect_trusted::<Vec<_>>();
// groups.sort_unstable_by_key(|k| k.1.len());
// TODO! sort by key
let first = &groups[0];

let max_occur = first.1.len();
Expand Down
19 changes: 10 additions & 9 deletions polars/polars-core/src/frame/groupby/aggregations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use arrow::types::{simd::Simd, NativeType};

#[cfg(feature = "object")]
use crate::chunked_array::object::extension::create_extension;
use crate::frame::groupby::GroupsIdx;
#[cfg(feature = "object")]
use crate::frame::groupby::GroupsIndicator;
use crate::prelude::*;
Expand All @@ -22,13 +23,13 @@ where
ca.slice(first as i64, len as usize)
}

fn agg_helper_idx<T, F>(groups: &[(u32, Vec<u32>)], f: F) -> Option<Series>
fn agg_helper_idx<T, F>(groups: &GroupsIdx, f: F) -> Option<Series>
where
F: Fn(&(u32, Vec<u32>)) -> Option<T::Native> + Send + Sync,
F: Fn((u32, &Vec<u32>)) -> Option<T::Native> + Send + Sync,
T: PolarsNumericType,
ChunkedArray<T>: IntoSeries,
{
let ca: ChunkedArray<T> = POOL.install(|| groups.par_iter().map(f).collect());
let ca: ChunkedArray<T> = POOL.install(|| groups.into_par_iter().map(f).collect());
Some(ca.into_series())
}

Expand Down Expand Up @@ -99,7 +100,7 @@ impl Series {
if idx.is_empty() {
None
} else {
Some(*first as usize)
Some(first as usize)
}
});
// Safety:
Expand Down Expand Up @@ -198,7 +199,7 @@ where
if idx.is_empty() {
None
} else if idx.len() == 1 {
self.get(*first as usize)
self.get(first as usize)
} else {
match (self.has_validity(), self.chunks.len()) {
(false, 1) => Some(unsafe {
Expand Down Expand Up @@ -247,7 +248,7 @@ where
if idx.is_empty() {
None
} else if idx.len() == 1 {
self.get(*first as usize)
self.get(first as usize)
} else {
match (self.has_validity(), self.chunks.len()) {
(false, 1) => Some(unsafe {
Expand Down Expand Up @@ -296,7 +297,7 @@ where
if idx.is_empty() {
None
} else if idx.len() == 1 {
self.get(*first as usize)
self.get(first as usize)
} else {
match (self.has_validity(), self.chunks.len()) {
(false, 1) => Some(unsafe {
Expand Down Expand Up @@ -365,7 +366,7 @@ where
let out = if idx.is_empty() {
None
} else if idx.len() == 1 {
self.get(*first as usize).map(|sum| sum.to_f64().unwrap())
self.get(first as usize).map(|sum| sum.to_f64().unwrap())
} else {
match (self.has_validity(), self.chunks.len()) {
(false, 1) => unsafe {
Expand Down Expand Up @@ -553,7 +554,7 @@ where
if idx.is_empty() {
None
} else if idx.len() == 1 {
self.get(*first as usize).map(|sum| sum.to_f64().unwrap())
self.get(first as usize).map(|sum| sum.to_f64().unwrap())
} else {
match (self.has_validity(), self.chunks.len()) {
(false, 1) => unsafe {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/groupby/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
}
});

GroupsProxy::Idx(hash_tbl.into_iter().map(|(_k, tpl)| tpl).collect_trusted())
GroupsProxy::Idx(hash_tbl.into_iter().map(|(_k, tpl)| tpl).collect())
}

/// Determine group tuples over different threads. The hash of the key is used to determine the partitions.
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl DataFrame {
S: AsRef<str>,
{
let mut gb = self.groupby(by)?;
gb.groups.idx_mut().sort_unstable_by_key(|t| t.0);
gb.groups.idx_mut().sort();
Ok(gb)
}
}
Expand Down Expand Up @@ -570,7 +570,7 @@ impl<'df> GroupBy<'df> {
// groupby indexes are in bound.
unsafe {
s.take_iter_unchecked(
&mut self.groups.idx_ref().iter().map(|(idx, _)| *idx as usize),
&mut self.groups.idx_ref().iter().map(|(idx, _)| idx as usize),
)
}
})
Expand Down
142 changes: 97 additions & 45 deletions polars/polars-core/src/frame/groupby/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
use crate::prelude::*;
use crate::utils::NoNull;
use crate::POOL;
use polars_arrow::utils::{CustomIterTools, FromTrustedLenIterator};
use rayon::iter::plumbing::UnindexedConsumer;
use rayon::prelude::*;
use std::ops::{Deref, DerefMut};
use std::vec::IntoIter;

/// Indexes of the groups, the first index is stored separately.
/// this make sorting fast.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx(Vec<(u32, Vec<u32>)>);
pub struct GroupsIdx {
first: Vec<u32>,
all: Vec<Vec<u32>>,
}

pub type IdxItem = (u32, Vec<u32>);
pub type BorrowIdxItem<'a> = (u32, &'a Vec<u32>);

impl Drop for GroupsIdx {
fn drop(&mut self) {
let v = std::mem::take(&mut self.0);
let v = std::mem::take(&mut self.all);
// ~65k took approximately 1ms on local machine, so from that point we drop on other thread
// to stop query from being blocked
if v.len() > 1 << 16 {
Expand All @@ -27,41 +33,85 @@ impl Drop for GroupsIdx {

impl From<Vec<IdxItem>> for GroupsIdx {
fn from(v: Vec<IdxItem>) -> Self {
GroupsIdx(v)
v.into_iter().collect()
}
}

impl GroupsIdx {
pub fn sort(&mut self) {
let mut idx = 0;
let first = std::mem::take(&mut self.first);
// store index and values so that we can sort those
let mut idx_vals = first
.into_iter()
.map(|v| {
let out = [idx, v];
idx += 1;
out
})
.collect_trusted::<Vec<_>>();
idx_vals.sort_unstable_by_key(|v| v[1]);

let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
let take_all = || {
idx_vals
.iter()
.map(|v| unsafe {
let idx = v[0] as usize;
std::mem::take(self.all.get_unchecked_mut(idx))
})
.collect_trusted::<Vec<_>>()
};
let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
self.first = first;
self.all = all;
}
pub fn iter(
&self,
) -> std::iter::Zip<std::iter::Copied<std::slice::Iter<u32>>, std::slice::Iter<Vec<u32>>> {
self.into_iter()
}

pub(crate) fn len(&self) -> usize {
self.first.len()
}
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}
pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem {
let first = *self.first.get_unchecked(index);
let all = self.all.get_unchecked(index);
(first, all)
}
}

impl FromIterator<IdxItem> for GroupsIdx {
fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
GroupsIdx(iter.into_iter().collect())
let (first, all) = iter.into_iter().unzip();
GroupsIdx { first, all }
}
}

impl<'a> IntoIterator for &'a GroupsIdx {
type Item = &'a IdxItem;
type IntoIter = std::slice::Iter<'a, IdxItem>;
type Item = BorrowIdxItem<'a>;
type IntoIter = std::iter::Zip<
std::iter::Copied<std::slice::Iter<'a, u32>>,
std::slice::Iter<'a, Vec<u32>>,
>;

fn into_iter(self) -> Self::IntoIter {
self.0.iter()
self.first.iter().copied().zip(self.all.iter())
}
}

impl<'a> IntoIterator for GroupsIdx {
type Item = IdxItem;
type IntoIter = std::vec::IntoIter<IdxItem>;
type IntoIter = std::iter::Zip<std::vec::IntoIter<u32>, std::vec::IntoIter<Vec<u32>>>;

fn into_iter(mut self) -> Self::IntoIter {
let a = std::mem::take(&mut self.0);
a.into_iter()
}
}

impl FromTrustedLenIterator<IdxItem> for GroupsIdx {
fn from_iter_trusted_length<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self
where
T::IntoIter: TrustedLen,
{
GroupsIdx(iter.into_iter().collect_trusted())
let first = std::mem::take(&mut self.first);
let all = std::mem::take(&mut self.all);
first.into_iter().zip(all.into_iter())
}
}

Expand All @@ -70,32 +120,31 @@ impl FromParallelIterator<IdxItem> for GroupsIdx {
where
I: IntoParallelIterator<Item = IdxItem>,
{
let v = Vec::from_par_iter(par_iter);
GroupsIdx(v)
let (first, all) = par_iter.into_par_iter().unzip();
GroupsIdx { first, all }
}
}

impl IntoParallelIterator for GroupsIdx {
type Iter = rayon::vec::IntoIter<IdxItem>;
type Item = IdxItem;
impl<'a> IntoParallelIterator for &'a GroupsIdx {
type Iter = rayon::iter::Zip<
rayon::iter::Copied<rayon::slice::Iter<'a, u32>>,
rayon::slice::Iter<'a, Vec<u32>>,
>;
type Item = BorrowIdxItem<'a>;

fn into_par_iter(mut self) -> Self::Iter {
let a = std::mem::take(&mut self.0);
a.into_par_iter()
self.first.par_iter().copied().zip(self.all.par_iter())
}
}

impl Deref for GroupsIdx {
type Target = Vec<(u32, Vec<u32>)>;

fn deref(&self) -> &Self::Target {
&self.0
}
}
impl IntoParallelIterator for GroupsIdx {
type Iter = rayon::iter::Zip<rayon::vec::IntoIter<u32>, rayon::vec::IntoIter<Vec<u32>>>;
type Item = IdxItem;

impl DerefMut for GroupsIdx {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
fn into_par_iter(mut self) -> Self::Iter {
let first = std::mem::take(&mut self.first);
let all = std::mem::take(&mut self.all);
first.into_par_iter().zip(all.into_par_iter())
}
}

Expand All @@ -113,7 +162,7 @@ pub enum GroupsProxy {

impl Default for GroupsProxy {
fn default() -> Self {
GroupsProxy::Idx(GroupsIdx(vec![]))
GroupsProxy::Idx(GroupsIdx::default())
}
}

Expand All @@ -136,9 +185,7 @@ impl GroupsProxy {
#[cfg(feature = "private")]
pub fn sort(&mut self) {
match self {
GroupsProxy::Idx(groups) => {
groups.sort_unstable_by_key(|t| t.0);
}
GroupsProxy::Idx(groups) => groups.sort(),
GroupsProxy::Slice(groups) => {
groups.sort_unstable_by_key(|[first, _]| *first);
}
Expand All @@ -164,7 +211,11 @@ impl GroupsProxy {

pub fn get(&self, index: usize) -> GroupsIndicator {
match self {
GroupsProxy::Idx(groups) => GroupsIndicator::Idx(&groups[index]),
GroupsProxy::Idx(groups) => {
let first = groups.first[index];
let all = &groups.all[index];
GroupsIndicator::Idx((first, all))
}
GroupsProxy::Slice(groups) => GroupsIndicator::Slice(groups[index]),
}
}
Expand Down Expand Up @@ -235,7 +286,7 @@ impl From<GroupsIdx> for GroupsProxy {
}

pub enum GroupsIndicator<'a> {
Idx(&'a (u32, Vec<u32>)),
Idx(BorrowIdxItem<'a>),
Slice([u32; 2]),
}

Expand Down Expand Up @@ -275,9 +326,10 @@ impl<'a> Iterator for GroupsProxyIter<'a> {

let out = unsafe {
match self.vals {
GroupsProxy::Idx(groups) => {
Some(GroupsIndicator::Idx(groups.get_unchecked(self.idx)))
}
GroupsProxy::Idx(groups) => unsafe {
let item = groups.get_unchecked(self.idx);
Some(GroupsIndicator::Idx(item))
},
GroupsProxy::Slice(groups) => {
Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
}
Expand Down

0 comments on commit e533df9

Please sign in to comment.