Skip to content

Commit

Permalink
[lazy] all eager aggregations added; #99
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 4, 2020
1 parent 6a5d6a5 commit a180c93
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 39 deletions.
38 changes: 19 additions & 19 deletions polars/src/frame/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ where
}
}

trait AggFirst {
fn agg_first(&self, _groups: &Vec<(usize, Vec<usize>)>) -> Series;
pub(crate) trait AggFirst {
fn agg_first(&self, _groups: &[(usize, Vec<usize>)]) -> Series;
}

macro_rules! impl_agg_first {
Expand All @@ -433,25 +433,25 @@ impl<T> AggFirst for ChunkedArray<T>
where
T: ArrowPrimitiveType + Send,
{
fn agg_first(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
fn agg_first(&self, groups: &[(usize, Vec<usize>)]) -> Series {
impl_agg_first!(self, groups, ChunkedArray<T>)
}
}

impl AggFirst for Utf8Chunked {
fn agg_first(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
fn agg_first(&self, groups: &[(usize, Vec<usize>)]) -> Series {
impl_agg_first!(self, groups, Utf8Chunked)
}
}

impl AggFirst for LargeListChunked {
fn agg_first(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
fn agg_first(&self, groups: &[(usize, Vec<usize>)]) -> Series {
impl_agg_first!(self, groups, LargeListChunked)
}
}

trait AggLast {
fn agg_last(&self, _groups: &Vec<(usize, Vec<usize>)>) -> Series;
pub(crate) trait AggLast {
fn agg_last(&self, _groups: &[(usize, Vec<usize>)]) -> Series;
}

macro_rules! impl_agg_last {
Expand All @@ -468,25 +468,25 @@ impl<T> AggLast for ChunkedArray<T>
where
T: ArrowPrimitiveType + Send,
{
fn agg_last(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
fn agg_last(&self, groups: &[(usize, Vec<usize>)]) -> Series {
impl_agg_last!(self, groups, ChunkedArray<T>)
}
}

impl AggLast for Utf8Chunked {
fn agg_last(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
fn agg_last(&self, groups: &[(usize, Vec<usize>)]) -> Series {
impl_agg_last!(self, groups, Utf8Chunked)
}
}

impl AggLast for LargeListChunked {
fn agg_last(&self, groups: &Vec<(usize, Vec<usize>)>) -> Series {
fn agg_last(&self, groups: &[(usize, Vec<usize>)]) -> Series {
impl_agg_last!(self, groups, LargeListChunked)
}
}

trait AggNUnique {
fn agg_n_unique(&self, _groups: &Vec<(usize, Vec<usize>)>) -> Option<UInt32Chunked> {
pub(crate) trait AggNUnique {
fn agg_n_unique(&self, _groups: &[(usize, Vec<usize>)]) -> Option<UInt32Chunked> {
None
}
}
Expand Down Expand Up @@ -522,7 +522,7 @@ where
T: PolarsIntegerType + Sync,
T::Native: Hash + Eq,
{
fn agg_n_unique(&self, groups: &Vec<(usize, Vec<usize>)>) -> Option<UInt32Chunked> {
fn agg_n_unique(&self, groups: &[(usize, Vec<usize>)]) -> Option<UInt32Chunked> {
Some(impl_agg_n_unique!(self, groups, Xob<UInt32Chunked>))
}
}
Expand All @@ -533,23 +533,23 @@ impl AggNUnique for LargeListChunked {}

// TODO: could be faster as it can only be null, true, or false
impl AggNUnique for BooleanChunked {
fn agg_n_unique(&self, groups: &Vec<(usize, Vec<usize>)>) -> Option<UInt32Chunked> {
fn agg_n_unique(&self, groups: &[(usize, Vec<usize>)]) -> Option<UInt32Chunked> {
Some(impl_agg_n_unique!(self, groups, Xob<UInt32Chunked>))
}
}

impl AggNUnique for Utf8Chunked {
fn agg_n_unique(&self, groups: &Vec<(usize, Vec<usize>)>) -> Option<UInt32Chunked> {
fn agg_n_unique(&self, groups: &[(usize, Vec<usize>)]) -> Option<UInt32Chunked> {
Some(impl_agg_n_unique!(self, groups, Xob<UInt32Chunked>))
}
}

trait AggQuantile {
fn agg_quantile(&self, _groups: &Vec<(usize, Vec<usize>)>, _quantile: f64) -> Option<Series> {
pub(crate) trait AggQuantile {
fn agg_quantile(&self, _groups: &[(usize, Vec<usize>)], _quantile: f64) -> Option<Series> {
None
}

fn agg_median(&self, groups: &Vec<(usize, Vec<usize>)>) -> Option<Series> {
fn agg_median(&self, groups: &[(usize, Vec<usize>)]) -> Option<Series> {
self.agg_quantile(groups, 0.5)
}
}
Expand All @@ -559,7 +559,7 @@ where
T: PolarsNumericType + Sync,
T::Native: PartialEq,
{
fn agg_quantile(&self, groups: &Vec<(usize, Vec<usize>)>, quantile: f64) -> Option<Series> {
fn agg_quantile(&self, groups: &[(usize, Vec<usize>)], quantile: f64) -> Option<Series> {
Some(
groups
.into_par_iter()
Expand Down
69 changes: 66 additions & 3 deletions polars/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ impl Expr {
IsNotNull(_) => Ok(ArrowDataType::Boolean),
Sort { expr, .. } => expr.get_type(schema),
AggMin(expr) => expr.get_type(schema),
AggMax(expr) => expr.get_type(schema),
AggSum(expr) => expr.get_type(schema),
AggFirst(expr) => expr.get_type(schema),
AggLast(expr) => expr.get_type(schema),
AggMean(expr) => expr.get_type(schema),
AggMedian(expr) => expr.get_type(schema),
AggGroups(_) => Ok(ArrowDataType::UInt32),
AggNUnique(_) => Ok(ArrowDataType::UInt32),
AggQuantile { expr, .. } => expr.get_type(schema),
}
}

Expand Down Expand Up @@ -134,7 +143,8 @@ impl Expr {
AggNUnique(expr) => {
let field = expr.to_field(schema)?;
let new_name = fmt_groupby_column(field.name(), GroupByMethod::NUnique);
Ok(rename_field(&field, &new_name))
let new_field = Field::new(&new_name, ArrowDataType::UInt32, field.is_nullable());
Ok(new_field)
}
AggSum(expr) => {
let field = expr.to_field(schema)?;
Expand All @@ -144,7 +154,8 @@ impl Expr {
AggGroups(expr) => {
let field = expr.to_field(schema)?;
let new_name = fmt_groupby_column(field.name(), GroupByMethod::Groups);
Ok(rename_field(&field, &new_name))
let new_field = Field::new(&new_name, ArrowDataType::UInt32, field.is_nullable());
Ok(new_field)
}
AggQuantile { expr, quantile } => {
let field = expr.to_field(schema)?;
Expand All @@ -171,6 +182,15 @@ impl fmt::Debug for Expr {
false => write!(f, "{:?} ASC", expr),
},
AggMin(expr) => write!(f, "AGGREGATE MIN {:?}", expr),
AggMax(expr) => write!(f, "AGGREGATE MAX {:?}", expr),
AggMedian(expr) => write!(f, "AGGREGATE MEDIAN {:?}", expr),
AggMean(expr) => write!(f, "AGGREGATE MEAN {:?}", expr),
AggFirst(expr) => write!(f, "AGGREGATE FIRST {:?}", expr),
AggLast(expr) => write!(f, "AGGREGATE LAST {:?}", expr),
AggNUnique(expr) => write!(f, "AGGREGATE N UNIQUE {:?}", expr),
AggSum(expr) => write!(f, "AGGREGATE SUM {:?}", expr),
AggGroups(expr) => write!(f, "AGGREGATE GROUPS {:?}", expr),
AggQuantile { expr, .. } => write!(f, "AGGREGATE QUANTILE {:?}", expr),
}
}
}
Expand Down Expand Up @@ -234,10 +254,53 @@ impl Expr {
Expr::IsNotNull(Box::new(self))
}

/// Aggregate every group down to its minimal value.
///
/// Consumes this expression and boxes it inside an `Expr::AggMin` node.
pub fn agg_min(self) -> Self {
    let inner = Box::new(self);
    Expr::AggMin(inner)
}

/// Aggregate every group down to its maximum value.
///
/// Consumes this expression and boxes it inside an `Expr::AggMax` node.
pub fn agg_max(self) -> Self {
    let inner = Box::new(self);
    Expr::AggMax(inner)
}

/// Aggregate every group down to its mean value.
///
/// Consumes this expression and boxes it inside an `Expr::AggMean` node.
pub fn agg_mean(self) -> Self {
    let inner = Box::new(self);
    Expr::AggMean(inner)
}

/// Aggregate every group down to its median value.
///
/// Consumes this expression and boxes it inside an `Expr::AggMedian` node.
pub fn agg_median(self) -> Self {
    let inner = Box::new(self);
    Expr::AggMedian(inner)
}

/// Aggregate every group down to the sum of all its values.
///
/// Consumes this expression and boxes it inside an `Expr::AggSum` node.
pub fn agg_sum(self) -> Self {
    let inner = Box::new(self);
    Expr::AggSum(inner)
}

/// Count the number of unique values within each group.
///
/// Consumes this expression and boxes it inside an `Expr::AggNUnique` node.
pub fn agg_n_unique(self) -> Self {
    let inner = Box::new(self);
    Expr::AggNUnique(inner)
}

/// Pick the first value of each group.
///
/// Consumes this expression and boxes it inside an `Expr::AggFirst` node.
pub fn agg_first(self) -> Self {
    let inner = Box::new(self);
    Expr::AggFirst(inner)
}

/// Pick the last value of each group.
///
/// Consumes this expression and boxes it inside an `Expr::AggLast` node.
pub fn agg_last(self) -> Self {
    let inner = Box::new(self);
    Expr::AggLast(inner)
}

/// Compute the requested quantile for each group.
///
/// Consumes this expression and stores it, boxed, together with `quantile`
/// in an `Expr::AggQuantile` node.
pub fn agg_quantile(self, quantile: f64) -> Self {
    let expr = Box::new(self);
    Expr::AggQuantile { expr, quantile }
}
}

/// Create a Column Expression based on a column name.
Expand Down

0 comments on commit a180c93

Please sign in to comment.