Skip to content

Commit

Permalink
Restructure lazy expressions (#590)
Browse files Browse the repository at this point in the history
All physical expressions are in a separate file.
  • Loading branch information
ritchie46 committed Apr 29, 2021
1 parent 3ff3f7b commit 6e1f0b4
Show file tree
Hide file tree
Showing 20 changed files with 1,052 additions and 958 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
//! Implementations of PhysicalAggregation. These aggregations are called by the groupby context,
//! and nowhere else. Note, that this differes from evaluate on groups, which is also called in that
//! context, but typically before aggregation

use crate::physical_plan::state::ExecutionState;
use crate::physical_plan::PhysicalAggregation;
use crate::prelude::*;
Expand All @@ -10,81 +6,32 @@ use polars_core::chunked_array::builder::get_list_builder;
use polars_core::frame::groupby::{fmt_groupby_column, GroupByMethod, GroupTuples};
use polars_core::prelude::*;
use polars_core::utils::NoNull;
use std::sync::Arc;

impl PhysicalAggregation for AliasExpr {
fn aggregate(
&self,
df: &DataFrame,
groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
let agg_expr = self.physical_expr.as_agg_expr()?;
let opt_agg = agg_expr.aggregate(df, groups, state)?;
Ok(opt_agg.map(|mut agg| {
agg.rename(&self.name);
agg
}))
}
pub(crate) struct AggregationExpr {
pub(crate) expr: Arc<dyn PhysicalExpr>,
pub(crate) agg_type: GroupByMethod,
}

impl PhysicalAggregation for SortExpr {
// As a final aggregation a Sort returns a list array.
fn aggregate(
&self,
df: &DataFrame,
groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
let s = self.physical_expr.evaluate(df, state)?;
let agg_s = s.agg_list(groups);
let out = agg_s.map(|s| {
s.list()
.unwrap()
.into_iter()
.map(|opt_s| opt_s.map(|s| s.sort(self.reverse)))
.collect::<ListChunked>()
.into_series()
});
Ok(out)
impl AggregationExpr {
pub fn new(expr: Arc<dyn PhysicalExpr>, agg_type: GroupByMethod) -> Self {
Self { expr, agg_type }
}
}

impl PhysicalAggregation for SortByExpr {
// As a final aggregation a Sort returns a list array.
fn aggregate(
&self,
df: &DataFrame,
groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
let s = self.input.evaluate(df, state)?;
let s_sort_by = self.by.evaluate(df, state)?;
impl PhysicalExpr for AggregationExpr {
fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> Result<Series> {
unimplemented!()
}

let s_sort_by = s_sort_by.agg_list(groups).ok_or_else(|| {
PolarsError::Other(format!("cannot aggregate {:?} as list array", self.expr).into())
})?;
fn to_field(&self, input_schema: &Schema) -> Result<Field> {
let field = self.expr.to_field(input_schema)?;
let new_name = fmt_groupby_column(field.name(), self.agg_type);
Ok(Field::new(&new_name, field.data_type().clone()))
}

let agg_s = s.agg_list(groups);
let out = agg_s.map(|s| {
s.list()
.unwrap()
.into_iter()
.zip(s_sort_by.list().unwrap())
.map(|(opt_s, opt_sort_by)| {
match (opt_s, opt_sort_by) {
(Some(s), Some(sort_by)) => {
let sorted_idx = sort_by.argsort(self.reverse);
// Safety:
// sorted index are within bounds
unsafe { s.take_unchecked(&sorted_idx) }.ok()
}
_ => None,
}
})
.collect::<ListChunked>()
.into_series()
});
Ok(out)
fn as_agg_expr(&self) -> Result<&dyn PhysicalAggregation> {
Ok(self)
}
}

Expand Down Expand Up @@ -325,116 +272,30 @@ impl PhysicalAggregation for ApplyExpr {
}
}
}
impl PhysicalAggregation for SliceExpr {
// As a final aggregation a Slice returns a list array.
fn aggregate(
&self,
df: &DataFrame,
groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
let s = self.input.evaluate(df, state)?;
let agg_s = s.agg_list(groups);
let out = agg_s.map(|s| {
s.list()
.unwrap()
.into_iter()
.map(|opt_s| opt_s.map(|s| s.slice(self.offset, self.len)))
.collect::<ListChunked>()
.into_series()
});
Ok(out)
}
}

impl PhysicalAggregation for BinaryFunctionExpr {
fn aggregate(
&self,
df: &DataFrame,
groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
let a = self.input_a.evaluate(df, state)?;
let b = self.input_b.evaluate(df, state)?;

let agg_a = a.agg_list(groups).expect("no data?");
let agg_b = b.agg_list(groups).expect("no data?");

// keep track of the output lengths. If they are all unit length,
// we can explode the array as it would have the same length as the no. of groups
// if it is not all unit length it should remain a listarray

let mut all_unit_length = true;

let ca = agg_a
.list()
.unwrap()
.into_iter()
.zip(agg_b.list().unwrap())
.map(|(opt_a, opt_b)| match (opt_a, opt_b) {
(Some(a), Some(b)) => {
let out = self.function.call_udf(a, b).ok();

if let Some(s) = &out {
if s.len() != 1 {
all_unit_length = false;
}
}
out
}
_ => None,
})
.collect::<ListChunked>();
pub struct AggQuantileExpr {
pub(crate) expr: Arc<dyn PhysicalExpr>,
pub(crate) quantile: f64,
}

if all_unit_length {
return Ok(Some(ca.explode()?));
}
Ok(Some(ca.into_series()))
impl AggQuantileExpr {
pub fn new(expr: Arc<dyn PhysicalExpr>, quantile: f64) -> Self {
Self { expr, quantile }
}
}

impl PhysicalAggregation for BinaryExpr {
fn aggregate(
&self,
df: &DataFrame,
groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
match (self.left.as_agg_expr(), self.right.as_agg_expr()) {
(Ok(left), Err(_)) => {
let opt_agg = left.aggregate(df, groups, state)?;
let rhs = self.right.evaluate(df, state)?;
opt_agg
.map(|agg| apply_operator(&agg, &rhs, self.op))
.transpose()
}
(Err(_), Ok(right)) => {
let opt_agg = right.aggregate(df, groups, state)?;
let lhs = self.left.evaluate(df, state)?;
opt_agg
.map(|agg| apply_operator(&lhs, &agg, self.op))
.transpose()
}
(Ok(left), Ok(right)) => {
let right_agg = right.aggregate(df, groups, state)?;
left.aggregate(df, groups, state)?
.and_then(|left| right_agg.map(|right| apply_operator(&left, &right, self.op)))
.transpose()
}
(_, _) => Err(PolarsError::Other(
"both expressions could not be used in an aggregation context.".into(),
)),
}
impl PhysicalExpr for AggQuantileExpr {
fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> Result<Series> {
unimplemented!()
}
}

impl PhysicalAggregation for LiteralExpr {
fn aggregate(
&self,
df: &DataFrame,
_groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
PhysicalExpr::evaluate(self, df, state).map(Some)
fn to_field(&self, input_schema: &Schema) -> Result<Field> {
let field = self.expr.to_field(input_schema)?;
let new_name = fmt_groupby_column(field.name(), GroupByMethod::Quantile(self.quantile));
Ok(Field::new(&new_name, field.data_type().clone()))
}

fn as_agg_expr(&self) -> Result<&dyn PhysicalAggregation> {
Ok(self)
}
}
63 changes: 63 additions & 0 deletions polars/polars-lazy/src/physical_plan/expressions/alias.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::frame::groupby::GroupTuples;
use polars_core::prelude::*;
use std::sync::Arc;

pub struct AliasExpr {
pub(crate) physical_expr: Arc<dyn PhysicalExpr>,
pub(crate) name: Arc<String>,
expr: Expr,
}

impl AliasExpr {
pub fn new(physical_expr: Arc<dyn PhysicalExpr>, name: Arc<String>, expr: Expr) -> Self {
Self {
physical_expr,
name,
expr,
}
}
}

impl PhysicalExpr for AliasExpr {
fn as_expression(&self) -> &Expr {
&self.expr
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
let mut series = self.physical_expr.evaluate(df, state)?;
series.rename(&self.name);
Ok(series)
}

fn to_field(&self, input_schema: &Schema) -> Result<Field> {
Ok(Field::new(
&self.name,
self.physical_expr
.to_field(input_schema)?
.data_type()
.clone(),
))
}

fn as_agg_expr(&self) -> Result<&dyn PhysicalAggregation> {
Ok(self)
}
}

impl PhysicalAggregation for AliasExpr {
fn aggregate(
&self,
df: &DataFrame,
groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
let agg_expr = self.physical_expr.as_agg_expr()?;
let opt_agg = agg_expr.aggregate(df, groups, state)?;
Ok(opt_agg.map(|mut agg| {
agg.rename(&self.name);
agg
}))
}
}
56 changes: 56 additions & 0 deletions polars/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::physical_plan::state::ExecutionState;
use crate::physical_plan::PhysicalAggregation;
use crate::prelude::*;
use polars_core::prelude::*;
use std::sync::Arc;

pub struct ApplyExpr {
pub input: Arc<dyn PhysicalExpr>,
pub function: NoEq<Arc<dyn SeriesUdf>>,
pub output_type: Option<DataType>,
pub expr: Expr,
}

impl ApplyExpr {
pub fn new(
input: Arc<dyn PhysicalExpr>,
function: NoEq<Arc<dyn SeriesUdf>>,
output_type: Option<DataType>,
expr: Expr,
) -> Self {
ApplyExpr {
input,
function,
output_type,
expr,
}
}
}

impl PhysicalExpr for ApplyExpr {
fn as_expression(&self) -> &Expr {
&self.expr
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
let input = self.input.evaluate(df, state)?;
let in_name = input.name().to_string();
let mut out = self.function.call_udf(input)?;
if in_name != out.name() {
out.rename(&in_name);
}
Ok(out)
}
fn to_field(&self, input_schema: &Schema) -> Result<Field> {
match &self.output_type {
Some(output_type) => {
let input_field = self.input.to_field(input_schema)?;
Ok(Field::new(input_field.name(), output_type.clone()))
}
None => self.input.to_field(input_schema),
}
}
fn as_agg_expr(&self) -> Result<&dyn PhysicalAggregation> {
Ok(self)
}
}

0 comments on commit 6e1f0b4

Please sign in to comment.