Skip to content

Commit

Permalink
init lazy join; #94
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent 4785e7f commit 17a0fa3
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 31 deletions.
59 changes: 59 additions & 0 deletions polars/src/lazy/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
prelude::*,
};
use arrow::datatypes::DataType;
use fnv::FnvHashSet;
use std::cell::RefCell;
use std::{fmt, rc::Rc};

Expand Down Expand Up @@ -112,6 +113,14 @@ pub enum LogicalPlan {
aggs: Vec<Expr>,
schema: Schema,
},
Join {
input_left: Box<LogicalPlan>,
input_right: Box<LogicalPlan>,
schema: Schema,
how: JoinType,
left_on: Rc<String>,
right_on: Rc<String>,
},
}

impl fmt::Debug for LogicalPlan {
Expand All @@ -124,6 +133,7 @@ impl fmt::Debug for LogicalPlan {
Projection { expr, input, .. } => write!(f, "SELECT {:?} \nFROM\n{:?}", expr, input),
Sort { input, column, .. } => write!(f, "Sort\n\t{:?}\n{:?}", column, input),
Aggregate { keys, aggs, .. } => write!(f, "Aggregate\n\t{:?} BY {:?}", aggs, keys),
Join { .. } => write!(f, "JOIN"),
}
}
}
Expand All @@ -140,6 +150,7 @@ impl LogicalPlan {
Projection { schema, .. } => schema,
Sort { input, .. } => input.schema(),
Aggregate { schema, .. } => schema,
Join { schema, .. } => schema,
}
}
pub fn describe(&self) -> String {
Expand Down Expand Up @@ -220,6 +231,54 @@ impl LogicalPlanBuilder {
}
.into()
}

pub fn join(
self,
other: LogicalPlan,
how: JoinType,
left_on: Rc<String>,
right_on: Rc<String>,
) -> Self {
let schema_left = self.0.schema();
let schema_right = other.schema();

let mut set = FnvHashSet::default();

for f in schema_left.fields() {
set.insert(f.clone());
}

for f in schema_right.fields() {
if set.contains(f) {
let field = Field::new(
&format!("{}_right", f.name()),
f.data_type().clone(),
f.is_nullable(),
);
set.insert(field);
} else {
set.insert(f.clone());
}
}
let schema = Schema::new(set.into_iter().collect());

LogicalPlan::Join {
input_left: Box::new(self.0),
input_right: Box::new(other),
how,
schema,
left_on,
right_on,
}
.into()
}
}

#[derive(Clone, Debug)]
pub enum JoinType {
Left,
Inner,
Outer,
}

#[cfg(test)]
Expand Down
71 changes: 41 additions & 30 deletions polars/src/lazy/logical_plan/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,6 @@ pub trait Optimize {
fn optimize(&self, logical_plan: LogicalPlan) -> LogicalPlan;
}

/// Take an expression and unwrap to Expr::Column() if it exists.
fn expr_to_root_column(expr: &Expr) -> Result<Expr> {
use Expr::*;
match expr {
Column(name) => Ok(Column(name.clone())),
Alias(expr, ..) => expr_to_root_column(expr),
Literal(_) => Err(PolarsError::Other("no root column exits for lit".into())),
// todo: return root columns? multiple?
BinaryExpr { .. } => Err(PolarsError::Other(
"no root column exits for binary expr".into(),
)),
Not(expr) => expr_to_root_column(expr),
IsNotNull(expr) => expr_to_root_column(expr),
IsNull(expr) => expr_to_root_column(expr),
Sort { expr, .. } => expr_to_root_column(expr),
AggMin(expr) => expr_to_root_column(expr),
}
}

// Result<[Column("foo"), Column("bar")]>
fn expressions_to_root_columns(exprs: &[Expr]) -> Result<Vec<Expr>> {
exprs.into_iter().map(|e| expr_to_root_column(e)).collect()
}

pub struct ProjectionPushDown {}

impl ProjectionPushDown {
Expand All @@ -43,25 +19,25 @@ impl ProjectionPushDown {
}

// check if a projection can be done upstream or should be done in this level of the tree.
fn check_upstream(&self, expr: &Expr, upstream_schema: &Schema) -> bool {
expr.to_field(upstream_schema).is_ok()
fn check_down_node(&self, expr: &Expr, down_schema: &Schema) -> bool {
expr.to_field(down_schema).is_ok()
}

// split in a projection vec that can be pushed upstream and a projection vec that should be used
// split in a projection vec that can be pushed down and a projection vec that should be used
// in this node
fn split_acc_projections(
&self,
acc_projections: Vec<Expr>,
upstream_schema: &Schema,
down_schema: &Schema,
) -> (Vec<Expr>, Vec<Expr>) {
// If node above has as many columns as the projection there is nothing to pushdown.
if upstream_schema.fields().len() == acc_projections.len() {
if down_schema.fields().len() == acc_projections.len() {
let local_projections = acc_projections;
(vec![], local_projections)
} else {
let (acc_projections, local_projections) = acc_projections
.into_iter()
.partition(|expr| self.check_upstream(expr, upstream_schema));
.partition(|expr| self.check_down_node(expr, down_schema));
(acc_projections, local_projections)
}
}
Expand Down Expand Up @@ -132,13 +108,47 @@ impl ProjectionPushDown {
Aggregate {
input, keys, aggs, ..
} => {
// TODO: projections of resulting columns of gb, should be renamed and pushed down
let (acc_projections, local_projections) =
self.split_acc_projections(acc_projections, input.schema());

let lp = self.push_down(*input, acc_projections);
let builder = LogicalPlanBuilder::from(lp).groupby(keys, aggs);
self.finish_node(local_projections, builder)
}
Join {
input_left,
input_right,
left_on,
right_on,
how,
..
} => {
let schema_left = input_left.schema();
let schema_right = input_right.schema();
let mut pushdown_left = vec![];
let mut pushdown_right = vec![];
let mut local_projection = vec![];
for proj in acc_projections {
let mut pushed_down = false;
if self.check_down_node(&proj, schema_left) {
pushdown_left.push(proj.clone());
pushed_down = true;
}
if self.check_down_node(&proj, schema_right) {
pushdown_right.push(proj.clone());
pushed_down = true;
}
if !pushed_down {
local_projection.push(proj)
}
}
let lp_left = self.push_down(*input_left, pushdown_left);
let lp_right = self.push_down(*input_right, pushdown_right);
let builder =
LogicalPlanBuilder::from(lp_left).join(lp_right, how, left_on, right_on);
self.finish_node(local_projection, builder)
}
}
}
}
Expand Down Expand Up @@ -208,6 +218,7 @@ impl PredicatePushDown {
} => LogicalPlanBuilder::from(self.push_down(*input, acc_predicates))
.groupby(keys, aggs)
.build(),
Join { .. } => todo!(),
}
}
}
Expand Down
41 changes: 41 additions & 0 deletions polars/src/lazy/physical_plan/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,44 @@ impl Executor for GroupByExec {
Ok(DataFrame::new_no_checks(columns))
}
}

#[derive(Debug)]
pub struct JoinExec {
input_left: Rc<dyn Executor>,
input_right: Rc<dyn Executor>,
how: JoinType,
left_on: Rc<String>,
right_on: Rc<String>,
}

impl JoinExec {
pub(crate) fn new(
input_left: Rc<dyn Executor>,
input_right: Rc<dyn Executor>,
how: JoinType,
left_on: Rc<String>,
right_on: Rc<String>,
) -> Self {
JoinExec {
input_left,
input_right,
how,
left_on,
right_on,
}
}
}

impl Executor for JoinExec {
fn execute(&self) -> Result<DataFrame> {
let df_left = self.input_left.execute()?;
let df_right = self.input_right.execute()?;

use JoinType::*;
match self.how {
Left => df_left.left_join(&df_right, &self.left_on, &self.right_on),
Inner => df_left.inner_join(&df_right, &self.left_on, &self.right_on),
Outer => df_left.outer_join(&df_right, &self.left_on, &self.right_on),
}
}
}
19 changes: 19 additions & 0 deletions polars/src/lazy/physical_plan/planner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::lazy::physical_plan::executors::JoinExec;
use crate::{lazy::prelude::*, prelude::*};
use std::rc::Rc;

Expand Down Expand Up @@ -65,6 +66,24 @@ impl DefaultPlanner {
.collect::<Result<Vec<_>>>()?;
Ok(Rc::new(GroupByExec::new(input, keys.clone(), phys_aggs)))
}
LogicalPlan::Join {
input_left,
input_right,
how,
left_on,
right_on,
..
} => {
let input_left = self.create_initial_physical_plan(input_left)?;
let input_right = self.create_initial_physical_plan(input_right)?;
Ok(Rc::new(JoinExec::new(
input_left,
input_right,
how.clone(),
left_on.clone(),
right_on.clone(),
)))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion polars/src/lazy/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub use crate::lazy::{
dsl::*,
logical_plan::{
optimizer::{Optimize, PredicatePushDown, ProjectionPushDown},
LogicalPlan, LogicalPlanBuilder, Operator, ScalarValue,
JoinType, LogicalPlan, LogicalPlanBuilder, Operator, ScalarValue,
},
physical_plan::{
executors::{CsvExec, DataFrameExec, FilterExec, GroupByExec, PipeExec, SortExec},
Expand Down

0 comments on commit 17a0fa3

Please sign in to comment.