Skip to content

Commit

Permalink
first operation on groupby
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 4, 2020
1 parent 8d70527 commit dfe67f5
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 42 deletions.
35 changes: 35 additions & 0 deletions polars/src/chunked_array/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,41 @@ use std::cmp::Ordering;
use std::marker::Sized;
use std::ops::{Add, Div};

/// Fast access by index.
pub trait ChunkTake {
/// Take values from ChunkedArray by index.
fn take(&self, indices: impl Iterator<Item = usize>, capacity: Option<usize>) -> Result<Self>
where
Self: std::marker::Sized;

/// Take values from ChunkedArray by index without checking bounds.
unsafe fn take_unchecked(
&self,
indices: impl Iterator<Item = usize>,
capacity: Option<usize>,
) -> Self
where
Self: std::marker::Sized;

/// Take values from ChunkedArray by Option<index>.
fn take_opt(
&self,
indices: impl Iterator<Item = Option<usize>>,
capacity: Option<usize>,
) -> Result<Self>
where
Self: std::marker::Sized;

/// Take values from ChunkedArray by Option<index>.
unsafe fn take_opt_unchecked(
&self,
indices: impl Iterator<Item = Option<usize>>,
capacity: Option<usize>,
) -> Self
where
Self: std::marker::Sized;
}

/// Create a `ChunkedArray` with new values by index or by boolean mask.
/// Note that these operations clone data. This is however the only way we can modify at mask or
/// index level as the underlying Arrow arrays are immutable.
Expand Down
42 changes: 4 additions & 38 deletions polars/src/chunked_array/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,6 @@ use crate::chunked_array::builder::{
use crate::prelude::*;
use arrow::array::{Array, BooleanArray, LargeListArray, PrimitiveArray, StringArray};

pub trait Take {
/// Take values from ChunkedArray by index.
fn take(&self, indices: impl Iterator<Item = usize>, capacity: Option<usize>) -> Result<Self>
where
Self: std::marker::Sized;

/// Take values from ChunkedArray by index without checking bounds.
unsafe fn take_unchecked(
&self,
indices: impl Iterator<Item = usize>,
capacity: Option<usize>,
) -> Self
where
Self: std::marker::Sized;

/// Take values from ChunkedArray by Option<index>.
fn take_opt(
&self,
indices: impl Iterator<Item = Option<usize>>,
capacity: Option<usize>,
) -> Result<Self>
where
Self: std::marker::Sized;

/// Take values from ChunkedArray by Option<index>.
unsafe fn take_opt_unchecked(
&self,
indices: impl Iterator<Item = Option<usize>>,
capacity: Option<usize>,
) -> Self
where
Self: std::marker::Sized;
}

macro_rules! impl_take {
($self:ident, $indices:ident, $capacity:ident, $builder:ident) => {{
let capacity = $capacity.unwrap_or($indices.size_hint().0);
Expand Down Expand Up @@ -111,7 +77,7 @@ macro_rules! impl_take_unchecked {
}};
}

impl<T> Take for ChunkedArray<T>
impl<T> ChunkTake for ChunkedArray<T>
where
T: PolarsNumericType,
{
Expand Down Expand Up @@ -144,7 +110,7 @@ where
}
}

impl Take for BooleanChunked {
impl ChunkTake for BooleanChunked {
fn take(&self, indices: impl Iterator<Item = usize>, capacity: Option<usize>) -> Result<Self>
where
Self: std::marker::Sized,
Expand Down Expand Up @@ -177,7 +143,7 @@ impl Take for BooleanChunked {
}
}

impl Take for Utf8Chunked {
impl ChunkTake for Utf8Chunked {
fn take(&self, indices: impl Iterator<Item = usize>, capacity: Option<usize>) -> Result<Self>
where
Self: std::marker::Sized,
Expand Down Expand Up @@ -213,7 +179,7 @@ impl Take for Utf8Chunked {
}
}

impl Take for LargeListChunked {
impl ChunkTake for LargeListChunked {
fn take(&self, indices: impl Iterator<Item = usize>, capacity: Option<usize>) -> Result<Self> {
let capacity = capacity.unwrap_or(indices.size_hint().0);

Expand Down
1 change: 1 addition & 0 deletions polars/src/doc/changelog/v0_5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
//! * `ChunkSet` Trait.
//! * `Groupby` aggregations can be done on a selection of multiple columns.
//! * `Groupby` operation can be done on multiple keys.
//! * `Groupby` `first` operation.
//!
90 changes: 90 additions & 0 deletions polars/src/frame/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,56 @@ where
}
}

#[enum_dispatch(Series)]
trait AggFirst {
fn agg_first(&self, _groups: &Vec<(usize, Vec<usize>)>) -> Series {
unimplemented!()
}
}

macro_rules! impl_agg_first {
($self:ident, $groups:ident, $ca_type:ty) => {{
$groups
.par_iter()
.map(|(first, _idx)| {
let taker = $self.take_rand();
taker.get(*first)
})
.collect::<$ca_type>()
.into_series()
}};
}

impl<T> AggFirst for ChunkedArray<T>
where
T: PolarsNumericType + std::marker::Sync,
{
fn agg_first(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
impl_agg_first!(self, groups, ChunkedArray<T>)
}
}

impl AggFirst for BooleanChunked {
fn agg_first(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
impl_agg_first!(self, groups, BooleanChunked)
}
}

impl AggFirst for Utf8Chunked {
fn agg_first(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
groups
.par_iter()
.map(|(first, _idx)| {
let taker = self.take_rand();
taker.get(*first).map(|s| s.to_string())
})
.collect::<Utf8Chunked>()
.into_series()
}
}

impl AggFirst for LargeListChunked {}

impl<'a, 'b> GroupBy<'a, 'b> {
/// Select the column by which the determine the groups.
/// You can select a single column or a slice of columns.
Expand Down Expand Up @@ -568,6 +618,42 @@ impl<'a, 'b> GroupBy<'a, 'b> {
DataFrame::new(cols)
}

/// Aggregate grouped series and find the first value per group.
///
/// # Example
///
/// ```rust
/// # use polars::prelude::*;
/// fn example(df: DataFrame) -> Result<DataFrame> {
/// df.groupby("date")?.select("temp").first()
/// }
/// ```
/// Returns:
///
/// ```text
/// +------------+------------+
/// | date | temp_first |
/// | --- | --- |
/// | date32 | i32 |
/// +============+============+
/// | 2020-08-23 | 9 |
/// +------------+------------+
/// | 2020-08-22 | 7 |
/// +------------+------------+
/// | 2020-08-21 | 20 |
/// +------------+------------+
/// ```
pub fn first(&self) -> Result<DataFrame> {
let (mut cols, agg_cols) = self.prepare_agg()?;
for agg_col in agg_cols {
let new_name = format!["{}_first", agg_col.name()];
let mut agg = agg_col.agg_first(&self.groups);
agg.rename(&new_name);
cols.push(agg);
}
DataFrame::new(cols)
}

/// Aggregate grouped series and compute the number of values per group.
///
/// # Example
Expand Down Expand Up @@ -745,5 +831,9 @@ mod test {
.agg_list()
.unwrap()
);
println!(
"{:?}",
df.groupby("date").unwrap().select("temp").first().unwrap()
);
}
}
7 changes: 3 additions & 4 deletions polars/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ pub use crate::{
iterator::{IntoNoNullIterator, NumericChunkIterDispatch},
ops::{
ChunkAgg, ChunkApply, ChunkCast, ChunkCompare, ChunkFillNone, ChunkFilter, ChunkFull,
ChunkReverse, ChunkSet, ChunkShift, ChunkSort, ChunkUnique, FillNoneStrategy,
},
take::{
AsTakeIndex, IntoTakeRandom, NumTakeRandomChunked, NumTakeRandomCont, Take, TakeRandom,
ChunkReverse, ChunkSet, ChunkShift, ChunkSort, ChunkTake, ChunkUnique,
FillNoneStrategy,
},
take::{AsTakeIndex, IntoTakeRandom, NumTakeRandomChunked, NumTakeRandomCont, TakeRandom},
ChunkedArray, Downcast,
},
datatypes,
Expand Down

0 comments on commit dfe67f5

Please sign in to comment.