Skip to content

Commit

Permalink
[Lazy] HStack
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 8, 2020
1 parent 1fb099d commit de15892
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 21 deletions.
8 changes: 8 additions & 0 deletions polars/src/lazy/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,14 @@ mod test {
println!("{:?}", df);
assert_eq!(df.width(), 6);
assert!(df.column("foo").is_ok());

let df = get_df()
.lazy()
.with_column(lit(10).alias("foo"))
.select(&[col("foo"), col("sepal.width")])
.collect()
.unwrap();
println!("{:?}", df);
}

#[test]
Expand Down
22 changes: 14 additions & 8 deletions polars/src/lazy/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ pub enum LogicalPlan {
left_on: Arc<String>,
right_on: Arc<String>,
},
HStack {
input: Box<LogicalPlan>,
exprs: Vec<Expr>,
schema: Schema,
},
}

impl Default for LogicalPlan {
Expand Down Expand Up @@ -145,6 +150,7 @@ impl fmt::Debug for LogicalPlan {
"JOIN\n\t({:?})\nWITH\n\t({:?})\nON (left: {} right: {})",
input_left, input_right, left_on, right_on
),
HStack { input, exprs, .. } => write!(f, "\n{:?} WITH COLUMN(S) {:?}\n", input, exprs),
}
}
}
Expand All @@ -162,6 +168,7 @@ impl LogicalPlan {
Sort { input, .. } => input.schema(),
Aggregate { schema, .. } => schema,
Join { schema, .. } => schema,
HStack { schema, .. } => schema,
}
}
pub fn describe(&self) -> String {
Expand Down Expand Up @@ -194,16 +201,15 @@ impl LogicalPlanBuilder {
// current schema
let schema = self.0.schema();

let current_columns = schema.fields();
let mut selection = Vec::with_capacity(current_columns.len() + exprs.len());
let added_schema = utils::expressions_to_schema(&exprs, schema);
let new_schema = Schema::try_merge(&[schema.clone(), added_schema]).unwrap();

for column in current_columns {
selection.push(col(column.name()));
}
for expr in exprs {
selection.push(expr);
LogicalPlan::HStack {
input: Box::new(self.0),
exprs,
schema: new_schema,
}
self.project(selection)
.into()
}

/// Apply a filter
Expand Down
5 changes: 5 additions & 0 deletions polars/src/lazy/logical_plan/optimizer/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ impl PredicatePushDown {
LogicalPlanBuilder::from(lp_left).join(lp_right, how, left_on, right_on);
self.finish_node(local_predicates, builder)
}
HStack { input, exprs, .. } => Ok(LogicalPlanBuilder::from(
self.push_down(*input, acc_predicates)?,
)
.with_columns(exprs)
.build()),
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion polars/src/lazy/logical_plan/optimizer/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl ProjectionPushDown {
// For every non-projection operation we recurse and rebuild that operation on the output of the recursion.
// The recursion stops at the nodes of the logical plan that are IO or existing DataFrames. On top of
// these nodes we apply the projection.
// TODO: renaming operations and joins interfere with the schema. We need to keep track of the schema somehow.
fn push_down(
&self,
logical_plan: LogicalPlan,
Expand Down Expand Up @@ -221,6 +220,18 @@ impl ProjectionPushDown {
LogicalPlanBuilder::from(lp_left).join(lp_right, how, left_on, right_on);
self.finish_node(local_projection, builder)
}
HStack { input, exprs, .. } => {
// todo! could just keep down if we accept a different order of the schema
let (mut acc_projections, mut local_projections) =
self.split_acc_projections(acc_projections, input.schema());

let builder =
LogicalPlanBuilder::from(self.push_down(*input, acc_projections.clone())?)
.with_columns(exprs);
// locally re-project all columns plus the stacked columns to keep the order of the schema equal
local_projections.append(&mut acc_projections);
self.finish_node(local_projections, builder)
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions polars/src/lazy/logical_plan/optimizer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ impl TypeCoercion {
right_on,
})
}
HStack { input, exprs, .. } => Ok(LogicalPlanBuilder::from(self.coerce(*input)?)
.with_columns(exprs)
.build()),
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions polars/src/lazy/physical_plan/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,38 @@ impl Executor for JoinExec {
}
}
}
/// Executor that evaluates a list of physical expressions against the
/// `DataFrame` produced by its input executor and stacks the resulting
/// series onto that frame as additional columns.
#[derive(Debug)]
pub struct StackExec {
// Executor whose output frame the new columns are stacked onto.
input: Arc<dyn Executor>,
// Physical expressions; each one is evaluated to a series that becomes a new column.
expr: Vec<Arc<dyn PhysicalExpr>>,
}

impl StackExec {
pub(crate) fn new(input: Arc<dyn Executor>, expr: Vec<Arc<dyn PhysicalExpr>>) -> Self {
Self { input, expr }
}
}

impl Executor for StackExec {
    /// Executes the input, evaluates every stacked expression against the
    /// resulting frame, and appends the evaluated series to that frame.
    ///
    /// Returns the first error raised by the input executor, by an expression
    /// evaluation, or by the final `hstack`.
    fn execute(&self) -> Result<DataFrame> {
        let mut frame = self.input.execute()?;
        let n_rows = frame.height();

        let mut new_columns: Vec<Series> = Vec::with_capacity(self.expr.len());
        for physical_expr in &self.expr {
            let evaluated = physical_expr.evaluate(&frame)?;
            // A single-value result (e.g. a literal) must span the whole column,
            // so it is expanded when the frame has more than one row.
            let column = if n_rows > 1 && evaluated.len() == 1 {
                evaluated.expand_at_index(n_rows, 0)
            } else {
                evaluated
            };
            new_columns.push(column);
        }

        frame.hstack(&new_columns)?;
        Ok(frame)
    }
}
20 changes: 11 additions & 9 deletions polars/src/lazy/physical_plan/planner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::lazy::physical_plan::executors::JoinExec;
use crate::lazy::physical_plan::executors::{JoinExec, StackExec};
use crate::{lazy::prelude::*, prelude::*};
use std::sync::Arc;

Expand All @@ -16,6 +16,9 @@ impl PhysicalPlanner for DefaultPlanner {
}

impl DefaultPlanner {
/// Converts each logical expression in `exprs` into a physical expression
/// through `create_physical_expr`, returning the first conversion error if
/// any expression fails.
fn create_physical_expressions(&self, exprs: &[Expr]) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
    let mut physical = Vec::with_capacity(exprs.len());
    for logical in exprs {
        physical.push(self.create_physical_expr(logical)?);
    }
    Ok(physical)
}
pub fn create_initial_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand All @@ -39,10 +42,7 @@ impl DefaultPlanner {
))),
LogicalPlan::Projection { expr, input, .. } => {
let input = self.create_initial_physical_plan(input)?;
let phys_expr = expr
.iter()
.map(|expr| self.create_physical_expr(expr))
.collect::<Result<Vec<_>>>()?;
let phys_expr = self.create_physical_expressions(expr)?;
Ok(Arc::new(PipeExec::new("projection", input, phys_expr)))
}
LogicalPlan::DataFrameScan { df, .. } => Ok(Arc::new(DataFrameExec::new(df.clone()))),
Expand All @@ -60,10 +60,7 @@ impl DefaultPlanner {
input, keys, aggs, ..
} => {
let input = self.create_initial_physical_plan(input)?;
let phys_aggs = aggs
.iter()
.map(|e| self.create_physical_expr(e))
.collect::<Result<Vec<_>>>()?;
let phys_aggs = self.create_physical_expressions(aggs)?;
Ok(Arc::new(GroupByExec::new(input, keys.clone(), phys_aggs)))
}
LogicalPlan::Join {
Expand All @@ -84,6 +81,11 @@ impl DefaultPlanner {
right_on.clone(),
)))
}
LogicalPlan::HStack { input, exprs, .. } => {
let input = self.create_initial_physical_plan(input)?;
let phys_expr = self.create_physical_expressions(exprs)?;
Ok(Arc::new(StackExec::new(input, phys_expr)))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "py-polars"
version = "0.0.2"
version = "0.0.3"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2018"

Expand Down
3 changes: 3 additions & 0 deletions py-polars/pypolars/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
from .series import Series
from .frame import DataFrame

# needed for side effects
from pypolars.lazy import *
6 changes: 6 additions & 0 deletions py-polars/pypolars/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,9 @@ def join(
else:
return NotImplemented
return wrap_ldf(inner)

def with_columns(self, exprs: List[PyExpr]) -> LazyFrame:
    """
    Pass `exprs` to the wrapped lazy frame's `with_columns` and return the
    result wrapped with `wrap_ldf`.
    """
    stacked = self._ldf.with_columns(exprs)
    return wrap_ldf(stacked)

def with_column(self, expr: PyExpr) -> LazyFrame:
    """
    Single-expression form of `with_columns`: wraps `expr` in a list and
    delegates to it.
    """
    exprs = [expr]
    return self.with_columns(exprs)
22 changes: 21 additions & 1 deletion py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use crate::{
series::PySeries,
};
use polars::lazy::dsl;
use polars::lazy::dsl::Operator;
use pyo3::prelude::*;
use pyo3::types::{PyFloat, PyInt};
use pyo3::wrap_pyfunction;
use pyo3::{wrap_pyfunction, PyNumberProtocol};

pub mod dataframe;
pub mod datatypes;
Expand All @@ -26,6 +27,22 @@ pub struct PyExpr {
pub inner: dsl::Expr,
}

// Arithmetic operator overloads for `PyExpr`: each operator builds a binary
// expression from the two operands' inner expressions and converts it back
// into a `PyExpr`.
#[pyproto]
impl PyNumberProtocol for PyExpr {
    // `lhs + rhs`
    fn __add__(lhs: Self, rhs: Self) -> PyResult<PyExpr> {
        let sum = dsl::binary_expr(lhs.inner, Operator::Plus, rhs.inner);
        Ok(sum.into())
    }

    // `lhs - rhs`
    fn __sub__(lhs: Self, rhs: Self) -> PyResult<PyExpr> {
        let difference = dsl::binary_expr(lhs.inner, Operator::Minus, rhs.inner);
        Ok(difference.into())
    }

    // `lhs * rhs`
    fn __mul__(lhs: Self, rhs: Self) -> PyResult<PyExpr> {
        let product = dsl::binary_expr(lhs.inner, Operator::Multiply, rhs.inner);
        Ok(product.into())
    }

    // `lhs / rhs`
    fn __truediv__(lhs: Self, rhs: Self) -> PyResult<PyExpr> {
        let quotient = dsl::binary_expr(lhs.inner, Operator::Divide, rhs.inner);
        Ok(quotient.into())
    }
}

#[pymethods]
impl PyExpr {
pub fn eq(&self, other: PyExpr) -> PyExpr {
Expand All @@ -46,6 +63,9 @@ impl PyExpr {
pub fn lt(&self, other: PyExpr) -> PyExpr {
self.clone().inner.lt(other.inner).into()
}
// Returns a new expression: a copy of this one's inner expression with
// `alias(name)` applied. `self` is left untouched.
pub fn alias(&self, name: &str) -> PyExpr {
    let renamed = self.inner.clone().alias(name);
    renamed.into()
}
pub fn not(&self) -> PyExpr {
self.clone().inner.not().into()
}
Expand Down
2 changes: 1 addition & 1 deletion py-polars/tests/test_lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
def test_lazy():
df = DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
print(df)
ldf = df.lazy().select([col("a")])
ldf = df.lazy().with_column(lit(1).alias("foo")).select([col("a"), col("foo")])

print(ldf.collect())

0 comments on commit de15892

Please sign in to comment.