Skip to content

Commit

Permalink
add lazy sort
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent 5287df8 commit c62a262
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 65 deletions.
69 changes: 69 additions & 0 deletions polars/src/lazy/frame.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::frame::select::Selection;
use crate::{lazy::prelude::*, prelude::*};

impl DataFrame {
/// Convert the `DataFrame` into a lazy `DataFrame`
pub fn lazy(self) -> LazyFrame {
LogicalPlanBuilder::from_existing_df(self).build().into()
}
}

/// Abstraction over a logical plan.
///
/// A `LazyFrame` owns a single [`LogicalPlan`]. It is created from a plan via
/// `From<LogicalPlan>`, and its methods consume `self` and return a new
/// `LazyFrame` wrapping an extended plan.
pub struct LazyFrame {
// The logical plan this frame wraps.
logical_plan: LogicalPlan,
}

impl From<LogicalPlan> for LazyFrame {
fn from(plan: LogicalPlan) -> Self {
Self { logical_plan: plan }
}
}

impl LazyFrame {
    /// Consume the frame and hand its plan to a `LogicalPlanBuilder`, so that
    /// another plan node can be stacked on top of it.
    fn get_plan_builder(self) -> LogicalPlanBuilder {
        let Self { logical_plan } = self;
        LogicalPlanBuilder::from(logical_plan)
    }

    /// Add a projection on the given column selection to the plan.
    ///
    /// Every name yielded by `columns.to_selection_vec()` is turned into a
    /// column expression with `col` and passed to the builder's `project`.
    pub fn select<'a, K, S>(self, columns: S) -> Self
    where
        S: Selection<'a, K>,
    {
        let names = columns.to_selection_vec();
        let projection: Vec<_> = names.into_iter().map(|name| col(name)).collect();
        let plan = self.get_plan_builder().project(projection).build();
        Self::from(plan)
    }

    /// Add a sort node on `by_column` to the plan.
    ///
    /// The node carries a single `Expr::Sort` wrapping `col(by_column)`;
    /// `reverse` is forwarded to that expression unchanged.
    ///
    /// NOTE(review): how the planner applies this expression to the remaining
    /// columns of the frame is not visible here — confirm before relying on
    /// row-wise reordering of a multi-column frame.
    pub fn sort(self, by_column: &str, reverse: bool) -> Self {
        let sort_expr = Expr::Sort {
            expr: Box::new(col(by_column)),
            reverse,
        };
        let plan = self.get_plan_builder().sort(vec![sort_expr]).build();
        Self::from(plan)
    }

    /// Execute the accumulated plan and return the resulting `DataFrame`.
    ///
    /// The logical plan is turned into a physical plan by a default-constructed
    /// `DefaultPlanner` and executed immediately.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while creating the physical plan or
    /// while executing it.
    pub fn collect(self) -> Result<DataFrame> {
        let plan = self.get_plan_builder().build();
        // todo: optimize plan.

        DefaultPlanner::default()
            .create_physical_plan(&plan)?
            .execute()
    }
}

#[cfg(test)]
mod test {
    use crate::lazy::tests::get_df;

    /// Smoke test: build a lazy select + sort pipeline on the fixture frame,
    /// collect it, and print the outcome. Nothing is asserted on the result.
    #[test]
    fn test_lazy_exec() {
        let lazy_frame = get_df().lazy();
        let projected = lazy_frame.select("sepal.width");
        let sorted = projected.sort("sepal.width", false);
        let outcome = sorted.collect();

        println!("{:?}", outcome);
    }
}
38 changes: 21 additions & 17 deletions polars/src/lazy/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::frame::select::Selection;
use crate::prelude::*;
use arrow::datatypes::SchemaRef;
use std::cell::RefCell;
Expand Down Expand Up @@ -109,7 +108,7 @@ fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
pub enum LogicalPlan {
Filter {
predicate: Expr,
input: Rc<LogicalPlan>,
input: Box<LogicalPlan>,
},
CsvScan {
path: String,
Expand All @@ -122,8 +121,12 @@ pub enum LogicalPlan {
},
// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
Projection {
columns: Rc<Vec<Expr>>,
input: Rc<LogicalPlan>,
expr: Vec<Expr>,
input: Box<LogicalPlan>,
},
Sort {
input: Box<LogicalPlan>,
expr: Vec<Expr>,
},
}

Expand Down Expand Up @@ -151,26 +154,19 @@ impl LogicalPlanBuilder {
.into()
}

/// Projection in RDBMS language
pub fn select<'a, K, S: Selection<'a, K>>(&self, columns: S) -> Self {
let columns = columns
.to_selection_vec()
.into_iter()
.map(|s| col(s))
.collect::<Vec<_>>();

pub fn project(self, expr: Vec<Expr>) -> Self {
LogicalPlan::Projection {
columns: Rc::new(columns),
input: Rc::new(self.0.clone()),
expr,
input: Box::new(self.0),
}
.into()
}

/// Apply a filter
pub fn filter(&self, predicate: Expr) -> Self {
pub fn filter(self, predicate: Expr) -> Self {
LogicalPlan::Filter {
predicate,
input: Rc::new(self.0.clone()),
input: Box::new(self.0),
}
.into()
}
Expand All @@ -179,12 +175,20 @@ impl LogicalPlanBuilder {
self.0
}

pub fn dataframe(df: DataFrame) -> Self {
pub fn from_existing_df(df: DataFrame) -> Self {
LogicalPlan::DataFrameScan {
df: Rc::new(RefCell::new(df)),
}
.into()
}

/// Stack a `LogicalPlan::Sort` node on top of the current plan.
///
/// The builder is consumed: its plan becomes the boxed `input` of the new
/// node and `expr` is stored as the node's sort expressions.
pub fn sort(self, expr: Vec<Expr>) -> Self {
    let input = Box::new(self.0);
    let sorted = LogicalPlan::Sort { input, expr };
    sorted.into()
}
}

/// Create a column expression based on a column name.
Expand Down
26 changes: 13 additions & 13 deletions polars/src/lazy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
// All credits to Andy Grove and Ballista / DataFusion / Apache Arrow

pub mod frame;
mod logical_plan;
mod physical_plan;

pub(crate) use crate::{
lazy::{logical_plan::*, physical_plan::expressions::*},
prelude::*,
};
use arrow::datatypes::SchemaRef;
pub(crate) mod prelude;

#[cfg(test)]
mod tests {
use super::*;
use crate::lazy::physical_plan::{planner::SimplePlanner, PhysicalPlanner};
use crate::lazy::prelude::*;
use crate::prelude::*;
use std::io::Cursor;

// physical plan see: datafusion/physical_plan/planner.rs.html#61-63

#[test]
fn plan_builder_simple() {
pub(crate) fn get_df() -> DataFrame {
let s = r#"
"sepal.length","sepal.width","petal.length","petal.width","variety"
5.1,3.5,1.4,.2,"Setosa"
Expand All @@ -38,15 +33,20 @@ mod tests {
.has_header(true)
.finish()
.unwrap();
df
}

#[test]
fn plan_builder_simple() {
let df = get_df();

let logical_plan = LogicalPlanBuilder::dataframe(df)
let logical_plan = LogicalPlanBuilder::from_existing_df(df)
.filter(col("sepal.length").lt(lit(5)))
.select(&["sepal.length", "variety"])
.build();

println!("{:?}", logical_plan);

let planner = SimplePlanner {};
let planner = DefaultPlanner {};
let physical_plan = planner.create_physical_plan(&logical_plan).unwrap();
println!("{:?}", physical_plan.execute());
}
Expand Down
40 changes: 25 additions & 15 deletions polars/src/lazy/physical_plan/executors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::*;
use arrow::datatypes::SchemaRef;
use std::cell::RefCell;
use std::mem;

Expand Down Expand Up @@ -27,7 +26,7 @@ impl CsvExec {
}
}

impl ExecutionPlan for CsvExec {
impl Executor for CsvExec {
fn execute(&self) -> Result<DataFrame> {
let file = std::fs::File::open(&self.path).unwrap();

Expand All @@ -42,16 +41,16 @@ impl ExecutionPlan for CsvExec {
#[derive(Debug)]
pub struct FilterExec {
predicate: Rc<dyn PhysicalExpr>,
input: Rc<dyn ExecutionPlan>,
input: Rc<dyn Executor>,
}

impl FilterExec {
pub fn new(predicate: Rc<dyn PhysicalExpr>, input: Rc<dyn ExecutionPlan>) -> Self {
pub fn new(predicate: Rc<dyn PhysicalExpr>, input: Rc<dyn Executor>) -> Self {
Self { predicate, input }
}
}

impl ExecutionPlan for FilterExec {
impl Executor for FilterExec {
fn execute(&self) -> Result<DataFrame> {
let df = self.input.execute()?;
let s = self.predicate.evaluate(&df)?;
Expand All @@ -72,7 +71,7 @@ impl DataFrameExec {
}
}

impl ExecutionPlan for DataFrameExec {
impl Executor for DataFrameExec {
fn execute(&self) -> Result<DataFrame> {
let mut ref_df = self.df.borrow_mut();
let df = &mut *ref_df;
Expand All @@ -81,24 +80,35 @@ impl ExecutionPlan for DataFrameExec {
}
}

/// Take an input Executor and multiple expressions
#[derive(Debug)]
pub struct ProjectionExec {
input: Rc<dyn ExecutionPlan>,
columns: Vec<Rc<dyn PhysicalExpr>>,
pub struct PipeExec {
/// Label for the kind of operation this executor performs, i.e. sort, projection.
/// NOTE(review): no use of this field is visible in this view — presumably
/// informational (e.g. for `Debug` output); confirm.
operation: &'static str,
/// Upstream executor; it is executed first to obtain the `DataFrame` the
/// expressions are evaluated on.
input: Rc<dyn Executor>,
/// Physical expressions, each evaluated against the input's `DataFrame`.
expr: Vec<Rc<dyn PhysicalExpr>>,
}

impl ProjectionExec {
pub(crate) fn new(input: Rc<dyn ExecutionPlan>, columns: Vec<Rc<dyn PhysicalExpr>>) -> Self {
Self { input, columns }
impl PipeExec {
pub(crate) fn new(
operation: &'static str,
input: Rc<dyn Executor>,
expr: Vec<Rc<dyn PhysicalExpr>>,
) -> Self {
Self {
operation,
input,
expr,
}
}
}

impl ExecutionPlan for ProjectionExec {
impl Executor for PipeExec {
fn execute(&self) -> Result<DataFrame> {
// projection is only on a DataFrame so we unpack df
let df = self.input.execute()?;

let selected_columns = self
.columns
.expr
.iter()
.map(|expr| expr.evaluate(&df))
.collect::<Result<Vec<Series>>>()?;
Expand Down
21 changes: 20 additions & 1 deletion polars/src/lazy/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::*;
use crate::{lazy::prelude::*, prelude::*};
use std::rc::Rc;

#[derive(Debug)]
Expand Down Expand Up @@ -74,3 +74,22 @@ impl PhysicalExpr for ColumnExpr {
Ok(column.clone())
}
}

/// Physical expression that sorts the `Series` produced by an inner expression.
#[derive(Debug)]
pub struct SortExpr {
// Inner expression whose result is sorted.
expr: Rc<dyn PhysicalExpr>,
// Passed unchanged as the argument of `Series::sort`.
reverse: bool,
}

impl SortExpr {
pub fn new(expr: Rc<dyn PhysicalExpr>, reverse: bool) -> Self {
Self { expr, reverse }
}
}

impl PhysicalExpr for SortExpr {
    /// Evaluate the inner expression on `df` and return its result sorted
    /// with `Series::sort(self.reverse)`.
    ///
    /// An error from the inner expression is returned unchanged.
    fn evaluate(&self, df: &DataFrame) -> Result<Series> {
        self.expr
            .evaluate(df)
            .map(|evaluated| evaluated.sort(self.reverse))
    }
}
10 changes: 7 additions & 3 deletions polars/src/lazy/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ pub mod executors;
pub mod expressions;
pub mod planner;

use super::*;
use crate::{lazy::prelude::*, prelude::*};
use arrow::datatypes::SchemaRef;
use std::fmt::Debug;
use std::rc::Rc;

Expand All @@ -12,20 +13,23 @@ pub enum ExprVal {
}

pub trait PhysicalPlanner {
fn create_physical_plan(&self, logical_plan: &LogicalPlan) -> Result<Rc<dyn ExecutionPlan>>;
fn create_physical_plan(&self, logical_plan: &LogicalPlan) -> Result<Rc<dyn Executor>>;
}

pub trait ExecutionPlan: Debug {
/// Executors will evaluate physical expressions and collect them in a DataFrame.
///
/// Implementors must also implement `Debug` (supertrait bound).
pub trait Executor: Debug {
/// Presumably the schema of the `DataFrame` produced by `execute` — not implemented yet.
///
/// # Panics
///
/// The default implementation is a `todo!()` placeholder and always panics.
fn schema(&self) -> SchemaRef {
todo!()
}
/// Run this executor and return the resulting `DataFrame`, or an error if execution fails.
fn execute(&self) -> Result<DataFrame>;
}

/// Take a DataFrame and evaluate the expressions.
/// Implement this for Column, lt, eq, etc
///
/// Implementors must also implement `Debug` (supertrait bound).
pub trait PhysicalExpr: Debug {
/// Presumably the Arrow data type this expression yields for `_input_schema` — not implemented yet.
///
/// # Panics
///
/// The default implementation is an `unimplemented!()` placeholder and always panics.
fn data_type(&self, _input_schema: &Schema) -> Result<ArrowDataType> {
unimplemented!()
}
/// Take a DataFrame and evaluate the expression.
fn evaluate(&self, df: &DataFrame) -> Result<Series>;
}

0 comments on commit c62a262

Please sign in to comment.