Skip to content

Commit

Permalink
lazy scanDataFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent 3c07015 commit aa8a51a
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 32 deletions.
113 changes: 103 additions & 10 deletions polars/src/chunked_array/comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,28 +510,121 @@ impl Not for BooleanChunked {
}

pub trait CompToSeries {
fn lt_series(&self, rhs: &Series) -> BooleanChunked {
fn lt_series(&self, _rhs: &Series) -> BooleanChunked {
unimplemented!()
}

fn gt_series(&self, _rhs: &Series) -> BooleanChunked {
unimplemented!()
}

fn gt_eq_series(&self, _rhs: &Series) -> BooleanChunked {
unimplemented!()
}

fn lt_eq_series(&self, _rhs: &Series) -> BooleanChunked {
unimplemented!()
}

fn eq_series(&self, _rhs: &Series) -> BooleanChunked {
unimplemented!()
}

fn neq_series(&self, _rhs: &Series) -> BooleanChunked {
unimplemented!()
}
}

/// Expands to the body of a `CompToSeries` method comparing `$SELF` with the
/// `Series` bound to `$RHS`.
///
/// * `$SELF` - the chunked array the method is called on.
/// * `$METHOD_NAME` - the enclosing `*_series` method; re-entered once `$RHS`
///   has been cast to `$TYPE`.
/// * `$OPERATION` - the array-to-array comparison to run when `$RHS` already
///   holds the same type as `$SELF` (`lt`, `gt`, `eq`, ...).
/// * `$NAME` - name given to the all-`false` fallback mask.
/// * `$TYPE` - type `$RHS` is cast to when its type does not match.
///
/// If `$RHS` can neither be unpacked nor cast, the expansion evaluates to
/// `BooleanChunked::full($NAME, false, $SELF.len())`.
macro_rules! impl_comp_to_series {
    ($SELF:ident, $METHOD_NAME:ident, $OPERATION:ident, $RHS:ident, $NAME:expr, $TYPE:ty) => {{
        match $SELF.unpack_series_matching_type($RHS) {
            // Types match: run the comparison that was actually requested.
            // (This arm used to call `lt` unconditionally, so every operator
            // silently evaluated "less than".)
            Ok(ca) => $SELF.$OPERATION(ca),
            // Types differ: try to cast the right-hand side and retry.
            Err(_) => match $RHS.cast::<$TYPE>() {
                Ok(s) => $SELF.$METHOD_NAME(&s),
                // Not comparable at all: report `false` for every row.
                Err(_) => BooleanChunked::full($NAME, false, $SELF.len()),
            },
        }
    }};
}

impl<T> CompToSeries for ChunkedArray<T>
where
T: PolarsNumericType,
{
fn lt_series(&self, rhs: &Series) -> BooleanChunked {
match self.unpack_series_matching_type(rhs) {
Ok(ca) => self.lt(ca),
Err(_) => match rhs.cast::<T>() {
Ok(s) => self.lt_series(&s),
Err(e) => BooleanChunked::full("lt", false, self.len()),
},
}
impl_comp_to_series!(self, lt_series, lt, rhs, "lt", T)
}

fn gt_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, gt_series, gt, rhs, "gt", T)
}

fn gt_eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, gt_eq_series, gt_eq, rhs, "gt_eq", T)
}

fn lt_eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, lt_eq_series, lt_eq, rhs, "lt_eq", T)
}

fn eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, eq_series, eq, rhs, "eq", T)
}

fn neq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, neq_series, neq, rhs, "neq", T)
}
}

impl CompToSeries for BooleanChunked {}
impl CompToSeries for Utf8Chunked {}
/// `Series` comparisons for boolean arrays.
///
/// Every method body is an expansion of `impl_comp_to_series!` with
/// `BooleanType` as the cast target. The identifier after the method name is
/// the operation token handed to the macro, and the string literal is the
/// name the macro passes to `BooleanChunked::full` for its all-`false`
/// fallback when `rhs` can neither be unpacked nor cast.
impl CompToSeries for BooleanChunked {
// Operation token `lt`, fallback mask named "lt".
fn lt_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, lt_series, lt, rhs, "lt", BooleanType)
}

// Operation token `gt`, fallback mask named "gt".
fn gt_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, gt_series, gt, rhs, "gt", BooleanType)
}

// Operation token `gt_eq`, fallback mask named "gt_eq".
fn gt_eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, gt_eq_series, gt_eq, rhs, "gt_eq", BooleanType)
}

// Operation token `lt_eq`, fallback mask named "lt_eq".
fn lt_eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, lt_eq_series, lt_eq, rhs, "lt_eq", BooleanType)
}

// Operation token `eq`, fallback mask named "eq".
fn eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, eq_series, eq, rhs, "eq", BooleanType)
}

// Operation token `neq`, fallback mask named "neq".
fn neq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, neq_series, neq, rhs, "neq", BooleanType)
}
}
/// `Series` comparisons for UTF-8 string arrays.
///
/// Every method body is an expansion of `impl_comp_to_series!` with
/// `Utf8Type` as the cast target. The identifier after the method name is
/// the operation token handed to the macro, and the string literal is the
/// name the macro passes to `BooleanChunked::full` for its all-`false`
/// fallback when `rhs` can neither be unpacked nor cast.
impl CompToSeries for Utf8Chunked {
// Operation token `lt`, fallback mask named "lt".
fn lt_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, lt_series, lt, rhs, "lt", Utf8Type)
}

// Operation token `gt`, fallback mask named "gt".
fn gt_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, gt_series, gt, rhs, "gt", Utf8Type)
}
// Operation token `gt_eq`, fallback mask named "gt_eq".
fn gt_eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, gt_eq_series, gt_eq, rhs, "gt_eq", Utf8Type)
}

// Operation token `lt_eq`, fallback mask named "lt_eq".
fn lt_eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, lt_eq_series, lt_eq, rhs, "lt_eq", Utf8Type)
}

// Operation token `eq`, fallback mask named "eq".
fn eq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, eq_series, eq, rhs, "eq", Utf8Type)
}

// Operation token `neq`, fallback mask named "neq".
fn neq_series(&self, rhs: &Series) -> BooleanChunked {
impl_comp_to_series!(self, neq_series, neq, rhs, "neq", Utf8Type)
}
}
impl CompToSeries for LargeListChunked {}

#[cfg(test)]
Expand Down
13 changes: 13 additions & 0 deletions polars/src/chunked_array/upstream_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ use rayon::iter::{FromParallelIterator, IntoParallelIterator};
use rayon::prelude::*;
use std::collections::LinkedList;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::sync::Arc;

impl<T> Default for ChunkedArray<T> {
    /// Builds a placeholder array with no chunks.
    ///
    /// The field is named `"default"`, has the Arrow `Null` data type and is
    /// marked non-nullable; both chunk vectors start out empty.
    fn default() -> Self {
        let placeholder = Field::new("default", ArrowDataType::Null, false);
        Self {
            field: Arc::new(placeholder),
            chunks: Vec::new(),
            chunk_id: Vec::new(),
            phantom: PhantomData,
        }
    }
}

/// FromIterator trait

Expand Down
6 changes: 6 additions & 0 deletions polars/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ impl<T: PolarsDataType> IntoSeries for ChunkedArray<T> {
}
}

impl Default for DataFrame {
fn default() -> Self {
DataFrame::new_no_checks(Vec::with_capacity(0))
}
}

type DfSchema = Arc<Schema>;
type DfSeries = Series;
type DfColumns = Vec<DfSeries>;
Expand Down
11 changes: 11 additions & 0 deletions polars/src/lazy/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::prelude::*;
use arrow::datatypes::SchemaRef;
use std::cell::RefCell;
use std::rc::Rc;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -135,6 +136,9 @@ pub enum LogicalPlan {
has_header: bool,
delimiter: Option<u8>,
},
DataFrameScan {
df: Rc<RefCell<DataFrame>>,
},
}

pub struct LogicalPlanBuilder(LogicalPlan);
Expand Down Expand Up @@ -173,6 +177,13 @@ impl LogicalPlanBuilder {
pub fn build(self) -> LogicalPlan {
self.0
}

/// Starts a plan that scans an in-memory `DataFrame`.
///
/// The frame is moved into an `Rc<RefCell<_>>` and wrapped in a
/// `LogicalPlan::DataFrameScan`, which is then converted into the builder.
pub fn dataframe(df: DataFrame) -> Self {
    let shared = Rc::new(RefCell::new(df));
    let scan = LogicalPlan::DataFrameScan { df: shared };
    scan.into()
}
}

/// Create a column expression based on a column name.
Expand Down
36 changes: 27 additions & 9 deletions polars/src/lazy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
mod logical_plan;
mod physical_plan;

use crate::{
lazy::{
logical_plan::*,
physical_plan::{expressions::*, planner::SimplePlanner, PhysicalExpr, PhysicalPlanner},
},
pub(crate) use crate::{
lazy::{logical_plan::*, physical_plan::expressions::*},
prelude::*,
};
use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -58,15 +55,36 @@ impl DataStructure {
#[cfg(test)]
mod tests {
use super::*;
use crate::lazy::physical_plan::{planner::SimplePlanner, PhysicalPlanner};
use std::io::Cursor;

// physical plan see: datafusion/physical_plan/planner.rs.html#61-63

#[test]
fn plan_builder_simple() {
let logical_plan =
LogicalPlanBuilder::scan_csv("../data/iris.csv".into(), None, true, None)
.filter(col("sepal.length").lt(lit(5)))
.build();
let s = r#"
"sepal.length","sepal.width","petal.length","petal.width","variety"
5.1,3.5,1.4,.2,"Setosa"
4.9,3,1.4,.2,"Setosa"
4.7,3.2,1.3,.2,"Setosa"
4.6,3.1,1.5,.2,"Setosa"
5,3.6,1.4,.2,"Setosa"
5.4,3.9,1.7,.4,"Setosa"
4.6,3.4,1.4,.3,"Setosa"
"#;

let file = Cursor::new(s);

let df = CsvReader::new(file)
// we also check if infer schema ignores errors
.infer_schema(Some(3))
.has_header(true)
.finish()
.unwrap();

let logical_plan = LogicalPlanBuilder::dataframe(df)
.filter(col("sepal.length").lt(lit(5)))
.build();

println!("{:?}", logical_plan);

Expand Down
22 changes: 22 additions & 0 deletions polars/src/lazy/physical_plan/executors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;
use arrow::datatypes::SchemaRef;
use std::cell::RefCell;
use std::mem;

#[derive(Debug)]
pub struct CsvExec {
Expand Down Expand Up @@ -61,3 +63,23 @@ impl ExecutionPlan for FilterExec {
}
}
}

/// Execution-plan node whose source is an in-memory `DataFrame`.
#[derive(Debug)]
pub struct DataFrameExec {
// Shared, interior-mutable handle to the frame this node serves.
df: Rc<RefCell<DataFrame>>,
}

impl DataFrameExec {
pub(crate) fn new(df: Rc<RefCell<DataFrame>>) -> Self {
DataFrameExec { df }
}
}

impl ExecutionPlan for DataFrameExec {
    /// Moves the stored frame out of the shared cell and returns it.
    ///
    /// `mem::take` leaves `DataFrame::default()` behind, so a later call on
    /// the same node yields that default frame rather than the original data.
    ///
    /// # Panics
    ///
    /// Panics if the `RefCell` is already borrowed when this runs.
    fn execute(&self) -> Result<DataStructure> {
        let taken = mem::take(&mut *self.df.borrow_mut());
        Ok(taken.into())
    }
}
12 changes: 0 additions & 12 deletions polars/src/lazy/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ impl LiteralExpr {
}

impl PhysicalExpr for LiteralExpr {
fn data_type(&self, input_schema: &Schema) -> Result<ArrowDataType> {
unimplemented!()
}

fn evaluate(&self, ds: &DataStructure) -> Result<Series> {
match &self.0 {
// todo! implement single value chunked_arrays? Or allow comparison and arithmetic with
Expand All @@ -39,10 +35,6 @@ impl BinaryExpr {
}

impl PhysicalExpr for BinaryExpr {
fn data_type(&self, input_schema: &Schema) -> Result<ArrowDataType> {
unimplemented!()
}

fn evaluate(&self, ds: &DataStructure) -> Result<Series> {
let left = self.left.evaluate(ds)?;
let right = self.right.evaluate(ds)?;
Expand All @@ -66,10 +58,6 @@ impl ColumnExpr {
}

impl PhysicalExpr for ColumnExpr {
fn data_type(&self, input_schema: &Schema) -> Result<ArrowDataType> {
unimplemented!()
}

fn evaluate(&self, ds: &DataStructure) -> Result<Series> {
let df = ds.df_ref()?;
let column = df.column(&self.0)?;
Expand Down
4 changes: 3 additions & 1 deletion polars/src/lazy/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub trait ExecutionPlan: Debug {

/// Implement this for Column, lt, eq, etc
pub trait PhysicalExpr: Debug {
fn data_type(&self, input_schema: &Schema) -> Result<ArrowDataType>;
fn data_type(&self, _input_schema: &Schema) -> Result<ArrowDataType> {
unimplemented!()
}
fn evaluate(&self, ds: &DataStructure) -> Result<Series>;
}
2 changes: 2 additions & 0 deletions polars/src/lazy/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::{
expressions::LiteralExpr,
*,
};
use crate::lazy::physical_plan::executors::DataFrameExec;

pub(crate) struct SimplePlanner {}
impl Default for SimplePlanner {
Expand Down Expand Up @@ -39,6 +40,7 @@ impl SimplePlanner {
*has_header,
*delimiter,
))),
LogicalPlan::DataFrameScan { df } => Ok(Rc::new(DataFrameExec::new(df.clone()))),
}
}

Expand Down
2 changes: 2 additions & 0 deletions polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,5 @@ pub mod lazy;
pub mod prelude;
pub mod series;
pub mod testing;
#[cfg(feature = "lazy")]
pub mod lazy;
6 changes: 6 additions & 0 deletions polars/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,12 @@ impl From<(&str, ArrayRef)> for Series {
}
}

impl Default for Series {
    /// Returns the `Int8` variant wrapping a default `ChunkedArray`.
    fn default() -> Self {
        let empty = ChunkedArray::default();
        Series::Int8(empty)
    }
}

#[cfg(test)]
mod test {
use crate::prelude::*;
Expand Down

0 comments on commit aa8a51a

Please sign in to comment.