Skip to content

Commit

Permalink
lazy: fix bug in join suffix and improve call ergonomics
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 11, 2022
1 parent c05168d commit c82c8d5
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 38 deletions.
6 changes: 0 additions & 6 deletions polars/polars-lazy/src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,6 @@ impl AsRef<Expr> for AggExpr {
}
}

impl From<AggExpr> for Expr {
fn from(agg: AggExpr) -> Self {
Expr::Agg(agg)
}
}

/// Queries consists of multiple expressions.
#[derive(Clone, PartialEq)]
#[must_use]
Expand Down
53 changes: 30 additions & 23 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use csv::*;
pub use ipc::*;
#[cfg(feature = "parquet")]
pub use parquet::*;
use std::borrow::Cow;

#[cfg(any(feature = "parquet", feature = "csv-file", feature = "ipc"))]
use polars_core::datatypes::PlHashMap;
Expand Down Expand Up @@ -46,7 +47,7 @@ pub struct JoinOptions {
pub allow_parallel: bool,
pub force_parallel: bool,
pub how: JoinType,
pub suffix: Option<String>,
pub suffix: Cow<'static, str>,
pub asof_by_left: Vec<String>,
pub asof_by_right: Vec<String>,
}
Expand All @@ -57,7 +58,7 @@ impl Default for JoinOptions {
allow_parallel: true,
force_parallel: false,
how: JoinType::Left,
suffix: None,
suffix: "_right".into(),
asof_by_left: vec![],
asof_by_right: vec![],
}
Expand Down Expand Up @@ -250,7 +251,8 @@ impl LazyFrame {
/// .sort_by_exprs(vec![col("sepal.width")], vec![false])
/// }
/// ```
pub fn sort_by_exprs(self, by_exprs: Vec<Expr>, reverse: Vec<bool>) -> Self {
pub fn sort_by_exprs<E: AsRef<[Expr]>>(self, by_exprs: E, reverse: Vec<bool>) -> Self {
let by_exprs = by_exprs.as_ref().to_vec();
if by_exprs.is_empty() {
self
} else {
Expand Down Expand Up @@ -351,21 +353,21 @@ impl LazyFrame {
/// with the result of the `fill_value` expression.
///
/// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
pub fn shift_and_fill(self, periods: i64, fill_value: Expr) -> Self {
self.select_local(vec![col("*").shift_and_fill(periods, fill_value)])
pub fn shift_and_fill<E: Into<Expr>>(self, periods: i64, fill_value: E) -> Self {
self.select_local(vec![col("*").shift_and_fill(periods, fill_value.into())])
}

/// Fill none values in the DataFrame
pub fn fill_null(self, fill_value: Expr) -> LazyFrame {
pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().fill_null(fill_value).build();
let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
Self::from_logical_plan(lp, opt_state)
}

/// Fill NaN values in the DataFrame
pub fn fill_nan(self, fill_value: Expr) -> LazyFrame {
pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().fill_nan(fill_value).build();
let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
Self::from_logical_plan(lp, opt_state)
}

Expand Down Expand Up @@ -670,8 +672,8 @@ impl LazyFrame {
/// .left_join(other, col("foo"), col("bar"))
/// }
/// ```
pub fn left_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
self.join(other, vec![left_on], vec![right_on], JoinType::Left)
pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
self.join(other, [left_on.into()], [right_on.into()], JoinType::Left)
}

/// Join query with other lazy query.
Expand All @@ -686,8 +688,8 @@ impl LazyFrame {
/// .outer_join(other, col("foo"), col("bar"))
/// }
/// ```
pub fn outer_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
self.join(other, vec![left_on], vec![right_on], JoinType::Outer)
pub fn outer_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
self.join(other, [left_on.into()], [right_on.into()], JoinType::Outer)
}

/// Join query with other lazy query.
Expand All @@ -702,8 +704,8 @@ impl LazyFrame {
/// .inner_join(other, col("foo"), col("bar").cast(DataType::Utf8))
/// }
/// ```
pub fn inner_join(self, other: LazyFrame, left_on: Expr, right_on: Expr) -> LazyFrame {
self.join(other, vec![left_on], vec![right_on], JoinType::Inner)
pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
self.join(other, [left_on.into()], [right_on.into()], JoinType::Inner)
}

/// Creates the cartesian product from both frames, preserves the order of the left keys.
Expand Down Expand Up @@ -1050,7 +1052,7 @@ pub struct JoinBuilder {
asof_by_right: Vec<String>,
}
impl JoinBuilder {
fn new(lf: LazyFrame) -> Self {
pub fn new(lf: LazyFrame) -> Self {
Self {
lf,
other: None,
Expand Down Expand Up @@ -1078,14 +1080,14 @@ impl JoinBuilder {
}

/// The columns you want to join the left table on.
pub fn left_on(mut self, on: Vec<Expr>) -> Self {
self.left_on = on;
pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
self.left_on = on.as_ref().to_vec();
self
}

/// The columns you want to join the right table on.
pub fn right_on(mut self, on: Vec<Expr>) -> Self {
self.right_on = on;
pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
self.right_on = on.as_ref().to_vec();
self
}
/// Allow parallel table evaluation.
Expand All @@ -1102,8 +1104,8 @@ impl JoinBuilder {

/// Suffix to add duplicate column names in join.
/// Defaults to `"_right"`.
pub fn suffix(mut self, suffix: String) -> Self {
self.suffix = Some(suffix);
pub fn suffix<S: AsRef<str>>(mut self, suffix: S) -> Self {
self.suffix = Some(suffix.as_ref().to_string());
self
}

Expand All @@ -1118,6 +1120,11 @@ impl JoinBuilder {
pub fn finish(self) -> LazyFrame {
let opt_state = self.lf.opt_state;

let suffix = match self.suffix {
None => Cow::Borrowed("_right"),
Some(suffix) => Cow::Owned(suffix),
};

let lp = self
.lf
.get_plan_builder()
Expand All @@ -1129,7 +1136,7 @@ impl JoinBuilder {
allow_parallel: self.allow_parallel,
force_parallel: self.force_parallel,
how: self.how,
suffix: self.suffix,
suffix,
asof_by_left: self.asof_by_left,
asof_by_right: self.asof_by_right,
},
Expand Down
13 changes: 13 additions & 0 deletions polars/polars-lazy/src/from.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::prelude::*;

impl From<AggExpr> for Expr {
fn from(agg: AggExpr) -> Self {
Expr::Agg(agg)
}
}

impl<S: AsRef<str>> From<S> for Expr {
fn from(s: S) -> Self {
col(s.as_ref())
}
}
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ mod dummies;
#[cfg(feature = "compile")]
pub mod frame;
#[cfg(feature = "compile")]
mod from;
#[cfg(feature = "compile")]
pub mod functions;
#[cfg(feature = "compile")]
pub mod logical_plan;
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
let name = f.name();
if !right_names.contains(name.as_str()) {
if names.contains(name.as_str()) {
let new_name = format!("{}_right", name);
let new_name = format!("{}{}", name, options.suffix.as_ref());
let field = Field::new(&new_name, f.data_type().clone());
fields.push(field)
} else {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ impl LogicalPlanBuilder {

if !right_names.iter().any(|s| s.as_ref() == name) {
if names.contains(name) {
let new_name = format!("{}_right", name);
let new_name = format!("{}{}", name, options.suffix.as_ref());
let field = Field::new(&new_name, f.data_type().clone());
fields.push(field)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ impl ProjectionPushDown {
let root_column_name =
aexpr_to_root_names(proj, expr_arena).pop().unwrap();

let suffix = options.suffix.as_deref().unwrap_or("_right");
let suffix = options.suffix.as_ref();
// If _right suffix exists we need to push a projection down without this
// suffix.
if root_column_name.ends_with(suffix) {
Expand Down
9 changes: 5 additions & 4 deletions polars/polars-lazy/src/physical_plan/executors/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::prelude::*;
use polars_core::POOL;
use std::borrow::Cow;

pub struct JoinExec {
input_left: Option<Box<dyn Executor>>,
Expand All @@ -11,7 +12,7 @@ pub struct JoinExec {
left_on: Vec<Arc<dyn PhysicalExpr>>,
right_on: Vec<Arc<dyn PhysicalExpr>>,
parallel: bool,
suffix: Option<String>,
suffix: Cow<'static, str>,
// not used if asof not activated
#[allow(dead_code)]
asof_by_left: Vec<String>,
Expand All @@ -29,7 +30,7 @@ impl JoinExec {
left_on: Vec<Arc<dyn PhysicalExpr>>,
right_on: Vec<Arc<dyn PhysicalExpr>>,
parallel: bool,
suffix: Option<String>,
suffix: Cow<'static, str>,
asof_by_left: Vec<String>,
asof_by_right: Vec<String>,
) -> Self {
Expand Down Expand Up @@ -111,7 +112,7 @@ impl Executor for JoinExec {
&left_names,
&right_names,
self.how,
self.suffix.clone(),
Some(self.suffix.clone().into_owned()),
)
};

Expand All @@ -121,7 +122,7 @@ impl Executor for JoinExec {
&left_names,
&right_names,
self.how,
self.suffix.clone(),
Some(self.suffix.clone().into_owned()),
);

if state.verbose {
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(feature = "parquet")]
mod io;
mod predicate_pushdown;
mod projection_pushdown;
mod queries;

use polars_core::prelude::*;
Expand Down
35 changes: 35 additions & 0 deletions polars/polars-lazy/src/tests/projection_pushdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use super::*;

#[test]
fn test_join_suffix_and_drop() -> Result<()> {
let weight = df![
"id" => [1, 2, 3, 4, 5, 0],
"wgt" => [4.32, 5.23, 2.33, 23.399, 392.2, 0.0]
]?
.lazy();

let ped = df![
"id"=> [1, 2, 3, 4, 5],
"sireid"=> [0, 0, 1, 3, 3]
]?
.lazy();

let sumry = weight
.clone()
.filter(col("id").eq(lit(2i32)))
.inner_join(ped, "id", "id");

let out = sumry
.join_builder()
.with(weight)
.left_on([col("sireid")])
.right_on([col("id")])
.suffix("_sire")
.finish()
.drop_columns(["sireid"])
.collect()?;

assert_eq!(out.shape(), (1, 3));

Ok(())
}
10 changes: 8 additions & 2 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,14 @@ impl PyLazyFrame {

let ldf = self.ldf.clone();
let other = other.ldf;
let left_on = left_on.into_iter().map(|pyexpr| pyexpr.inner).collect();
let right_on = right_on.into_iter().map(|pyexpr| pyexpr.inner).collect();
let left_on = left_on
.into_iter()
.map(|pyexpr| pyexpr.inner)
.collect::<Vec<_>>();
let right_on = right_on
.into_iter()
.map(|pyexpr| pyexpr.inner)
.collect::<Vec<_>>();

ldf.join_builder()
.with(other)
Expand Down

0 comments on commit c82c8d5

Please sign in to comment.