Skip to content

Commit

Permalink
2: more correct join optmizations
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent cf8b615 commit 9a6e254
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 48 deletions.
108 changes: 85 additions & 23 deletions polars/src/lazy/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl fmt::Debug for LogicalPlan {
..
} => write!(
f,
"JOIN ({:?}) WITH ({:?}) ON (left: {} right: {})",
"JOIN\n\t({:?})\nWITH\n\t({:?})\nON (left: {} right: {})",
input_left, input_right, left_on, right_on
),
}
Expand Down Expand Up @@ -189,6 +189,7 @@ impl LogicalPlanBuilder {
}

pub fn project(self, expr: Vec<Expr>) -> Self {
// TODO: don't panic
let schema = utils::expressions_to_schema(&expr, self.0.schema());
LogicalPlan::Projection {
expr,
Expand Down Expand Up @@ -261,25 +262,30 @@ impl LogicalPlanBuilder {
let schema_left = self.0.schema();
let schema_right = other.schema();

let mut set = FnvHashSet::default();
// column names of left table
let mut names = FnvHashSet::default();
// fields of new schema
let mut fields = vec![];

for f in schema_left.fields() {
set.insert(f.clone());
names.insert(f.name());
fields.push(f.clone());
}

for f in schema_right.fields() {
if set.contains(f) {
let field = Field::new(
&format!("{}_right", f.name()),
f.data_type().clone(),
f.is_nullable(),
);
set.insert(field);
} else {
set.insert(f.clone());
let name = f.name();

if name != &*right_on {
if names.contains(name) {
let new_name = format!("{}_right", name);
let field = Field::new(&new_name, f.data_type().clone(), f.is_nullable());
fields.push(field)
} else {
fields.push(f.clone())
}
}
}
let schema = Schema::new(set.into_iter().collect());
let schema = Schema::new(fields);

LogicalPlan::Join {
input_left: Box::new(self.0),
Expand All @@ -306,7 +312,7 @@ mod test {
use crate::lazy::tests::get_df;
use crate::prelude::*;

fn compare_plans(lf: &LazyFrame) {
fn print_plans(lf: &LazyFrame) {
println!("LOGICAL PLAN\n\n{}\n", lf.describe_plan());
println!(
"OPTIMIZED LOGICAL PLAN\n\n{}\n",
Expand Down Expand Up @@ -338,7 +344,8 @@ mod test {
#[test]
fn test_lazy_logical_plan_join() {
let left = df!("days" => &[0, 1, 2, 3, 4],
"temp" => [22.1, 19.9, 7., 2., 3.]
"temp" => [22.1, 19.9, 7., 2., 3.],
"rain" => &[0.1, 0.2, 0.3, 0.4, 0.5]
)
.unwrap();

Expand All @@ -348,16 +355,71 @@ mod test {
)
.unwrap();

let lf = left
.lazy()
.left_join(right.lazy(), "days", "days")
.select(&[col("temp")]);
// check if optimizations succeeds without selection
{
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days");

print_plans(&lf);
// implicitly checks logical plan == optimized logical plan
let df = lf.collect().unwrap();
println!("{:?}", df);
}

compare_plans(&lf);
// check if optimization succeeds with selection
{
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.select(&[col("temp")]);

print_plans(&lf);
let df = lf.collect().unwrap();
println!("{:?}", df);
}

let df = lf.collect().unwrap();
println!("{:?}", df);
// check if optimization succeeds with selection of a renamed column due to the join
{
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.select(&[col("temp"), col("rain_right")]);

print_plans(&lf);
let df = lf.collect().unwrap();
println!("{:?}", df);
}

assert!(false)
// check if optimization succeeds with selection of the left and the right (renamed)
// column due to the join
{
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.select(&[col("temp"), col("rain"), col("rain_right")]);

print_plans(&lf);
let df = lf.collect().unwrap();
println!("{:?}", df);
}

// check if optimization succeeds with selection of the left and the right (renamed)
// column due to the join and an extra alias
{
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.select(&[col("temp"), col("rain").alias("foo"), col("rain_right")]);

print_plans(&lf);
let df = lf.collect().unwrap();
println!("{:?}", df);
}
}
}
110 changes: 87 additions & 23 deletions polars/src/lazy/logical_plan/optimizer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::lazy::prelude::*;
use crate::lazy::utils::expr_to_root_column;
use crate::prelude::*;
use std::rc::Rc;

pub trait Optimize {
fn optimize(&self, logical_plan: LogicalPlan) -> LogicalPlan;
Expand Down Expand Up @@ -54,6 +56,27 @@ impl ProjectionPushDown {
}
}

fn join_push_down(
&self,
schema_left: &Schema,
schema_right: &Schema,
proj: &Expr,
pushdown_left: &mut Vec<Expr>,
pushdown_right: &mut Vec<Expr>,
) -> bool {
let mut pushed_at_least_one = false;

if self.check_down_node(&proj, schema_left) {
pushdown_left.push(proj.clone());
pushed_at_least_one = true;
}
if self.check_down_node(&proj, schema_right) {
pushdown_right.push(proj.clone());
pushed_at_least_one = true;
}
pushed_at_least_one
}

// We recurrently traverse the logical plan and every projection we encounter we add to the accumulated
// projections.
// Every non projection operation we recurse and rebuild that operation on the output of the recursion.
Expand Down Expand Up @@ -124,34 +147,75 @@ impl ProjectionPushDown {
how,
..
} => {
let schema_left = input_left.schema();
let schema_right = input_right.schema();
let mut pushdown_left = vec![];
let mut pushdown_right = vec![];
let mut local_projection = vec![];
pushdown_left.push(Expr::Column(left_on.clone()));
pushdown_right.push(Expr::Column(right_on.clone()));

for proj in acc_projections {
let mut pushed_down = false;
if self.check_down_node(&proj, schema_left) {
pushdown_left.push(proj.clone());
pushed_down = true;
}
if self.check_down_node(&proj, schema_right) {
pushdown_right.push(proj.clone());
pushed_down = true;
// if there are no projections we don't have to do anything
if acc_projections.len() > 0 {
let schema_left = input_left.schema();
let schema_right = input_right.schema();

// We need the join columns so we push the projection downwards
pushdown_left.push(Expr::Column(left_on.clone()));
pushdown_right.push(Expr::Column(right_on.clone()));

for mut proj in acc_projections {
let mut add_local = true;

// if it is an alias we want to project the root column name downwards
// but we don't want to project it a this level, otherwise we project both
// the root and the alias, hence add_local = false.
if let Expr::Alias(expr, name) = proj {
let root_name = expr_to_root_column(&expr).unwrap();

proj = Expr::Column(root_name);
local_projection.push(Expr::Alias(Box::new(proj.clone()), name));

// now we don
add_local = false;
}

// Path for renamed columns due to the join. The column name of the left table
// stays as is, the column of the right will have the "_right" suffix.
// Thus joining two tables with both a foo column leads to ["foo", "foo_right"]
if !self.join_push_down(
schema_left,
schema_right,
&proj,
&mut pushdown_left,
&mut pushdown_right,
) {
// Column name of the projection without any alias.
let root_column_name = expr_to_root_column(&proj).unwrap();

// If _right suffix exists we need to push a projection down without this
// suffix.
if root_column_name.ends_with("_right") {
// downwards name is the name without the _right i.e. "foo".
let (downwards_name, _) = root_column_name
.split_at(root_column_name.len() - "_right".len());

// project downwards and immediately alias to prevent wrong projections
let projection =
col(downwards_name).alias(&format!("{}_right", downwards_name));
pushdown_right.push(projection);
// locally we project the aliased column
local_projection.push(proj);
}
} else if add_local {
// always also do the projection locally, because the join columns may not be
// included in the projection.
// for instance:
//
// SELECT [COLUMN temp]
// FROM
// JOIN (["days", "temp"]) WITH (["days", "rain"]) ON (left: days right: days)
//
// should drop the days column afther the join.
local_projection.push(proj)
}
}
// always also do the projection locally, because the join columns may not be
// included in the projection.
// for instance:
//
// SELECT [COLUMN temp]
// FROM
// JOIN (["days", "temp"]) WITH (["days", "rain"]) ON (left: days right: days)
//
// should drop the days column afther the join.
local_projection.push(proj)
}
let lp_left = self.push_down(*input_left, pushdown_left);
let lp_right = self.push_down(*input_right, pushdown_right);
Expand Down
16 changes: 14 additions & 2 deletions polars/src/lazy/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
use crate::{lazy::prelude::*, prelude::*};
use std::rc::Rc;

// unpack alias(col) to name of the root column
pub(crate) fn expr_to_root_column(expr: &Expr) -> Result<Rc<String>> {
match expr {
Expr::Column(name) => Ok(name.clone()),
Expr::Alias(expr, _) => expr_to_root_column(expr),
a => Err(PolarsError::Other(
format!("No root column name could be found for {:?}", a).into(),
)),
}
}

pub fn expressions_to_schema(expr: &[Expr], schema: &Schema) -> Schema {
pub(crate) fn expressions_to_schema(expr: &[Expr], schema: &Schema) -> Schema {
let fields = expr
.iter()
.map(|expr| expr.to_field(schema))
Expand All @@ -10,7 +22,7 @@ pub fn expressions_to_schema(expr: &[Expr], schema: &Schema) -> Schema {
}

/// Given two datatypes, determine the supertype that both types can safely be cast to
pub fn get_supertype(l: &ArrowDataType, r: &ArrowDataType) -> Result<ArrowDataType> {
pub(crate) fn get_supertype(l: &ArrowDataType, r: &ArrowDataType) -> Result<ArrowDataType> {
match _get_supertype(l, r) {
Some(dt) => Ok(dt),
None => _get_supertype(r, l).ok_or_else(|| {
Expand Down

0 comments on commit 9a6e254

Please sign in to comment.