Skip to content

Commit

Permalink
fix bug in predicate pushdown and init string namespace in lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 27, 2022
1 parent db4a90c commit 6c6274b
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 129 deletions.
2 changes: 1 addition & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ lazy = ["polars-core/lazy", "polars-lazy", "polars-lazy/compile"]
# parallel = ["polars-core/parallel"]

# extra utilities for Utf8Chunked
strings = ["polars-core/strings"]
strings = ["polars-core/strings", "polars-lazy/strings"]

# support for ObjectChunked<T> (downcastable Series of any type)
object = ["polars-core/object", "polars-lazy/object"]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ csv-file = ["polars-io/csv-file"]
temporal = ["polars-core/temporal"]
# debugging purposesses
fmt = ["polars-core/plain_fmt"]
strings = ["polars-core/strings"]
future = []
dtype-u8 = ["polars-core/dtype-u8"]
dtype-u16 = ["polars-core/dtype-u16"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
//! Domain specific language for the Lazy api.
#[cfg(feature = "strings")]
pub mod string;

use crate::logical_plan::Context;
use crate::prelude::*;
use crate::utils::{has_expr, has_root_literal_expr};
Expand Down Expand Up @@ -1885,19 +1888,6 @@ impl Expr {
)
.with_fmt("kurtosis")
}
#[cfg(feature = "concat_str")]
/// Concat the values into a string array.
/// # Arguments
///
/// * `delimiter` - A string that will act as delimiter between values.
pub fn str_concat(self, delimiter: &str) -> Expr {
let delimiter = delimiter.to_owned();
self.apply(
move |s| Ok(s.str_concat(&delimiter).into_series()),
GetOutput::from_type(DataType::Utf8),
)
.with_fmt("str_concat")
}

/// Get maximal value that could be hold by this dtype.
pub fn upper_bound(self) -> Expr {
Expand Down Expand Up @@ -2091,6 +2081,11 @@ impl Expr {
)
.with_fmt("all")
}

#[cfg(feature = "strings")]
pub fn str(self) -> string::StringNameSpace {
string::StringNameSpace(self)
}
}

// Arithmetic ops
Expand Down
43 changes: 43 additions & 0 deletions polars/polars-lazy/src/dsl/string.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use super::*;

pub struct StringNameSpace(pub(crate) Expr);

impl StringNameSpace {
pub fn extract(self, pat: &str, group_index: usize) -> Expr {
let pat = pat.to_string();
let function = move |s: Series| {
let ca = s.utf8()?;
match ca.extract(&pat, group_index) {
Ok(ca) => Ok(ca.into_series()),
Err(e) => Err(PolarsError::ComputeError(format!("{:?}", e).into())),
}
};
self.0
.map(function, GetOutput::from_type(DataType::Utf8))
.with_fmt("str.extract")
}

#[cfg(feature = "temporal")]
pub fn strftime(self, fmt: &str) -> Expr {
let fmt = fmt.to_string();
let function = move |s: Series| s.strftime(&fmt);
self.0
.map(function, GetOutput::from_type(DataType::Utf8))
.with_fmt("strftime")
}

#[cfg(feature = "concat_str")]
/// Concat the values into a string array.
/// # Arguments
///
/// * `delimiter` - A string that will act as delimiter between values.
pub fn concat(self, delimiter: &str) -> Expr {
let delimiter = delimiter.to_owned();
self.0
.apply(
move |s| Ok(s.str_concat(&delimiter).into_series()),
GetOutput::from_type(DataType::Utf8),
)
.with_fmt("str_concat")
}
}
5 changes: 5 additions & 0 deletions polars/polars-lazy/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ pub fn col(name: &str) -> Expr {
}
}

/// Selects all columns
pub fn all() -> Expr {
Expr::Wildcard
}

/// Select multiple columns by name
pub fn cols(names: Vec<String>) -> Expr {
Expr::Columns(names)
Expand Down
94 changes: 47 additions & 47 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,73 +395,73 @@ pub(crate) fn to_alp(
lp_arena.add(v)
}

pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
pub(crate) fn node_to_expr(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
let expr = expr_arena.get(node).clone();

match expr {
AExpr::Duplicated(node) => Expr::Duplicated(Box::new(node_to_exp(node, expr_arena))),
AExpr::IsUnique(node) => Expr::IsUnique(Box::new(node_to_exp(node, expr_arena))),
AExpr::Reverse(node) => Expr::Reverse(Box::new(node_to_exp(node, expr_arena))),
AExpr::Explode(node) => Expr::Explode(Box::new(node_to_exp(node, expr_arena))),
AExpr::Duplicated(node) => Expr::Duplicated(Box::new(node_to_expr(node, expr_arena))),
AExpr::IsUnique(node) => Expr::IsUnique(Box::new(node_to_expr(node, expr_arena))),
AExpr::Reverse(node) => Expr::Reverse(Box::new(node_to_expr(node, expr_arena))),
AExpr::Explode(node) => Expr::Explode(Box::new(node_to_expr(node, expr_arena))),
AExpr::Alias(expr, name) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
Expr::Alias(Box::new(exp), name)
}
AExpr::Column(a) => Expr::Column(a),
AExpr::Literal(s) => Expr::Literal(s),
AExpr::BinaryExpr { left, op, right } => {
let l = node_to_exp(left, expr_arena);
let r = node_to_exp(right, expr_arena);
let l = node_to_expr(left, expr_arena);
let r = node_to_expr(right, expr_arena);
Expr::BinaryExpr {
left: Box::new(l),
op,
right: Box::new(r),
}
}
AExpr::Not(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
Expr::Not(Box::new(exp))
}
AExpr::IsNotNull(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
Expr::IsNotNull(Box::new(exp))
}
AExpr::IsNull(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
Expr::IsNull(Box::new(exp))
}
AExpr::Cast {
expr,
data_type,
strict,
} => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
Expr::Cast {
expr: Box::new(exp),
data_type,
strict,
}
}
AExpr::Sort { expr, options } => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
Expr::Sort {
expr: Box::new(exp),
options,
}
}
AExpr::Take { expr, idx } => {
let expr = node_to_exp(expr, expr_arena);
let idx = node_to_exp(idx, expr_arena);
let expr = node_to_expr(expr, expr_arena);
let idx = node_to_expr(idx, expr_arena);
Expr::Take {
expr: Box::new(expr),
idx: Box::new(idx),
}
}
AExpr::SortBy { expr, by, reverse } => {
let expr = node_to_exp(expr, expr_arena);
let expr = node_to_expr(expr, expr_arena);
let by = by
.iter()
.map(|node| node_to_exp(*node, expr_arena))
.map(|node| node_to_expr(*node, expr_arena))
.collect();
Expr::SortBy {
expr: Box::new(expr),
Expand All @@ -470,53 +470,53 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
}
}
AExpr::Filter { input, by } => {
let input = node_to_exp(input, expr_arena);
let by = node_to_exp(by, expr_arena);
let input = node_to_expr(input, expr_arena);
let by = node_to_expr(by, expr_arena);
Expr::Filter {
input: Box::new(input),
by: Box::new(by),
}
}
AExpr::Agg(agg) => match agg {
AAggExpr::Min(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Min(Box::new(exp)).into()
}
AAggExpr::Max(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Max(Box::new(exp)).into()
}

AAggExpr::Median(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Median(Box::new(exp)).into()
}
AAggExpr::NUnique(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::NUnique(Box::new(exp)).into()
}
AAggExpr::First(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::First(Box::new(exp)).into()
}
AAggExpr::Last(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Last(Box::new(exp)).into()
}
AAggExpr::Mean(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Mean(Box::new(exp)).into()
}
AAggExpr::List(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::List(Box::new(exp)).into()
}
AAggExpr::Quantile {
expr,
quantile,
interpol,
} => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Quantile {
expr: Box::new(exp),
quantile,
Expand All @@ -525,28 +525,28 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
.into()
}
AAggExpr::Sum(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Sum(Box::new(exp)).into()
}
AAggExpr::Std(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Std(Box::new(exp)).into()
}
AAggExpr::Var(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Var(Box::new(exp)).into()
}
AAggExpr::AggGroups(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::AggGroups(Box::new(exp)).into()
}
AAggExpr::Count(expr) => {
let exp = node_to_exp(expr, expr_arena);
let exp = node_to_expr(expr, expr_arena);
AggExpr::Count(Box::new(exp)).into()
}
},
AExpr::Shift { input, periods } => {
let e = node_to_exp(input, expr_arena);
let e = node_to_expr(input, expr_arena);
Expr::Shift {
input: Box::new(e),
periods,
Expand All @@ -557,9 +557,9 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
truthy,
falsy,
} => {
let p = node_to_exp(predicate, expr_arena);
let t = node_to_exp(truthy, expr_arena);
let f = node_to_exp(falsy, expr_arena);
let p = node_to_expr(predicate, expr_arena);
let t = node_to_expr(truthy, expr_arena);
let f = node_to_expr(falsy, expr_arena);

Expr::Ternary {
predicate: Box::new(p),
Expand All @@ -584,9 +584,9 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
order_by,
options,
} => {
let function = Box::new(node_to_exp(function, expr_arena));
let function = Box::new(node_to_expr(function, expr_arena));
let partition_by = nodes_to_exprs(&partition_by, expr_arena);
let order_by = order_by.map(|ob| Box::new(node_to_exp(ob, expr_arena)));
let order_by = order_by.map(|ob| Box::new(node_to_expr(ob, expr_arena)));
Expr::Window {
function,
partition_by,
Expand All @@ -599,7 +599,7 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
offset,
length,
} => Expr::Slice {
input: Box::new(node_to_exp(input, expr_arena)),
input: Box::new(node_to_expr(input, expr_arena)),
offset,
length,
},
Expand All @@ -608,7 +608,7 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
}

fn nodes_to_exprs(nodes: &[Node], expr_arena: &Arena<AExpr>) -> Vec<Expr> {
nodes.iter().map(|n| node_to_exp(*n, expr_arena)).collect()
nodes.iter().map(|n| node_to_expr(*n, expr_arena)).collect()
}

pub(crate) fn node_to_lp(
Expand Down Expand Up @@ -637,7 +637,7 @@ pub(crate) fn node_to_lp(
}
ALogicalPlan::Selection { input, predicate } => {
let lp = node_to_lp(input, expr_arena, lp_arena);
let p = node_to_exp(predicate, expr_arena);
let p = node_to_expr(predicate, expr_arena);
LogicalPlan::Selection {
input: Box::new(lp),
predicate: p,
Expand All @@ -655,7 +655,7 @@ pub(crate) fn node_to_lp(
path,
schema,
options,
predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
predicate: predicate.map(|n| node_to_expr(n, expr_arena)),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
},
#[cfg(feature = "ipc")]
Expand All @@ -669,7 +669,7 @@ pub(crate) fn node_to_lp(
} => LogicalPlan::IpcScan {
path,
schema,
predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
predicate: predicate.map(|n| node_to_expr(n, expr_arena)),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
options,
},
Expand All @@ -684,7 +684,7 @@ pub(crate) fn node_to_lp(
} => LogicalPlan::ParquetScan {
path,
schema,
predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
predicate: predicate.map(|n| node_to_expr(n, expr_arena)),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
options,
},
Expand All @@ -698,8 +698,8 @@ pub(crate) fn node_to_lp(
schema,
projection: projection
.as_ref()
.map(|nodes| nodes.iter().map(|n| node_to_exp(*n, expr_arena)).collect()),
selection: selection.map(|n| node_to_exp(n, expr_arena)),
.map(|nodes| nodes.iter().map(|n| node_to_expr(*n, expr_arena)).collect()),
selection: selection.map(|n| node_to_expr(n, expr_arena)),
},
ALogicalPlan::Projection {
expr,
Expand Down

0 comments on commit 6c6274b

Please sign in to comment.