
Commit

make lazy api sync
ritchie46 committed Oct 7, 2020
1 parent 261f79d commit 1f44e9f
Showing 9 changed files with 119 additions and 119 deletions.
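The change below replaces Rc/RefCell with Arc/Mutex throughout the lazy API so that expressions, logical plans and executors can be sent and shared across threads. A minimal stand-alone sketch of why the swap matters (illustrative only, not polars code):

    use std::rc::Rc;
    use std::sync::Arc;
    use std::thread;

    fn main() {
        // Arc<String> is Send + Sync, so it may cross a thread boundary.
        let name = Arc::new(String::from("column_a"));
        let handle = {
            let name = Arc::clone(&name);
            thread::spawn(move || println!("worker sees {}", name))
        };
        handle.join().unwrap();

        // Rc<String> is !Send and !Sync; the commented-out spawn below
        // would be rejected by the compiler.
        let local = Rc::new(String::from("column_a"));
        // thread::spawn(move || println!("{}", local));
        println!("main sees {}", local);
    }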
10 changes: 5 additions & 5 deletions polars/src/lazy/dsl.rs
@@ -6,14 +6,14 @@ use arrow::datatypes::{Field, Schema};
use std::{
fmt,
ops::{Add, Div, Mul, Rem, Sub},
- rc::Rc,
+ sync::Arc,
};

/// Queries consists of multiple expressions.
#[derive(Clone)]
pub enum Expr {
- Alias(Box<Expr>, Rc<String>),
- Column(Rc<String>),
+ Alias(Box<Expr>, Arc<String>),
+ Column(Arc<String>),
Literal(ScalarValue),
BinaryExpr {
left: Box<Expr>,
@@ -256,7 +256,7 @@ impl Expr {

/// Rename Column.
pub fn alias(self, name: &str) -> Expr {
- Expr::Alias(Box::new(self), Rc::new(name.into()))
+ Expr::Alias(Box::new(self), Arc::new(name.into()))
}

/// Run is_null operation on `Expr`.
@@ -341,7 +341,7 @@ impl Expr {

/// Create a Colum Expression based on a column name.
pub fn col(name: &str) -> Expr {
- Expr::Column(Rc::new(name.to_owned()))
+ Expr::Column(Arc::new(name.to_owned()))
}

pub trait Literal {
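For orientation, a stripped-down sketch of the DSL shape after this change: col and alias now store the column name behind an Arc, so an expression tree built from them stays Send + Sync. The enum here is reduced to the two affected variants; the real Expr has many more.

    use std::sync::Arc;

    #[derive(Clone, Debug)]
    enum Expr {
        Alias(Box<Expr>, Arc<String>),
        Column(Arc<String>),
    }

    fn col(name: &str) -> Expr {
        Expr::Column(Arc::new(name.to_owned()))
    }

    impl Expr {
        fn alias(self, name: &str) -> Expr {
            Expr::Alias(Box::new(self), Arc::new(name.into()))
        }
    }

    fn main() {
        // Both the column name and the alias are atomically reference-counted.
        let expr = col("foo").alias("foo_renamed");
        println!("{:?}", expr);
    }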
16 changes: 8 additions & 8 deletions polars/src/lazy/frame.rs
@@ -1,7 +1,7 @@
//! Lazy variant of a [DataFrame](crate::prelude::DataFrame).
use crate::frame::select::Selection;
use crate::{lazy::prelude::*, prelude::*};
- use std::rc::Rc;
+ use std::sync::Arc;

impl DataFrame {
/// Convert the `DataFrame` into a lazy `DataFrame`
@@ -259,8 +259,8 @@ impl LazyFrame {
.join(
other.logical_plan,
JoinType::Left,
- Rc::new(left_on.into()),
- Rc::new(right_on.into()),
+ Arc::new(left_on.into()),
+ Arc::new(right_on.into()),
)
.build();
Self::from_logical_plan(lp, opt_state)
@@ -274,8 +274,8 @@
.join(
other.logical_plan,
JoinType::Outer,
- Rc::new(left_on.into()),
- Rc::new(right_on.into()),
+ Arc::new(left_on.into()),
+ Arc::new(right_on.into()),
)
.build();
Self::from_logical_plan(lp, opt_state)
@@ -289,8 +289,8 @@
.join(
other.logical_plan,
JoinType::Inner,
- Rc::new(left_on.into()),
- Rc::new(right_on.into()),
+ Arc::new(left_on.into()),
+ Arc::new(right_on.into()),
)
.build();
Self::from_logical_plan(lp, opt_state)
@@ -339,7 +339,7 @@ impl LazyGroupBy {
/// ```
pub fn agg(self, aggs: Vec<Expr>) -> LazyFrame {
let lp = LogicalPlanBuilder::from(self.logical_plan)
- .groupby(Rc::new(self.keys), aggs)
+ .groupby(Arc::new(self.keys), aggs)
.build();
LazyFrame::from_logical_plan(lp, self.opt_state)
}
20 changes: 10 additions & 10 deletions polars/src/lazy/logical_plan/mod.rs
@@ -5,8 +5,8 @@ use crate::{
};
use arrow::datatypes::DataType;
use fnv::FnvHashSet;
- use std::cell::RefCell;
- use std::{fmt, rc::Rc};
+ use std::sync::Mutex;
+ use std::{fmt, sync::Arc};

#[derive(Clone, Debug)]
pub enum ScalarValue {
@@ -93,7 +93,7 @@ pub enum LogicalPlan {
delimiter: Option<u8>,
},
DataFrameScan {
- df: Rc<RefCell<DataFrame>>,
+ df: Arc<Mutex<DataFrame>>,
schema: Schema,
},
// vertical selection
@@ -109,7 +109,7 @@
},
Aggregate {
input: Box<LogicalPlan>,
- keys: Rc<Vec<String>>,
+ keys: Arc<Vec<String>>,
aggs: Vec<Expr>,
schema: Schema,
},
@@ -118,8 +118,8 @@
input_right: Box<LogicalPlan>,
schema: Schema,
how: JoinType,
- left_on: Rc<String>,
- right_on: Rc<String>,
+ left_on: Arc<String>,
+ right_on: Arc<String>,
},
}

@@ -223,7 +223,7 @@ impl LogicalPlanBuilder {
.into()
}

- pub fn groupby(self, keys: Rc<Vec<String>>, aggs: Vec<Expr>) -> Self {
+ pub fn groupby(self, keys: Arc<Vec<String>>, aggs: Vec<Expr>) -> Self {
let current_schema = self.0.schema();

let fields = keys
@@ -252,7 +252,7 @@
pub fn from_existing_df(df: DataFrame) -> Self {
let schema = df.schema();
LogicalPlan::DataFrameScan {
- df: Rc::new(RefCell::new(df)),
+ df: Arc::new(Mutex::new(df)),
schema,
}
.into()
@@ -271,8 +271,8 @@
self,
other: LogicalPlan,
how: JoinType,
- left_on: Rc<String>,
- right_on: Rc<String>,
+ left_on: Arc<String>,
+ right_on: Arc<String>,
) -> Self {
let schema_left = self.0.schema();
let schema_right = other.schema();
11 changes: 6 additions & 5 deletions polars/src/lazy/logical_plan/optimizer/predicate.rs
@@ -3,15 +3,15 @@ use crate::lazy::prelude::*;
use crate::lazy::utils::{expr_to_root_column, rename_expr_root_name};
use crate::prelude::*;
use fnv::FnvHashMap;
- use std::rc::Rc;
+ use std::sync::Arc;

pub struct PredicatePushDown {}

impl PredicatePushDown {
fn finish_at_leaf(
&self,
lp: LogicalPlan,
- acc_predicates: FnvHashMap<Rc<String>, Expr>,
+ acc_predicates: FnvHashMap<Arc<String>, Expr>,
) -> Result<LogicalPlan> {
match acc_predicates.len() {
// No filter in the logical plan
@@ -46,7 +46,7 @@ impl PredicatePushDown {
fn push_down(
&self,
logical_plan: LogicalPlan,
- mut acc_predicates: FnvHashMap<Rc<String>, Expr>,
+ mut acc_predicates: FnvHashMap<Arc<String>, Expr>,
) -> Result<LogicalPlan> {
use LogicalPlan::*;

@@ -132,11 +132,12 @@

for predicate in acc_predicates.values() {
if check_down_node(&predicate, schema_left) {
- let name = Rc::new(predicate.to_field(schema_left).unwrap().name().clone());
+ let name =
+     Arc::new(predicate.to_field(schema_left).unwrap().name().clone());
pushdown_left.insert(name, predicate.clone());
} else if check_down_node(&predicate, schema_right) {
let name =
- Rc::new(predicate.to_field(schema_right).unwrap().name().clone());
+ Arc::new(predicate.to_field(schema_right).unwrap().name().clone());
pushdown_right.insert(name, predicate.clone());
} else {
local_predicates.push(predicate.clone())
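A note on the pushdown map: Hash and Eq on Arc<T> delegate to the inner value, so keying acc_predicates by Arc<String> still groups predicates by column name rather than by allocation. A small check of that behaviour, with std's HashMap standing in for FnvHashMap:

    use std::collections::HashMap;
    use std::sync::Arc;

    fn main() {
        let mut acc: HashMap<Arc<String>, &'static str> = HashMap::new();
        acc.insert(Arc::new("foo".to_string()), "a > 1");
        // A second Arc with equal contents hits the same slot and replaces it.
        acc.insert(Arc::new("foo".to_string()), "a > 2");
        assert_eq!(acc.len(), 1);
        println!("{:?}", acc);
    }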
57 changes: 28 additions & 29 deletions polars/src/lazy/physical_plan/executors.rs
@@ -1,6 +1,6 @@
use super::*;
- use std::cell::RefCell;
use std::mem;
+ use std::sync::Mutex;

#[derive(Debug)]
pub struct CsvExec {
@@ -35,12 +35,12 @@ impl Executor for CsvExec {

#[derive(Debug)]
pub struct FilterExec {
- predicate: Rc<dyn PhysicalExpr>,
- input: Rc<dyn Executor>,
+ predicate: Arc<dyn PhysicalExpr>,
+ input: Arc<dyn Executor>,
}

impl FilterExec {
- pub fn new(predicate: Rc<dyn PhysicalExpr>, input: Rc<dyn Executor>) -> Self {
+ pub fn new(predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn Executor>) -> Self {
Self { predicate, input }
}
}
@@ -57,20 +57,19 @@ impl Executor for FilterExec {

#[derive(Debug)]
pub struct DataFrameExec {
- df: Rc<RefCell<DataFrame>>,
+ df: Arc<Mutex<DataFrame>>,
}

impl DataFrameExec {
- pub(crate) fn new(df: Rc<RefCell<DataFrame>>) -> Self {
+ pub(crate) fn new(df: Arc<Mutex<DataFrame>>) -> Self {
DataFrameExec { df }
}
}

impl Executor for DataFrameExec {
fn execute(&self) -> Result<DataFrame> {
- let mut ref_df = self.df.borrow_mut();
- let df = &mut *ref_df;
- let out = mem::take(df);
+ let mut guard = self.df.lock().unwrap();
+ let out = mem::take(&mut *guard);
Ok(out)
}
}
@@ -80,15 +79,15 @@ impl Executor for DataFrameExec {
pub struct PipeExec {
/// i.e. sort, projection
operation: &'static str,
- input: Rc<dyn Executor>,
- expr: Vec<Rc<dyn PhysicalExpr>>,
+ input: Arc<dyn Executor>,
+ expr: Vec<Arc<dyn PhysicalExpr>>,
}

impl PipeExec {
pub(crate) fn new(
operation: &'static str,
- input: Rc<dyn Executor>,
- expr: Vec<Rc<dyn PhysicalExpr>>,
+ input: Arc<dyn Executor>,
+ expr: Vec<Arc<dyn PhysicalExpr>>,
) -> Self {
Self {
operation,
@@ -123,13 +122,13 @@ impl Executor for PipeExec {

#[derive(Debug)]
pub struct SortExec {
- input: Rc<dyn Executor>,
+ input: Arc<dyn Executor>,
by_column: String,
reverse: bool,
}

impl SortExec {
- pub(crate) fn new(input: Rc<dyn Executor>, by_column: String, reverse: bool) -> Self {
+ pub(crate) fn new(input: Arc<dyn Executor>, by_column: String, reverse: bool) -> Self {
Self {
input,
by_column,
@@ -148,16 +147,16 @@ impl Executor for SortExec {
/// Take an input Executor and a multiple expressions
#[derive(Debug)]
pub struct GroupByExec {
- input: Rc<dyn Executor>,
- keys: Rc<Vec<String>>,
- aggs: Vec<Rc<dyn PhysicalExpr>>,
+ input: Arc<dyn Executor>,
+ keys: Arc<Vec<String>>,
+ aggs: Vec<Arc<dyn PhysicalExpr>>,
}

impl GroupByExec {
pub(crate) fn new(
- input: Rc<dyn Executor>,
- keys: Rc<Vec<String>>,
- aggs: Vec<Rc<dyn PhysicalExpr>>,
+ input: Arc<dyn Executor>,
+ keys: Arc<Vec<String>>,
+ aggs: Vec<Arc<dyn PhysicalExpr>>,
) -> Self {
Self { input, keys, aggs }
}
@@ -184,20 +183,20 @@ impl Executor for GroupByExec {

#[derive(Debug)]
pub struct JoinExec {
- input_left: Rc<dyn Executor>,
- input_right: Rc<dyn Executor>,
+ input_left: Arc<dyn Executor>,
+ input_right: Arc<dyn Executor>,
how: JoinType,
- left_on: Rc<String>,
- right_on: Rc<String>,
+ left_on: Arc<String>,
+ right_on: Arc<String>,
}

impl JoinExec {
pub(crate) fn new(
- input_left: Rc<dyn Executor>,
- input_right: Rc<dyn Executor>,
+ input_left: Arc<dyn Executor>,
+ input_right: Arc<dyn Executor>,
how: JoinType,
- left_on: Rc<String>,
- right_on: Rc<String>,
+ left_on: Arc<String>,
+ right_on: Arc<String>,
) -> Self {
JoinExec {
input_left,
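The one small behavioural rewrite is in DataFrameExec::execute: it now locks the Mutex instead of borrowing the RefCell, then moves the DataFrame out with mem::take. A minimal sketch of that pattern with a stand-in type (mem::take requires Default, which the hunk above shows DataFrame already satisfies):

    use std::mem;
    use std::sync::{Arc, Mutex};

    // Frame is a hypothetical stand-in for DataFrame.
    #[derive(Default, Debug)]
    struct Frame(Vec<i32>);

    fn take_out(shared: &Arc<Mutex<Frame>>) -> Frame {
        // Lock rather than borrow_mut, then move the value out,
        // leaving Frame::default() behind in the shared slot.
        let mut guard = shared.lock().unwrap();
        mem::take(&mut *guard)
    }

    fn main() {
        let shared = Arc::new(Mutex::new(Frame(vec![1, 2, 3])));
        let owned = take_out(&shared);
        println!("taken: {:?}, left: {:?}", owned, shared.lock().unwrap());
    }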
