Skip to content

Commit

Permalink
[lazy] take expression
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 28, 2021
1 parent 3c32fb4 commit 9086ef3
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 6 deletions.
15 changes: 15 additions & 0 deletions polars/polars-lazy/src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ pub enum Expr {
expr: Box<Expr>,
reverse: bool,
},
Take {
expr: Box<Expr>,
idx: Box<Expr>,
},
SortBy {
expr: Box<Expr>,
by: Box<Expr>,
Expand Down Expand Up @@ -277,6 +281,9 @@ impl fmt::Debug for Expr {
Filter { input, by } => {
write!(f, "FILTER {:?} BY {:?}", input, by)
}
Take { expr, idx } => {
write!(f, "TAKE {:?} AT {:?}", expr, idx)
}
Agg(agg) => {
use AggExpr::*;
match agg {
Expand Down Expand Up @@ -576,6 +583,14 @@ impl Expr {
}
}

/// Build a lazy `Take` expression that selects values of `self` at the positions given by `idx`.
///
/// Both `self` and `idx` are consumed, boxed and stored in [`Expr::Take`]; nothing is
/// evaluated at this point.
pub fn take(self, idx: Expr) -> Self {
    let expr = Box::new(self);
    let idx = Box::new(idx);
    Expr::Take { expr, idx }
}

/// Sort in increasing order. See [the eager implementation](polars_core::series::SeriesTrait::sort).
///
/// Can be used in `default` and `aggregation` context.
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-lazy/src/logical_plan/aexpr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub enum AExpr {
expr: Node,
reverse: bool,
},
Take {
expr: Node,
idx: Node,
},
SortBy {
expr: Node,
by: Node,
Expand Down Expand Up @@ -175,6 +179,7 @@ impl AExpr {
IsNull(_) => Ok(Field::new("is_null", DataType::Boolean)),
IsNotNull(_) => Ok(Field::new("is_not_null", DataType::Boolean)),
Sort { expr, .. } => arena.get(*expr).to_field(schema, ctxt, arena),
Take { expr, .. } => arena.get(*expr).to_field(schema, ctxt, arena),
SortBy { expr, .. } => arena.get(*expr).to_field(schema, ctxt, arena),
Filter { input, .. } => arena.get(*input).to_field(schema, ctxt, arena),
Agg(agg) => {
Expand Down
3 changes: 0 additions & 3 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,9 +585,6 @@ impl<'a> ALogicalPlanBuilder<'a> {
self
}
}
/// Consume the builder and return its `root` node.
pub fn into_node(self) -> Node {
self.root
}

pub fn build(self) -> ALogicalPlan {
if self.root.0 == self.lp_arena.len() {
Expand Down
12 changes: 12 additions & 0 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ pub(crate) fn to_aexpr(expr: Expr, arena: &mut Arena<AExpr>) -> Node {
expr: to_aexpr(*expr, arena),
data_type,
},
Expr::Take { expr, idx } => AExpr::Take {
expr: to_aexpr(*expr, arena),
idx: to_aexpr(*idx, arena),
},
Expr::Sort { expr, reverse } => AExpr::Sort {
expr: to_aexpr(*expr, arena),
reverse,
Expand Down Expand Up @@ -413,6 +417,14 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
reverse,
}
}
AExpr::Take { expr, idx } => {
let expr = node_to_exp(expr, expr_arena);
let idx = node_to_exp(idx, expr_arena);
Expr::Take {
expr: Box::new(expr),
idx: Box::new(idx),
}
}
AExpr::SortBy { expr, by, reverse } => {
let expr = node_to_exp(expr, expr_arena);
let by = node_to_exp(by, expr_arena);
Expand Down
8 changes: 8 additions & 0 deletions polars/polars-lazy/src/logical_plan/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl<'a> Iterator for ExprIter<'a> {
IsNotNull(e) => push(e),
Cast { expr, .. } => push(expr),
Sort { expr, .. } => push(expr),
Take { expr, idx } => {
push(expr);
push(idx);
}
Filter { input, by } => {
push(input);
push(by)
Expand Down Expand Up @@ -120,6 +124,10 @@ impl AExpr {
IsNotNull(e) => push(e),
Cast { expr, .. } => push(expr),
Sort { expr, .. } => push(expr),
Take { expr, idx } => {
push(expr);
push(idx);
}
SortBy { expr, by, .. } => {
push(expr);
push(by);
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,10 @@ fn replace_wildcard_with_column(expr: Expr, column_name: Arc<String>) -> Expr {
Expr::Explode(expr) => {
Expr::Explode(Box::new(replace_wildcard_with_column(*expr, column_name)))
}
Expr::Take { expr, idx } => Expr::Take {
expr: Box::new(replace_wildcard_with_column(*expr, column_name)),
idx,
},
Expr::Ternary {
predicate,
truthy,
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod default;
mod final_agg;
pub(crate) mod take;

use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
Expand Down
29 changes: 29 additions & 0 deletions polars/polars-lazy/src/physical_plan/expressions/take.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::prelude::*;
use std::sync::Arc;

/// Physical expression that takes values from the result of `expr` at the
/// positions produced by `idx`.
pub struct TakeExpr {
/// Expression whose evaluated result supplies the values.
pub(crate) expr: Arc<dyn PhysicalExpr>,
/// Expression whose evaluated result supplies the indices.
pub(crate) idx: Arc<dyn PhysicalExpr>,
}

impl TakeExpr {
pub fn new(expr: Arc<dyn PhysicalExpr>, idx: Arc<dyn PhysicalExpr>) -> Self {
Self { expr, idx }
}
}

impl PhysicalExpr for TakeExpr {
    /// Evaluate the value expression and the index expression against `df`, then
    /// take the values at the evaluated indices.
    ///
    /// # Errors
    ///
    /// Propagates any error from evaluating either sub-expression, and the error
    /// returned by `u32()` when the index result cannot be viewed as a `u32` array.
    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
        // Values are evaluated before the indices, so an error in the value
        // expression is reported first.
        let values = self.expr.evaluate(df, state)?;
        let indices = self.idx.evaluate(df, state)?;

        // NOTE(review): behaviour for out-of-bounds indices is decided by
        // `Series::take` and is not visible here; confirm before relying on it.
        let taken = values.take(indices.u32()?);
        Ok(taken)
    }

    /// The output field is that of the value expression; the index expression
    /// does not influence it.
    fn to_field(&self, input_schema: &Schema) -> Result<Field> {
        let values_expr = &self.expr;
        values_expr.to_field(input_schema)
    }
}
5 changes: 5 additions & 0 deletions polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,11 @@ impl DefaultPlanner {
node_to_exp(expression, expr_arena),
)))
}
Take { expr, idx } => {
let phys_expr = self.create_physical_expr(expr, ctxt, expr_arena)?;
let phys_idx = self.create_physical_expr(idx, ctxt, expr_arena)?;
Ok(Arc::new(TakeExpr::new(phys_expr, phys_idx)))
}
SortBy { expr, by, reverse } => {
let phys_expr = self.create_physical_expr(expr, ctxt, expr_arena)?;
let phys_by = self.create_physical_expr(by, ctxt, expr_arena)?;
Expand Down
7 changes: 4 additions & 3 deletions polars/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
pub use polars_core::utils::{Arena, Node};

pub use crate::logical_plan::aexpr::*;
pub use crate::logical_plan::alp::*;
pub(crate) use crate::logical_plan::conversion::*;
pub use crate::{
dsl::*,
frame::*,
Expand All @@ -17,3 +14,7 @@ pub use crate::{
Executor, PhysicalPlanner,
},
};
pub(crate) use crate::{
logical_plan::{aexpr::*, alp::*, conversion::*},
physical_plan::expressions::take::TakeExpr,
};
15 changes: 15 additions & 0 deletions py-polars/polars/lazy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,21 @@ def sort_by(self, by: "Union[Expr, str]", reverse: bool = False) -> "Expr":

return wrap_expr(self._pyexpr.sort_by(by._pyexpr, reverse))

def take(self, index: "Expr") -> "Expr":
    """
    Take values by index.

    Parameters
    ----------
    index
        An expression that leads to a UInt32 dtyped Series.

    Returns
    -------
    Values taken by index
    """
    taken = self._pyexpr.take(index._pyexpr)
    return wrap_expr(taken)

def shift(self, periods: int) -> "Expr":
"""
Shift the values by a given period and fill the parts that will be empty due to this operation
Expand Down
4 changes: 4 additions & 0 deletions py-polars/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ impl PyExpr {
self.clone().inner.sort(reverse).into()
}

/// Return a new expression that takes the values of this expression at the
/// positions given by `idx`. `self` is cloned, not consumed.
pub fn take(&self, idx: PyExpr) -> PyExpr {
    let expr = self.clone().inner;
    expr.take(idx.inner).into()
}

/// Return a new expression built by calling `sort_by` on a clone of this
/// expression, forwarding `by` and `reverse` unchanged.
pub fn sort_by(&self, by: PyExpr, reverse: bool) -> PyExpr {
    let expr = self.clone().inner;
    expr.sort_by(by.inner, reverse).into()
}
Expand Down

0 comments on commit 9086ef3

Please sign in to comment.