Skip to content

Commit

Permalink
polars lazy cookbook
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 27, 2021
1 parent b44a057 commit 8355e22
Show file tree
Hide file tree
Showing 16 changed files with 446 additions and 127 deletions.
3 changes: 3 additions & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ sort_multiple = ["polars-core/sort_multiple"]
# is_in operation
is_in = ["polars-core/is_in", "polars-lazy/is_in"]

# dont use this
private = ["polars-core/private", "polars-lazy/private"]

# all opt-in datatypes
dtype-full = [
"dtype-time64-ns",
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ dtype-date64 = ["polars-core/dtype-date64"]
# is_in operation
is_in = ["polars-core/is_in"]

# no guarantees whatsoever
private = []

[dependencies]
ahash = "0.7"
rayon = "1.5"
Expand Down
162 changes: 91 additions & 71 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::logical_plan::optimizer::{
};
use crate::physical_plan::state::ExecutionState;
use crate::prelude::aggregate_scan_projections::agg_projection;
use crate::prelude::join_pruning::JoinPrune;
use crate::prelude::simplify_expr::SimplifyBooleanRule;
use crate::utils::combine_predicates_expr;
use crate::{logical_plan::FETCH_ROWS, prelude::*};
Expand Down Expand Up @@ -457,9 +456,8 @@ impl LazyFrame {
let type_coercion = self.opt_state.type_coercion;
let simplify_expr = self.opt_state.simplify_expr;

let mut agg_scan_projection = self.opt_state.agg_scan_projection;
let agg_scan_projection = self.opt_state.agg_scan_projection;
let aggregate_pushdown = self.opt_state.aggregate_pushdown;
let join_pruning = self.opt_state.join_pruning;

let logical_plan = self.get_plan_builder().build();

Expand Down Expand Up @@ -502,10 +500,6 @@ impl LazyFrame {
if aggregate_pushdown {
rules.push(Box::new(AggregatePushdown::new()))
}
if join_pruning {
rules.push(Box::new(JoinPrune {}));
agg_scan_projection = true;
}

if agg_scan_projection {
// scan the LP to aggregate all the column used in scans
Expand Down Expand Up @@ -682,23 +676,11 @@ impl LazyFrame {
/// use polars_lazy::prelude::*;
/// fn join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
/// ldf
/// .left_join(other, col("foo"), col("bar"), None)
/// .left_join(other, col("foo"), col("bar"))
/// }
/// ```
pub fn left_join(
self,
other: LazyFrame,
left_on: Expr,
right_on: Expr,
options: Option<JoinOptions>,
) -> LazyFrame {
self.join(
other,
vec![left_on],
vec![right_on],
options,
JoinType::Left,
)
pub fn left_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
self.join(other, vec![left_on], vec![right_on], JoinType::Left)
}

/// Join query with other lazy query.
Expand All @@ -710,23 +692,11 @@ impl LazyFrame {
/// use polars_lazy::prelude::*;
/// fn join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
/// ldf
/// .outer_join(other, col("foo"), col("bar"), None)
/// .outer_join(other, col("foo"), col("bar"))
/// }
/// ```
pub fn outer_join(
self,
other: LazyFrame,
left_on: Expr,
right_on: Expr,
options: Option<JoinOptions>,
) -> LazyFrame {
self.join(
other,
vec![left_on],
vec![right_on],
options,
JoinType::Outer,
)
pub fn outer_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
self.join(other, vec![left_on], vec![right_on], JoinType::Outer)
}

/// Join query with other lazy query.
Expand All @@ -738,23 +708,11 @@ impl LazyFrame {
/// use polars_lazy::prelude::*;
/// fn join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
/// ldf
/// .inner_join(other, col("foo"), col("bar").cast(DataType::Utf8), None)
/// .inner_join(other, col("foo"), col("bar").cast(DataType::Utf8))
/// }
/// ```
pub fn inner_join(
self,
other: LazyFrame,
left_on: Expr,
right_on: Expr,
options: Option<JoinOptions>,
) -> LazyFrame {
self.join(
other,
vec![left_on],
vec![right_on],
options,
JoinType::Inner,
)
pub fn inner_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
self.join(other, vec![left_on], vec![right_on], JoinType::Inner)
}

/// Generic join function that can join on multiple columns.
Expand All @@ -767,31 +725,26 @@ impl LazyFrame {
///
/// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
/// ldf
/// .join(other, vec![col("foo"), col("bar")], vec![col("foo"), col("bar")], None, JoinType::Inner)
/// .join(other, vec![col("foo"), col("bar")], vec![col("foo"), col("bar")], JoinType::Inner)
/// }
/// ```
pub fn join(
self,
other: LazyFrame,
left_on: Vec<Expr>,
right_on: Vec<Expr>,
options: Option<JoinOptions>,
how: JoinType,
) -> LazyFrame {
let opt_state = self.get_opt_state();
let opts = options.unwrap_or_default();
let lp = self
.get_plan_builder()
.join(
other.logical_plan,
how,
left_on,
right_on,
opts.allow_parallel,
opts.force_parallel,
)
.build();
Self::from_logical_plan(lp, opt_state)
self.join_builder()
.with(other)
.left_on(left_on)
.right_on(right_on)
.how(how)
.finish()
}

pub fn join_builder(self) -> JoinBuilder {
JoinBuilder::new(self)
}

/// Add a column to a DataFrame
Expand Down Expand Up @@ -1037,6 +990,74 @@ impl LazyGroupBy {
}
}

pub struct JoinBuilder {
lf: LazyFrame,
how: JoinType,
other: Option<LazyFrame>,
left_on: Vec<Expr>,
right_on: Vec<Expr>,
allow_parallel: bool,
force_parallel: bool,
}
impl JoinBuilder {
fn new(lf: LazyFrame) -> Self {
Self {
lf,
other: None,
how: JoinType::Inner,
left_on: vec![],
right_on: vec![],
allow_parallel: true,
force_parallel: false,
}
}

pub fn with(mut self, other: LazyFrame) -> Self {
self.other = Some(other);
self
}

pub fn how(mut self, how: JoinType) -> Self {
self.how = how;
self
}

pub fn left_on(mut self, on: Vec<Expr>) -> Self {
self.left_on = on;
self
}

pub fn right_on(mut self, on: Vec<Expr>) -> Self {
self.right_on = on;
self
}
pub fn allow_parallel(mut self, allow: bool) -> Self {
self.allow_parallel = allow;
self
}
pub fn force_parallel(mut self, allow: bool) -> Self {
self.allow_parallel = allow;
self
}
pub fn finish(self) -> LazyFrame {
let opt_state = self.lf.opt_state;

let lp = self
.lf
.get_plan_builder()
.join(
self.other.expect("with not set").logical_plan,
self.how,
self.left_on,
self.right_on,
self.allow_parallel,
self.force_parallel,
)
.build();
LazyFrame::from_logical_plan(lp, opt_state)
}
}

#[cfg(test)]
mod test {
#[cfg(feature = "temporal")]
Expand Down Expand Up @@ -1319,7 +1340,7 @@ mod test {
let df_a = load_df();
let df_b = df_a.clone();
df_a.lazy()
.left_join(df_b.lazy(), col("b"), col("b"), None)
.left_join(df_b.lazy(), col("b"), col("b"))
.filter(col("a").lt(lit(2)))
.groupby(vec![col("b")])
.agg(vec![col("b").first(), col("c").first()])
Expand Down Expand Up @@ -1388,7 +1409,6 @@ mod test {
base_df,
vec![col("uid"), col("day")],
vec![col("uid"), col("day")],
None,
JoinType::Inner,
)
.collect()
Expand Down Expand Up @@ -2059,7 +2079,7 @@ mod test {
};

let out = a()
.left_join(a(), col("foo"), col("foo"), None)
.left_join(a(), col("foo"), col("foo"))
.select(vec![col("bar")])
.collect()?;

Expand Down
6 changes: 3 additions & 3 deletions polars/polars-lazy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Lazy API of Polars
//!
//! *Credits to the work of Andy Grove and Ballista/ DataFusion / Apache Arrow, which gave
//! this a huge kickstart.*
//! *Credits to the work of Andy Grove and Ballista/ DataFusion / Apache Arrow, which served as
//! insipration for the lazy API.*
//!
//! The lazy api of Polars supports a subset of the eager api. Apart from the distributed compute,
//! it is very similar to [Apache Spark](https://spark.apache.org/). You write queries in a
Expand Down Expand Up @@ -158,7 +158,7 @@
//!
//! fn example(df_a: DataFrame, df_b: DataFrame) -> LazyFrame {
//! df_a.lazy()
//! .left_join(df_b.lazy(), col("b_left"), col("b_right"), None)
//! .left_join(df_b.lazy(), col("b_left"), col("b_right"))
//! .filter(
//! col("a").lt(lit(2))
//! )
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-lazy/src/logical_plan/aexpr.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::logical_plan::iterator::ArenaExprIter;
use crate::logical_plan::Context;
use crate::prelude::*;
use crate::utils::rename_field;
Expand Down Expand Up @@ -319,10 +318,12 @@ impl AExpr {
}

/// Check if AExpr equality. The nodes may differ.
/// j
///
/// For instance: there can be two columns "foo" in the memory arena. These are equal,
/// but would have different node values.
#[cfg(feature = "private")]
pub(crate) fn eq(node_left: Node, node_right: Node, expr_arena: &Arena<AExpr>) -> bool {
use crate::logical_plan::iterator::ArenaExprIter;
let cmp = |(node_left, node_right)| {
use AExpr::*;
match (expr_arena.get(node_left), expr_arena.get(node_right)) {
Expand Down
6 changes: 4 additions & 2 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::logical_plan::iterator::ArenaLpIter;
use crate::logical_plan::{det_melt_schema, Context};
use crate::prelude::*;
use crate::utils::{aexprs_to_schema, PushNode};
Expand All @@ -7,7 +6,6 @@ use polars_core::frame::hash_join::JoinType;
use polars_core::prelude::*;
use polars_core::utils::{Arena, Node};
use std::collections::HashSet;
use std::fs::canonicalize;
use std::path::PathBuf;
use std::sync::Arc;

Expand Down Expand Up @@ -161,12 +159,16 @@ impl ALogicalPlan {
///
/// For instance: there can be two columns "foo" in the memory arena. These are equal,
/// but would have different node values.
#[cfg(feature = "private")]
pub(crate) fn eq(
node_left: Node,
node_right: Node,
lp_arena: &Arena<ALogicalPlan>,
expr_arena: &Arena<AExpr>,
) -> bool {
use crate::logical_plan::iterator::ArenaLpIter;
use std::fs::canonicalize;

let cmp = |(node_left, node_right)| {
use ALogicalPlan::*;
match (lp_arena.get(node_left), lp_arena.get(node_right)) {
Expand Down
12 changes: 6 additions & 6 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,10 +1362,10 @@ mod test {

// check if optimizations succeeds without selection
{
let lf =
left.clone()
.lazy()
.left_join(right.clone().lazy(), col("days"), col("days"), None);
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), col("days"), col("days"));

print_plans(&lf);
// implicitly checks logical plan == optimized logical plan
Expand All @@ -1378,7 +1378,7 @@ mod test {
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), col("days"), col("days"), None)
.left_join(right.clone().lazy(), col("days"), col("days"))
.select(&[col("temp")]);

print_plans(&lf);
Expand All @@ -1391,7 +1391,7 @@ mod test {
let lf = left
.clone()
.lazy()
.left_join(right.clone().lazy(), col("days"), col("days"), None)
.left_join(right.clone().lazy(), col("days"), col("days"))
.select(&[col("temp"), col("rain_right")]);

print_plans(&lf);
Expand Down
6 changes: 2 additions & 4 deletions polars/polars-lazy/src/logical_plan/optimizer/join_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ mod test {
.groupby(vec![col("b")])
.agg(vec![col("a").last()]);

let (root, mut expr_arena, mut alp_arena) =
q1.left_join(q2, col("b"), col("b"), None).into_alp();
let (root, mut expr_arena, mut alp_arena) = q1.left_join(q2, col("b"), col("b")).into_alp();
dbg!(alp_arena.get(root));
let mut opt = JoinPrune {};
let out = opt
Expand All @@ -213,8 +212,7 @@ mod test {
.groupby(vec![col("b")])
.agg(vec![col("a").last()]);

let (root, mut expr_arena, mut alp_arena) =
q1.left_join(q2, col("b"), col("b"), None).into_alp();
let (root, mut expr_arena, mut alp_arena) = q1.left_join(q2, col("b"), col("b")).into_alp();
dbg!(alp_arena.get(root));
let mut opt = JoinPrune {};
let out = opt
Expand Down

0 comments on commit 8355e22

Please sign in to comment.