Skip to content

Commit

Permalink
[lazy] type coercion optimization; closes #96
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 6, 2020
1 parent 190ff66 commit 1f19fbf
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 17 deletions.
17 changes: 15 additions & 2 deletions polars/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub enum Expr {

impl Expr {
/// Get DataType result of the expression. The schema is the input data.
fn get_type(&self, schema: &Schema) -> Result<ArrowDataType> {
pub fn get_type(&self, schema: &Schema) -> Result<ArrowDataType> {
use Expr::*;
match self {
Alias(expr, ..) => expr.get_type(schema),
Expand Down Expand Up @@ -207,7 +207,7 @@ impl fmt::Debug for Expr {
}
}

fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
pub(crate) fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
Expr::BinaryExpr {
left: Box::new(l),
op,
Expand Down Expand Up @@ -314,13 +314,26 @@ impl Expr {
}
}

/// Get the group indexes of the group by operation.
pub fn agg_groups(self) -> Self {
Expr::AggGroups(Box::new(self))
}

/// Cast expression to another data type.
pub fn cast(self, data_type: ArrowDataType) -> Self {
Expr::Cast {
expr: Box::new(self),
data_type,
}
}

/// Sort expression.
pub fn sort(self, reverse: bool) -> Self {
Expr::Sort {
expr: Box::new(self),
reverse,
}
}
}

/// Create a Colum Expression based on a column name.
Expand Down
41 changes: 27 additions & 14 deletions polars/src/lazy/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct LazyFrame {
pub(crate) logical_plan: LogicalPlan,
projection_pushdown: bool,
predicate_pushdown: bool,
type_coercion: bool,
}

impl From<LogicalPlan> for LazyFrame {
Expand All @@ -26,6 +27,7 @@ impl From<LogicalPlan> for LazyFrame {
logical_plan: plan,
projection_pushdown: true,
predicate_pushdown: true,
type_coercion: true,
}
}
}
Expand All @@ -35,18 +37,24 @@ impl LazyFrame {
LogicalPlanBuilder::from(self.logical_plan)
}

/// Toggle projection pushdown optimizaton on or off.
/// Toggle projection pushdown optimization on or off.
pub fn with_projection_pushdown_optimization(mut self, toggle: bool) -> Self {
self.projection_pushdown = toggle;
self
}

/// Toggle predicate pushdown optimizaton on or off.
/// Toggle predicate pushdown optimization on or off.
pub fn with_predicate_pushdown_optimization(mut self, toggle: bool) -> Self {
self.predicate_pushdown = toggle;
self
}

/// Toggle type coercion optimization on or off.
pub fn with_type_coercion_optimization(mut self, toggle: bool) -> Self {
self.type_coercion = toggle;
self
}

/// Describe the logical plan.
pub fn describe_plan(&self) -> String {
self.logical_plan.describe()
Expand Down Expand Up @@ -102,35 +110,41 @@ impl LazyFrame {
pub fn collect(self) -> Result<DataFrame> {
let predicate_pushdown = self.predicate_pushdown;
let projection_pushdown = self.projection_pushdown;
let type_coercion = self.type_coercion;
let mut logical_plan = self.get_plan_builder().build();

let predicate_pushdown_opt = PredicatePushDown {};
let projection_pushdown_opt = ProjectionPushDown {};
let type_coercion_opt = TypeCoercion {};

let logical_plan = if cfg!(debug_assertions) {
if cfg!(debug_assertions) {
// check that the optimization don't interfere with the schema result.
let prev_schema = logical_plan.schema().clone();
if predicate_pushdown {
logical_plan = predicate_pushdown_opt.optimize(logical_plan);
}
assert_eq!(logical_plan.schema(), &prev_schema);
let prev_schema = logical_plan.schema().clone();
if projection_pushdown {
logical_plan = projection_pushdown_opt.optimize(logical_plan)
}
assert_eq!(logical_plan.schema(), &prev_schema);
logical_plan
} else {
// NOTE: the order is important. Projection pushdown must be later than predicate pushdown,
// because I want the projections to occur before the filtering.

let prev_schema = logical_plan.schema().clone();
if predicate_pushdown {
logical_plan = predicate_pushdown_opt.optimize(logical_plan);
}
assert_eq!(logical_plan.schema(), &prev_schema);
} else {
// NOTE: the order is important. Projection pushdown must be before predicate pushdown,
// The projection may have aliases that interfere with the predicate expressions.
if projection_pushdown {
logical_plan = projection_pushdown_opt.optimize(logical_plan)
}
logical_plan
if predicate_pushdown {
logical_plan = predicate_pushdown_opt.optimize(logical_plan);
}
};

if type_coercion {
logical_plan = type_coercion_opt.optimize(logical_plan);
}

let planner = DefaultPlanner::default();
let physical_plan = planner.create_physical_plan(&logical_plan)?;
physical_plan.execute()
Expand Down Expand Up @@ -402,6 +416,5 @@ mod test {
println!("{:?}", lf.describe_optimized_plan());
let new = lf.collect().unwrap();
println!("{:?}", new);
assert!(false)
}
}
14 changes: 14 additions & 0 deletions polars/src/lazy/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,20 @@ mod test {
);
}

#[test]
fn test_lazy_logical_plan_filter_and_alias_combined() {
let df = get_df();
let lf = df
.clone()
.lazy()
.filter(col("sepal.width").lt(lit(3.5)))
.select(&[col("variety").alias("foo")]);

print_plans(&lf);
let df = lf.collect().unwrap();
println!("{:?}", df);
}

#[test]
fn test_lazy_logical_plan_schema() {
let df = get_df();
Expand Down
1 change: 1 addition & 0 deletions polars/src/lazy/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::lazy::prelude::*;
use crate::prelude::*;
pub(crate) mod predicate;
pub(crate) mod projection;
pub(crate) mod type_coercion;

// check if a selection/projection can be done on the downwards schema
fn check_down_node(expr: &Expr, down_schema: &Schema) -> bool {
Expand Down
179 changes: 179 additions & 0 deletions polars/src/lazy/logical_plan/optimizer/type_coercion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use crate::lazy::prelude::*;
use crate::lazy::utils::get_supertype;
use crate::prelude::*;

pub struct TypeCoercion {}

impl TypeCoercion {
/// Traverse the expressions from a level in the logical plan and maybe cast them.
fn rewrite_expressions(&self, exprs: Vec<Expr>, input_schema: &Schema) -> Result<Vec<Expr>> {
exprs
.into_iter()
.map(|expr| self.rewrite_expr(expr, input_schema))
.collect()
}

fn rewrite_expr(&self, expr: Expr, input_schema: &Schema) -> Result<Expr> {
// the important expression is BinaryExpr. The rest just traverses the tree.
use Expr::*;
match expr {
Alias(expr, name) => Ok(Expr::Alias(
Box::new(self.rewrite_expr(*expr, input_schema)?),
name,
)),
Column(_) => Ok(expr.clone()),
Literal(_) => Ok(expr.clone()),
BinaryExpr { left, op, right } => {
let left = self.rewrite_expr(*left, input_schema)?;
let right = self.rewrite_expr(*right, input_schema)?;

let type_left = left.get_type(input_schema)?;
let type_right = right.get_type(input_schema)?;
if type_left == type_right {
Ok(binary_expr(left, op, right))
} else {
let st = get_supertype(&type_left, &type_right)?;
Ok(binary_expr(left.cast(st.clone()), op, right.cast(st)))
}
}
Not(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.not())
}
IsNotNull(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.is_not_null())
}
IsNull(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.is_null())
}
Cast { expr, data_type } => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.cast(data_type))
}
Sort { expr, reverse } => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.sort(reverse))
}
AggMin(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_min())
}
AggMax(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_max())
}
AggMedian(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_median())
}
AggNUnique(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_n_unique())
}
AggFirst(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_first())
}
AggLast(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_last())
}
AggMean(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_mean())
}
AggQuantile { expr, quantile } => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_quantile(quantile))
}
AggSum(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_sum())
}
AggGroups(expr) => {
let expr = self.rewrite_expr(*expr, input_schema)?;
Ok(expr.agg_groups())
}
}
}

fn coerce(&self, logical_plan: LogicalPlan) -> Result<LogicalPlan> {
use LogicalPlan::*;
match logical_plan {
Selection { input, predicate } => {
let schema = input.schema();
let predicate = self.rewrite_expr(predicate, schema)?;
let input = Box::new(self.coerce(*input)?);
Ok(Selection { input, predicate })
}
CsvScan { .. } => Ok(logical_plan),
DataFrameScan { .. } => Ok(logical_plan),
Projection {
expr,
input,
schema,
} => {
let expr = self.rewrite_expressions(expr, &schema)?;
Ok(Projection {
expr,
input,
schema,
})
}
Sort {
input,
column,
reverse,
} => {
let input = Box::new(self.coerce(*input)?);
Ok(Sort {
input,
column,
reverse,
})
}
Aggregate {
input,
keys,
aggs,
schema,
} => {
let input = Box::new(self.coerce(*input)?);
let aggs = self.rewrite_expressions(aggs, &schema)?;
Ok(Aggregate {
input,
keys,
aggs,
schema,
})
}
Join {
input_left,
input_right,
schema,
how,
left_on,
right_on,
} => {
let input_left = Box::new(self.coerce(*input_left)?);
let input_right = Box::new(self.coerce(*input_right)?);
Ok(Join {
input_left,
input_right,
schema,
how,
left_on,
right_on,
})
}
}
}
}

impl Optimize for TypeCoercion {
fn optimize(&self, logical_plan: LogicalPlan) -> LogicalPlan {
self.coerce(logical_plan).unwrap()
}
}
5 changes: 4 additions & 1 deletion polars/src/lazy/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
pub use crate::lazy::{
dsl::*,
logical_plan::{
optimizer::{predicate::PredicatePushDown, projection::ProjectionPushDown, Optimize},
optimizer::{
predicate::PredicatePushDown, projection::ProjectionPushDown,
type_coercion::TypeCoercion, Optimize,
},
JoinType, LogicalPlan, LogicalPlanBuilder, Operator, ScalarValue,
},
physical_plan::{
Expand Down

0 comments on commit 1f19fbf

Please sign in to comment.