Skip to content

Commit

Permalink
ternary expr: validate predicate in groupby context (#4237)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 3, 2022
1 parent ac916d2 commit c4fc26c
Show file tree
Hide file tree
Showing 30 changed files with 184 additions and 189 deletions.
8 changes: 7 additions & 1 deletion polars/polars-lazy/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::prelude::*;
use std::borrow::Cow;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::fmt::{Debug, Display, Formatter};

impl fmt::Debug for LogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -179,6 +179,12 @@ FROM
}
}

impl Display for Expr {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Debug::fmt(self, f)
}
}

impl fmt::Debug for Expr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use Expr::*;
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn execute_projection_cached_window_fns(
let mut index = 0u32;
exprs.iter().for_each(|phys| {
index += 1;
let e = phys.as_expression();
let e = phys.as_expression().unwrap();

let mut is_window = false;
for e in e.into_iter() {
Expand Down Expand Up @@ -119,6 +119,7 @@ fn execute_projection_cached_window_fns(

for (index, _, e) in partition.1 {
if e.as_expression()
.unwrap()
.into_iter()
.filter(|e| matches!(e, Expr::Window { .. }))
.count()
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ impl Executor for CsvExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
predicate: self.predicate.as_ref().map(|ae| ae.as_expression().clone()),
predicate: self
.predicate
.as_ref()
.map(|ae| ae.as_expression().unwrap().clone()),
slice: (self.options.skip_rows, self.options.n_rows),
};
state
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ impl Executor for IpcExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
predicate: self.predicate.as_ref().map(|ae| ae.as_expression().clone()),
predicate: self
.predicate
.as_ref()
.map(|ae| ae.as_expression().unwrap().clone()),
slice: (0, self.options.n_rows),
};
state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ impl Executor for ParquetExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
predicate: self.predicate.as_ref().map(|ae| ae.as_expression().clone()),
predicate: self
.predicate
.as_ref()
.map(|ae| ae.as_expression().unwrap().clone()),
slice: (0, self.options.n_rows),
};
state
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Executor for SortExec {
// this should only be done with simple col("foo") expressions
// therefore we rename more complex expressions so that
// polars core does not match these
if !matches!(e.as_expression(), Expr::Column(_)) {
if !matches!(e.as_expression(), Some(&Expr::Column(_))) {
s.rename(&format!("_POLARS_SORT_BY_{}", i));
}
Ok(s)
Expand Down
29 changes: 20 additions & 9 deletions polars/polars-lazy/src/physical_plan/expressions/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@ use std::borrow::Cow;
use std::sync::Arc;

pub(crate) struct AggregationExpr {
pub(crate) expr: Arc<dyn PhysicalExpr>,
pub(crate) input: Arc<dyn PhysicalExpr>,
pub(crate) agg_type: GroupByMethod,
}

impl AggregationExpr {
pub fn new(expr: Arc<dyn PhysicalExpr>, agg_type: GroupByMethod) -> Self {
Self { expr, agg_type }
Self {
input: expr,
agg_type,
}
}
}

impl PhysicalExpr for AggregationExpr {
fn as_expression(&self) -> &Expr {
unimplemented!()
fn as_expression(&self) -> Option<&Expr> {
None
}

fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> Result<Series> {
Expand All @@ -36,7 +39,7 @@ impl PhysicalExpr for AggregationExpr {
groups: &'a GroupsProxy,
state: &ExecutionState,
) -> Result<AggregationContext<'a>> {
let mut ac = self.expr.evaluate_on_groups(df, groups, state)?;
let mut ac = self.input.evaluate_on_groups(df, groups, state)?;
// don't change names by aggregations as is done in polars-core
let keep_name = ac.series().name().to_string();

Expand Down Expand Up @@ -164,12 +167,16 @@ impl PhysicalExpr for AggregationExpr {
}

fn to_field(&self, input_schema: &Schema) -> Result<Field> {
self.expr.to_field(input_schema)
self.input.to_field(input_schema)
}

fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
Some(self)
}

fn is_valid_aggregation(&self) -> bool {
true
}
}

fn rename_series(mut s: Series, name: &str) -> Series {
Expand All @@ -184,7 +191,7 @@ impl PartitionedAggregation for AggregationExpr {
groups: &GroupsProxy,
state: &ExecutionState,
) -> Result<Series> {
let expr = self.expr.as_partitioned_aggregator().unwrap();
let expr = self.input.as_partitioned_aggregator().unwrap();
let series = expr.evaluate_partitioned(df, groups, state)?;

// Safety:
Expand Down Expand Up @@ -398,8 +405,8 @@ impl AggQuantileExpr {
}

impl PhysicalExpr for AggQuantileExpr {
fn as_expression(&self) -> &Expr {
unimplemented!()
fn as_expression(&self) -> Option<&Expr> {
None
}

fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> Result<Series> {
Expand Down Expand Up @@ -430,4 +437,8 @@ impl PhysicalExpr for AggQuantileExpr {
fn to_field(&self, input_schema: &Schema) -> Result<Field> {
self.expr.to_field(input_schema)
}

fn is_valid_aggregation(&self) -> bool {
true
}
}
7 changes: 5 additions & 2 deletions polars/polars-lazy/src/physical_plan/expressions/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ impl AliasExpr {
}

impl PhysicalExpr for AliasExpr {
fn as_expression(&self) -> &Expr {
&self.expr
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
Expand Down Expand Up @@ -66,6 +66,9 @@ impl PhysicalExpr for AliasExpr {
fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
Some(self)
}
fn is_valid_aggregation(&self) -> bool {
self.physical_expr.is_valid_aggregation()
}
}

impl PartitionedAggregation for AliasExpr {
Expand Down
7 changes: 5 additions & 2 deletions polars/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ fn check_map_output_len(input_len: usize, output_len: usize) -> Result<()> {
}

impl PhysicalExpr for ApplyExpr {
fn as_expression(&self) -> &Expr {
&self.expr
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
Expand Down Expand Up @@ -230,4 +230,7 @@ impl PhysicalExpr for ApplyExpr {
fn to_field(&self, input_schema: &Schema) -> Result<Field> {
self.inputs[0].to_field(input_schema)
}
fn is_valid_aggregation(&self) -> bool {
matches!(self.collect_groups, ApplyOptions::ApplyGroups)
}
}
14 changes: 12 additions & 2 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ pub fn apply_operator(left: &Series, right: &Series, op: Operator) -> Result<Ser
}

impl PhysicalExpr for BinaryExpr {
fn as_expression(&self) -> &Expr {
&self.expr
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
Expand Down Expand Up @@ -390,6 +390,16 @@ impl PhysicalExpr for BinaryExpr {
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
Some(self)
}

fn is_valid_aggregation(&self) -> bool {
// we don't want:
// col(a) == lit(1)

// we do want
// col(a).sum() == lit(1)
(!self.left.is_literal() && self.left.is_valid_aggregation())
| (!self.right.is_literal() && self.right.is_valid_aggregation())
}
}

#[cfg(feature = "parquet")]
Expand Down
8 changes: 6 additions & 2 deletions polars/polars-lazy/src/physical_plan/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl CastExpr {
}

impl PhysicalExpr for CastExpr {
fn as_expression(&self) -> &Expr {
&self.expr
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
Expand Down Expand Up @@ -110,6 +110,10 @@ impl PhysicalExpr for CastExpr {
fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
Some(self)
}

fn is_valid_aggregation(&self) -> bool {
self.input.is_valid_aggregation()
}
}

impl PartitionedAggregation for CastExpr {
Expand Down
7 changes: 5 additions & 2 deletions polars/polars-lazy/src/physical_plan/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ impl ColumnExpr {
}

impl PhysicalExpr for ColumnExpr {
fn as_expression(&self) -> &Expr {
&self.1
fn as_expression(&self) -> Option<&Expr> {
Some(&self.1)
}
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
match state.get_schema() {
Expand Down Expand Up @@ -102,6 +102,9 @@ impl PhysicalExpr for ColumnExpr {
})?;
Ok(field)
}
fn is_valid_aggregation(&self) -> bool {
false
}
}

impl PartitionedAggregation for ColumnExpr {
Expand Down
9 changes: 6 additions & 3 deletions polars/polars-lazy/src/physical_plan/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ impl CountExpr {
}

impl PhysicalExpr for CountExpr {
fn as_expression(&self) -> &Expr {
&self.expr
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> Result<Series> {
Ok(Series::new("count", [df.height() as IdxSize]))
}
Expand All @@ -43,6 +42,10 @@ impl PhysicalExpr for CountExpr {
fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
Some(self)
}

fn is_valid_aggregation(&self) -> bool {
true
}
}

impl PartitionedAggregation for CountExpr {
Expand Down
9 changes: 6 additions & 3 deletions polars/polars-lazy/src/physical_plan/expressions/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ impl FilterExpr {
}

impl PhysicalExpr for FilterExpr {
fn as_expression(&self) -> &Expr {
&self.expr
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
let s_f = || self.input.evaluate(df, state);
let predicate_f = || self.by.evaluate(df, state);
Expand Down Expand Up @@ -104,4 +103,8 @@ impl PhysicalExpr for FilterExpr {
fn to_field(&self, input_schema: &Schema) -> Result<Field> {
self.input.to_field(input_schema)
}

fn is_valid_aggregation(&self) -> bool {
self.input.is_valid_aggregation()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ impl<'a> AggregationContext<'a> {
) -> Box<dyn Iterator<Item = Option<UnstableSeries<'_>>> + '_> {
match self.agg_state() {
AggState::Literal(_) => {
self.groups();
let s = self.series();
Box::new(LitIter::new(s.array_ref(0).clone(), self.groups.len()))
}
AggState::AggregatedFlat(_) => {
self.groups();
let s = self.series();
Box::new(FlatIter::new(s.array_ref(0).clone(), self.groups.len()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ impl IsNotNullExpr {
}

impl PhysicalExpr for IsNotNullExpr {
fn as_expression(&self) -> &Expr {
&self.expr
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
let series = self.physical_expr.evaluate(df, state)?;
Ok(series.is_not_null().into_series())
Expand All @@ -48,6 +47,10 @@ impl PhysicalExpr for IsNotNullExpr {
fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
Some(self)
}

fn is_valid_aggregation(&self) -> bool {
self.physical_expr.is_valid_aggregation()
}
}

impl PartitionedAggregation for IsNotNullExpr {
Expand Down
8 changes: 5 additions & 3 deletions polars/polars-lazy/src/physical_plan/expressions/is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ impl IsNullExpr {
}

impl PhysicalExpr for IsNullExpr {
fn as_expression(&self) -> &Expr {
&self.expr
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
let series = self.physical_expr.evaluate(df, state)?;
Ok(series.is_null().into_series())
Expand Down Expand Up @@ -57,6 +56,9 @@ impl PhysicalExpr for IsNullExpr {
fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
Some(self)
}
fn is_valid_aggregation(&self) -> bool {
self.physical_expr.is_valid_aggregation()
}
}

#[cfg(feature = "parquet")]
Expand Down

0 comments on commit c4fc26c

Please sign in to comment.