Skip to content

Commit

Permalink
[Lazy] join on exprs instead of column names
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 10, 2020
1 parent c35a1f5 commit b421fa1
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 68 deletions.
37 changes: 31 additions & 6 deletions polars/src/frame/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,15 @@ impl DataFrame {
) -> Result<DataFrame> {
let s_left = self.column(left_on)?;
let s_right = other.column(right_on)?;
self.inner_join_from_series(other, s_left, s_right)
}

pub(crate) fn inner_join_from_series(
&self,
other: &DataFrame,
s_left: &Series,
s_right: &Series,
) -> Result<DataFrame> {
let join_tuples = match self.parallel {
true => apply_hash_join_on_series!(s_left, s_right, par_hash_join_inner),
false => apply_hash_join_on_series!(s_left, s_right, hash_join_inner),
Expand All @@ -704,7 +713,7 @@ impl DataFrame {
let (df_left, df_right) = rayon::join(
|| self.create_left_df(&join_tuples),
|| unsafe {
other.drop(right_on).unwrap().take_iter_unchecked(
other.drop(s_right.name()).unwrap().take_iter_unchecked(
join_tuples.iter().map(|(_left, right)| *right),
Some(join_tuples.len()),
)
Expand All @@ -725,6 +734,15 @@ impl DataFrame {
pub fn left_join(&self, other: &DataFrame, left_on: &str, right_on: &str) -> Result<DataFrame> {
let s_left = self.column(left_on)?;
let s_right = other.column(right_on)?;
self.left_join_from_series(other, s_left, s_right)
}

pub(crate) fn left_join_from_series(
&self,
other: &DataFrame,
s_left: &Series,
s_right: &Series,
) -> Result<DataFrame> {
let opt_join_tuples = match self.parallel {
true => apply_hash_join_on_series!(s_left, s_right, par_hash_join_left),
false => apply_hash_join_on_series!(s_left, s_right, hash_join_left),
Expand All @@ -733,7 +751,7 @@ impl DataFrame {
let (df_left, df_right) = rayon::join(
|| self.create_left_df(&opt_join_tuples),
|| unsafe {
other.drop(right_on).unwrap().take_opt_iter_unchecked(
other.drop(s_right.name()).unwrap().take_opt_iter_unchecked(
opt_join_tuples.iter().map(|(_left, right)| *right),
Some(opt_join_tuples.len()),
)
Expand All @@ -759,29 +777,36 @@ impl DataFrame {
) -> Result<DataFrame> {
let s_left = self.column(left_on)?;
let s_right = other.column(right_on)?;

self.outer_join_from_series(other, s_left, s_right)
}
pub(crate) fn outer_join_from_series(
&self,
other: &DataFrame,
s_left: &Series,
s_right: &Series,
) -> Result<DataFrame> {
// Get the indexes of the joined relations
let opt_join_tuples: Vec<(Option<usize>, Option<usize>)> =
apply_hash_join_on_series!(s_left, s_right, hash_join_outer);

// Take the left and right dataframes by join tuples
let (mut df_left, df_right) = rayon::join(
|| unsafe {
self.drop(left_on).unwrap().take_opt_iter_unchecked(
self.drop(s_left.name()).unwrap().take_opt_iter_unchecked(
opt_join_tuples.iter().map(|(left, _right)| *left),
Some(opt_join_tuples.len()),
)
},
|| unsafe {
other.drop(right_on).unwrap().take_opt_iter_unchecked(
other.drop(s_right.name()).unwrap().take_opt_iter_unchecked(
opt_join_tuples.iter().map(|(_left, right)| *right),
Some(opt_join_tuples.len()),
)
},
);
let mut s =
apply_method_all_series!(s_left, zip_outer_join_column, s_right, &opt_join_tuples);
s.rename(left_on);
s.rename(s_left.name());
df_left.hstack(&[s])?;
self.finish_join(df_left, df_right)
}
Expand Down
27 changes: 6 additions & 21 deletions polars/src/lazy/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,46 +263,31 @@ impl LazyFrame {
}

/// Join query with other lazy query.
pub fn left_join(self, other: LazyFrame, left_on: &str, right_on: &str) -> LazyFrame {
pub fn left_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.join(
other.logical_plan,
JoinType::Left,
Arc::new(left_on.into()),
Arc::new(right_on.into()),
)
.join(other.logical_plan, JoinType::Left, left_on, right_on)
.build();
Self::from_logical_plan(lp, opt_state)
}

/// Join query with other lazy query.
pub fn outer_join(self, other: LazyFrame, left_on: &str, right_on: &str) -> LazyFrame {
pub fn outer_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.join(
other.logical_plan,
JoinType::Outer,
Arc::new(left_on.into()),
Arc::new(right_on.into()),
)
.join(other.logical_plan, JoinType::Outer, left_on, right_on)
.build();
Self::from_logical_plan(lp, opt_state)
}

/// Join query with other lazy query.
pub fn inner_join(self, other: LazyFrame, left_on: &str, right_on: &str) -> LazyFrame {
pub fn inner_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.join(
other.logical_plan,
JoinType::Inner,
Arc::new(left_on.into()),
Arc::new(right_on.into()),
)
.join(other.logical_plan, JoinType::Inner, left_on, right_on)
.build();
Self::from_logical_plan(lp, opt_state)
}
Expand Down
30 changes: 13 additions & 17 deletions polars/src/lazy/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ pub enum LogicalPlan {
input_right: Box<LogicalPlan>,
schema: Schema,
how: JoinType,
left_on: Arc<String>,
right_on: Arc<String>,
left_on: Expr,
right_on: Expr,
},
HStack {
input: Box<LogicalPlan>,
Expand Down Expand Up @@ -147,7 +147,7 @@ impl fmt::Debug for LogicalPlan {
..
} => write!(
f,
"JOIN\n\t({:?})\nWITH\n\t({:?})\nON (left: {} right: {})",
"JOIN\n\t({:?})\nWITH\n\t({:?})\nON (left: {:?} right: {:?})",
input_left, input_right, left_on, right_on
),
HStack { input, exprs, .. } => write!(f, "\n{:?} WITH COLUMN(S) {:?}\n", input, exprs),
Expand Down Expand Up @@ -265,13 +265,7 @@ impl LogicalPlanBuilder {
.into()
}

pub fn join(
self,
other: LogicalPlan,
how: JoinType,
left_on: Arc<String>,
right_on: Arc<String>,
) -> Self {
pub fn join(self, other: LogicalPlan, how: JoinType, left_on: Expr, right_on: Expr) -> Self {
let schema_left = self.0.schema();
let schema_right = other.schema();

Expand All @@ -285,10 +279,12 @@ impl LogicalPlanBuilder {
fields.push(f.clone());
}

let right_name = utils::output_name(&right_on).expect("could not find name");

for f in schema_right.fields() {
let name = f.name();

if name != &*right_on {
if name != &*right_name {
if names.contains(name) {
let new_name = format!("{}_right", name);
let field = Field::new(&new_name, f.data_type().clone(), f.is_nullable());
Expand Down Expand Up @@ -406,7 +402,7 @@ mod test {
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days");
.left_join(right.clone().lazy(), col("days"), col("days"));

print_plans(&lf);
// implicitly checks logical plan == optimized logical plan
Expand All @@ -419,7 +415,7 @@ mod test {
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.left_join(right.clone().lazy(), col("days"), col("days"))
.select(&[col("temp")]);

print_plans(&lf);
Expand All @@ -432,7 +428,7 @@ mod test {
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.left_join(right.clone().lazy(), col("days"), col("days"))
.select(&[col("temp"), col("rain_right")]);

print_plans(&lf);
Expand All @@ -446,7 +442,7 @@ mod test {
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.left_join(right.clone().lazy(), col("days"), col("days"))
.select(&[col("temp"), col("rain"), col("rain_right")]);

print_plans(&lf);
Expand All @@ -460,7 +456,7 @@ mod test {
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.left_join(right.clone().lazy(), col("days"), col("days"))
.select(&[col("temp"), col("rain").alias("foo"), col("rain_right")]);

print_plans(&lf);
Expand All @@ -474,7 +470,7 @@ mod test {
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), "days", "days")
.left_join(right.clone().lazy(), col("days"), col("days"))
.select(&[col("temp"), col("rain").alias("foo"), col("rain_right")])
.filter(col("foo").lt(lit(0.3)));

Expand Down
4 changes: 2 additions & 2 deletions polars/src/lazy/logical_plan/optimizer/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ impl ProjectionPushDown {
let schema_right = input_right.schema();

// We need the join columns so we push the projection downwards
pushdown_left.push(Expr::Column(left_on.clone()));
pushdown_right.push(Expr::Column(right_on.clone()));
pushdown_left.push(left_on.clone());
pushdown_right.push(right_on.clone());

for mut proj in acc_projections {
let mut add_local = true;
Expand Down
19 changes: 12 additions & 7 deletions polars/src/lazy/physical_plan/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,17 @@ pub struct JoinExec {
input_left: Arc<dyn Executor>,
input_right: Arc<dyn Executor>,
how: JoinType,
left_on: Arc<String>,
right_on: Arc<String>,
left_on: Arc<dyn PhysicalExpr>,
right_on: Arc<dyn PhysicalExpr>,
}

impl JoinExec {
pub(crate) fn new(
input_left: Arc<dyn Executor>,
input_right: Arc<dyn Executor>,
how: JoinType,
left_on: Arc<String>,
right_on: Arc<String>,
left_on: Arc<dyn PhysicalExpr>,
right_on: Arc<dyn PhysicalExpr>,
) -> Self {
JoinExec {
input_left,
Expand All @@ -213,12 +213,17 @@ impl Executor for JoinExec {
fn execute(&self) -> Result<DataFrame> {
let (df_left, df_right) =
rayon::join(|| self.input_left.execute(), || self.input_right.execute());
let df_left = df_left?;
let df_right = df_right?;

let s_left = self.left_on.evaluate(&df_left)?;
let s_right = self.right_on.evaluate(&df_right)?;

use JoinType::*;
match self.how {
Left => df_left?.left_join(&df_right?, &self.left_on, &self.right_on),
Inner => df_left?.inner_join(&df_right?, &self.left_on, &self.right_on),
Outer => df_left?.outer_join(&df_right?, &self.left_on, &self.right_on),
Left => df_left.left_join_from_series(&df_right, &s_left, &s_right),
Inner => df_left.inner_join_from_series(&df_right, &s_left, &s_right),
Outer => df_left.outer_join_from_series(&df_right, &s_left, &s_right),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions polars/src/lazy/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@ impl DefaultPlanner {
} => {
let input_left = self.create_initial_physical_plan(input_left)?;
let input_right = self.create_initial_physical_plan(input_right)?;
let left_on = self.create_physical_expr(left_on)?;
let right_on = self.create_physical_expr(right_on)?;
Ok(Arc::new(JoinExec::new(
input_left,
input_right,
how.clone(),
left_on.clone(),
right_on.clone(),
left_on,
right_on,
)))
}
LogicalPlan::HStack { input, exprs, .. } => {
Expand Down
19 changes: 18 additions & 1 deletion polars/src/lazy/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,26 @@ pub(crate) fn projected_name(expr: &Expr) -> Result<Expr> {
Expr::Column(name) => Ok(Expr::Column(name.clone())),
Expr::Alias(_, name) => Ok(Expr::Column(name.clone())),
Expr::Sort { expr, .. } => projected_name(expr),
Expr::Cast { expr, .. } => projected_name(expr),
a => Err(PolarsError::Other(
format!(
"No root column name could be found for {:?} in projected_name utillity",
"No root column name could be found for expr {:?} in projected_name utillity",
a
)
.into(),
)),
}
}

pub(crate) fn output_name(expr: &Expr) -> Result<Arc<String>> {
match expr {
Expr::Column(name) => Ok(name.clone()),
Expr::Alias(_, name) => Ok(name.clone()),
Expr::Sort { expr, .. } => output_name(expr),
Expr::Cast { expr, .. } => output_name(expr),
a => Err(PolarsError::Other(
format!(
"No root column name could be found for expr {:?} in output name utillity",
a
)
.into(),
Expand Down
2 changes: 1 addition & 1 deletion polars/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ impl Series {
/// assert_eq!(Vec::from(shifted.i32()?), &[None, Some(1), Some(2)]);
///
/// let shifted = s.shift(-1)?;
/// assert_eq!(Vec::from(shifted.i32()?), &[Some(1), Some(2), None]);
/// assert_eq!(Vec::from(shifted.i32()?), &[Some(2), Some(3), None]);
///
/// let shifted = s.shift(2)?;
/// assert_eq!(Vec::from(shifted.i32()?), &[None, None, Some(1)]);
Expand Down
4 changes: 2 additions & 2 deletions py-polars/pypolars/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def groupby(self, by: Union[str, List[str]]) -> LazyGroupBy:
def join(
self,
ldf: LazyFrame,
left_on: str,
right_on: str,
left_on: PyExpr,
right_on: PyExpr,
how="inner",
) -> LazyFrame:
if how == "inner":
Expand Down

0 comments on commit b421fa1

Please sign in to comment.