Skip to content

Commit

Permalink
wrap GroupsIdx with struct
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 28, 2022
1 parent 164749b commit 071f514
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 17 deletions.
5 changes: 3 additions & 2 deletions polars/polars-core/src/chunked_array/object/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ mod test {
let values = &[Some(foo1), None, Some(foo2), None];
let ca = ObjectChunked::new("", values);

let groups = GroupsProxy::Idx(vec![(0u32, vec![0u32, 1]), (2, vec![2]), (3, vec![3])]);
let groups =
GroupsProxy::Idx(vec![(0u32, vec![0u32, 1]), (2, vec![2]), (3, vec![3])].into());
let out = ca.agg_list(&groups).unwrap();
assert!(matches!(out.dtype(), DataType::List(_)));
assert_eq!(out.len(), groups.len());
Expand All @@ -213,7 +214,7 @@ mod test {
let values = &[Some(foo1.clone()), None, Some(foo2.clone()), None];
let ca = ObjectChunked::new("", values);

let groups = vec![(0u32, vec![0u32, 1]), (2, vec![2]), (3, vec![3])];
let groups = vec![(0u32, vec![0u32, 1]), (2, vec![2]), (3, vec![3])].into();
let out = ca.agg_list(&GroupsProxy::Idx(groups)).unwrap();
let a = out.explode().unwrap();

Expand Down
85 changes: 81 additions & 4 deletions polars/polars-core/src/frame/groupby/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,89 @@
use crate::prelude::*;
use crate::utils::NoNull;
use polars_arrow::utils::CustomIterTools;
use polars_arrow::utils::{CustomIterTools, FromTrustedLenIterator};
use rayon::iter::plumbing::UnindexedConsumer;
use rayon::prelude::*;
use std::ops::{Deref, DerefMut};

/// Indexes of the groups, the first index is stored separately.
/// this make sorting fast.
pub type GroupsIdx = Vec<(u32, Vec<u32>)>;
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx(Vec<(u32, Vec<u32>)>);

pub type IdxItem = (u32, Vec<u32>);

impl From<Vec<IdxItem>> for GroupsIdx {
fn from(v: Vec<IdxItem>) -> Self {
GroupsIdx(v)
}
}

impl FromIterator<IdxItem> for GroupsIdx {
fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
GroupsIdx(iter.into_iter().collect())
}
}

impl<'a> IntoIterator for &'a GroupsIdx {
type Item = &'a IdxItem;
type IntoIter = std::slice::Iter<'a, IdxItem>;

fn into_iter(self) -> Self::IntoIter {
self.0.iter()
}
}

impl<'a> IntoIterator for GroupsIdx {
type Item = IdxItem;
type IntoIter = std::vec::IntoIter<IdxItem>;

fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}

impl FromTrustedLenIterator<IdxItem> for GroupsIdx {
fn from_iter_trusted_length<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self
where
T::IntoIter: TrustedLen,
{
GroupsIdx(iter.into_iter().collect_trusted())
}
}

impl FromParallelIterator<IdxItem> for GroupsIdx {
fn from_par_iter<I>(par_iter: I) -> Self
where
I: IntoParallelIterator<Item = IdxItem>,
{
let v = Vec::from_par_iter(par_iter);
GroupsIdx(v)
}
}

impl IntoParallelIterator for GroupsIdx {
type Iter = rayon::vec::IntoIter<IdxItem>;
type Item = IdxItem;

fn into_par_iter(self) -> Self::Iter {
self.0.into_par_iter()
}
}

impl Deref for GroupsIdx {
type Target = Vec<(u32, Vec<u32>)>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for GroupsIdx {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

/// Every group is indicated by an array where the
/// - first value is an index to the start of the group
/// - second value is the length of the group
Expand All @@ -21,7 +98,7 @@ pub enum GroupsProxy {

impl Default for GroupsProxy {
fn default() -> Self {
GroupsProxy::Idx(vec![])
GroupsProxy::Idx(GroupsIdx(vec![]))
}
}

Expand All @@ -32,7 +109,7 @@ impl GroupsProxy {
GroupsProxy::Idx(groups) => groups,
GroupsProxy::Slice(groups) => groups
.iter()
.map(|&[first, len]| (first, (first..first + len).collect()))
.map(|&[first, len]| (first, (first..first + len).collect_trusted::<Vec<_>>()))
.collect(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ mod test {
let s = s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))?;

let l = s
.agg_list(&GroupsProxy::Idx(vec![(0, vec![0, 1, 2])]))
.agg_list(&GroupsProxy::Idx(vec![(0, vec![0, 1, 2])].into()))
.unwrap();

match l.dtype() {
Expand Down
23 changes: 13 additions & 10 deletions polars/polars-time/src/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl Wrap<&DataFrame> {
update_bounds(lower, upper);
update_subgroups(&sub_groups, base_g)
})
.collect::<Vec<_>>();
.collect();
GroupsProxy::Idx(groupsidx)
} else {
let groupsidx = POOL.install(|| {
Expand All @@ -280,7 +280,7 @@ impl Wrap<&DataFrame> {
);
update_subgroups(&sub_groups, base_g)
})
.collect::<Vec<_>>()
.collect()
});
GroupsProxy::Idx(groupsidx)
}
Expand Down Expand Up @@ -495,14 +495,17 @@ mod test {
.into_series();
assert_eq!(&upper, &range);

let expected = GroupsProxy::Idx(vec![
(0u32, vec![0u32, 1, 2]),
(2u32, vec![2]),
(5u32, vec![5, 6]),
(6u32, vec![6]),
(3u32, vec![3, 4]),
(4u32, vec![4]),
]);
let expected = GroupsProxy::Idx(
vec![
(0u32, vec![0u32, 1, 2]),
(2u32, vec![2]),
(5u32, vec![5, 6]),
(6u32, vec![6]),
(3u32, vec![3, 4]),
(4u32, vec![4]),
]
.into(),
);
assert_eq!(expected, groups);
}

Expand Down

0 comments on commit 071f514

Please sign in to comment.