Skip to content

Commit

Permalink
add select projection to lazy process
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent aa8a51a commit c2c8da3
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 67 deletions.
2 changes: 1 addition & 1 deletion polars/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl DataFrame {
}

// doesn't check Series sizes.
fn new_no_checks(columns: Vec<Series>) -> DataFrame {
pub(crate) fn new_no_checks(columns: Vec<Series>) -> DataFrame {
DataFrame {
columns,
parallel: false,
Expand Down
21 changes: 21 additions & 0 deletions polars/src/lazy/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::frame::select::Selection;
use crate::prelude::*;
use arrow::datatypes::SchemaRef;
use std::cell::RefCell;
Expand Down Expand Up @@ -139,6 +140,11 @@ pub enum LogicalPlan {
DataFrameScan {
df: Rc<RefCell<DataFrame>>,
},
// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
Projection {
columns: Rc<Vec<Expr>>,
input: Rc<LogicalPlan>,
},
}

pub struct LogicalPlanBuilder(LogicalPlan);
Expand All @@ -165,6 +171,21 @@ impl LogicalPlanBuilder {
.into()
}

/// Projection in RDMS language
pub fn select<'a, K, S: Selection<'a, K>>(&self, columns: S) -> Self {
let columns = columns
.to_selection_vec()
.into_iter()
.map(|s| col(s))
.collect::<Vec<_>>();

LogicalPlan::Projection {
columns: Rc::new(columns),
input: Rc::new(self.0.clone()),
}
.into()
}

/// Apply a filter
pub fn filter(&self, predicate: Expr) -> Self {
LogicalPlan::Filter {
Expand Down
44 changes: 1 addition & 43 deletions polars/src/lazy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,6 @@ pub(crate) use crate::{
};
use arrow::datatypes::SchemaRef;

#[derive(Debug)]
pub enum DataStructure {
Series(Series),
DataFrame(DataFrame),
}

impl From<Series> for DataStructure {
fn from(s: Series) -> Self {
DataStructure::Series(s)
}
}

impl From<DataFrame> for DataStructure {
fn from(df: DataFrame) -> Self {
DataStructure::DataFrame(df)
}
}

impl DataStructure {
pub fn series_ref(&self) -> Result<&Series> {
if let DataStructure::Series(series) = self {
Ok(series)
} else {
Err(PolarsError::DataTypeMisMatch)
}
}

pub fn df_ref(&self) -> Result<&DataFrame> {
if let DataStructure::DataFrame(df) = self {
Ok(&df)
} else {
Err(PolarsError::DataTypeMisMatch)
}
}

pub fn len(&self) -> usize {
match self {
DataStructure::Series(s) => s.len(),
DataStructure::DataFrame(df) => df.height(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -84,6 +41,7 @@ mod tests {

let logical_plan = LogicalPlanBuilder::dataframe(df)
.filter(col("sepal.length").lt(lit(5)))
.select(&["sepal.length", "variety"])
.build();

println!("{:?}", logical_plan);
Expand Down
44 changes: 33 additions & 11 deletions polars/src/lazy/physical_plan/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ impl CsvExec {
}

impl ExecutionPlan for CsvExec {
fn execute(&self) -> Result<DataStructure> {
fn execute(&self) -> Result<DataFrame> {
let file = std::fs::File::open(&self.path).unwrap();

let df = CsvReader::new(file)
.has_header(self.has_header)
.with_batch_size(10000)
.finish()?;
Ok(DataStructure::DataFrame(df))
Ok(df)
}
}

Expand All @@ -52,15 +52,12 @@ impl FilterExec {
}

impl ExecutionPlan for FilterExec {
fn execute(&self) -> Result<DataStructure> {
let ds = self.input.execute()?;
let s = self.predicate.evaluate(&ds)?;
fn execute(&self) -> Result<DataFrame> {
let df = self.input.execute()?;
let s = self.predicate.evaluate(&df)?;
let mask = s.bool()?;

match ds {
DataStructure::DataFrame(df) => Ok(df.filter(mask)?.into()),
DataStructure::Series(s) => Ok(s.filter(mask)?.into()),
}
Ok(df.filter(mask)?)
}
}

Expand All @@ -76,10 +73,35 @@ impl DataFrameExec {
}

impl ExecutionPlan for DataFrameExec {
fn execute(&self) -> Result<DataStructure> {
fn execute(&self) -> Result<DataFrame> {
let mut ref_df = self.df.borrow_mut();
let df = &mut *ref_df;
let out = mem::take(df);
Ok(out.into())
Ok(out)
}
}

#[derive(Debug)]
pub struct ProjectionExec {
input: Rc<dyn ExecutionPlan>,
columns: Vec<Rc<dyn PhysicalExpr>>,
}

impl ProjectionExec {
pub(crate) fn new(input: Rc<dyn ExecutionPlan>, columns: Vec<Rc<dyn PhysicalExpr>>) -> Self {
Self { input, columns }
}
}

impl ExecutionPlan for ProjectionExec {
fn execute(&self) -> Result<DataFrame> {
// projection is only on a DataFrame so we unpack df
let df = self.input.execute()?;
let selected_columns = self
.columns
.iter()
.map(|expr| expr.evaluate(&df))
.collect::<Result<Vec<Series>>>()?;
Ok(DataFrame::new_no_checks(selected_columns))
}
}
15 changes: 8 additions & 7 deletions polars/src/lazy/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ impl LiteralExpr {
}

impl PhysicalExpr for LiteralExpr {
fn evaluate(&self, ds: &DataStructure) -> Result<Series> {
fn evaluate(&self, df: &DataFrame) -> Result<Series> {
match &self.0 {
// todo! implement single value chunked_arrays? Or allow comparison and arithemtic with
// ca of a single value
ScalarValue::Int32(v) => Ok(Int32Chunked::full("literal", *v, ds.len()).into_series()),
ScalarValue::Int32(v) => {
Ok(Int32Chunked::full("literal", *v, df.height()).into_series())
}
sv => panic!(format!("ScalarValue {:?} is not implemented", sv)),
}
}
Expand All @@ -35,9 +37,9 @@ impl BinaryExpr {
}

impl PhysicalExpr for BinaryExpr {
fn evaluate(&self, ds: &DataStructure) -> Result<Series> {
let left = self.left.evaluate(ds)?;
let right = self.right.evaluate(ds)?;
fn evaluate(&self, df: &DataFrame) -> Result<Series> {
let left = self.left.evaluate(df)?;
let right = self.right.evaluate(df)?;
match self.op {
Operator::Lt => {
let a = apply_method_all_series!(left, lt_series, &right);
Expand All @@ -58,8 +60,7 @@ impl ColumnExpr {
}

impl PhysicalExpr for ColumnExpr {
fn evaluate(&self, ds: &DataStructure) -> Result<Series> {
let df = ds.df_ref()?;
fn evaluate(&self, df: &DataFrame) -> Result<Series> {
let column = df.column(&self.0)?;
Ok(column.clone())
}
Expand Down
9 changes: 7 additions & 2 deletions polars/src/lazy/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ use super::*;
use std::fmt::Debug;
use std::rc::Rc;

pub enum ExprVal {
Series(Series),
Column(Vec<String>),
}

pub trait PhysicalPlanner {
fn create_physical_plan(&self, logical_plan: &LogicalPlan) -> Result<Rc<dyn ExecutionPlan>>;
}
Expand All @@ -14,13 +19,13 @@ pub trait ExecutionPlan: Debug {
fn schema(&self) -> SchemaRef {
todo!()
}
fn execute(&self) -> Result<DataStructure>;
fn execute(&self) -> Result<DataFrame>;
}

/// Implement this for Column, lt, eq, etc
pub trait PhysicalExpr: Debug {
fn data_type(&self, _input_schema: &Schema) -> Result<ArrowDataType> {
unimplemented!()
}
fn evaluate(&self, ds: &DataStructure) -> Result<Series>;
fn evaluate(&self, df: &DataFrame) -> Result<Series>;
}
10 changes: 9 additions & 1 deletion polars/src/lazy/physical_plan/planner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
executors::{CsvExec, FilterExec},
executors::{CsvExec, FilterExec, ProjectionExec},
expressions::LiteralExpr,
*,
};
Expand Down Expand Up @@ -40,6 +40,14 @@ impl SimplePlanner {
*has_header,
*delimiter,
))),
LogicalPlan::Projection { columns, input } => {
let input = self.create_initial_physical_plan(input)?;
let columns = columns
.iter()
.map(|expr| self.create_physical_expr(expr))
.collect::<Result<Vec<_>>>()?;
Ok(Rc::new(ProjectionExec::new(input, columns)))
}
LogicalPlan::DataFrameScan { df } => Ok(Rc::new(DataFrameExec::new(df.clone()))),
}
}
Expand Down
4 changes: 2 additions & 2 deletions polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ pub mod error;
mod fmt;
pub mod frame;
pub mod lazy;
#[cfg(feature = "lazy")]
pub mod lazy;
pub mod prelude;
pub mod series;
pub mod testing;
#[cfg(feature = "lazy")]
pub mod lazy;

0 comments on commit c2c8da3

Please sign in to comment.