Commit: update physical execution state

ritchie46 committed Apr 24, 2021
1 parent 7ea1ec0 commit 9473558
Showing 4 changed files with 96 additions and 79 deletions.
10 changes: 4 additions & 6 deletions polars/polars-lazy/src/frame.rs
@@ -1,7 +1,6 @@
//! Lazy variant of a [DataFrame](polars_core::frame::DataFrame).
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;

use ahash::RandomState;
use itertools::Itertools;
@@ -17,6 +16,7 @@ use crate::logical_plan::optimizer::stack_opt::{OptimizationRule, StackOptimizer
use crate::logical_plan::optimizer::{
predicate_pushdown::PredicatePushDown, projection_pushdown::ProjectionPushDown,
};
use crate::physical_plan::state::ExecutionState;
use crate::prelude::aggregate_scan_projections::agg_projection;
use crate::prelude::join_pruning::JoinPrune;
use crate::prelude::simplify_expr::SimplifyBooleanRule;
@@ -538,11 +538,9 @@ impl LazyFrame {
let planner = DefaultPlanner::default();
let mut physical_plan =
planner.create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;
let cache = Arc::new(Mutex::new(HashMap::with_capacity_and_hasher(
64,
RandomState::default(),
)));
let out = physical_plan.execute(&cache);

let state = ExecutionState::new();
let out = physical_plan.execute(&state);
if use_string_cache {
toggle_string_cache(!use_string_cache);
}
105 changes: 43 additions & 62 deletions polars/polars-lazy/src/physical_plan/executors.rs
@@ -54,22 +54,14 @@ pub struct CacheExec {
}

impl Executor for CacheExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let guard = cache.lock().unwrap();

// cache hit
if let Some(df) = guard.get(&self.key) {
return Ok(df.clone());
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
if let Some(df) = state.cache_hit(&self.key) {
return Ok(df);
}
drop(guard);

// cache miss
let df = self.input.execute(cache)?;

let mut guard = cache.lock().unwrap();
let key = std::mem::take(&mut self.key);
guard.insert(key, df.clone());

let df = self.input.execute(state)?;
state.store_cache(std::mem::take(&mut self.key), df.clone());
if std::env::var(POLARS_VERBOSE).is_ok() {
println!("cache set {:?}", self.key);
}
@@ -113,20 +105,14 @@ impl ParquetExec {

#[cfg(feature = "parquet")]
impl Executor for ParquetExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let cache_key = match &self.predicate {
Some(predicate) => format!("{}{:?}", self.path, predicate.as_expression()),
None => self.path.to_string(),
};
if self.cache {
let guard = cache.lock().unwrap();
// cache hit
if let Some(df) = guard.get(&cache_key) {
return Ok(df.clone());
}
drop(guard);
if let Some(df) = state.cache_hit(&cache_key) {
return Ok(df);
}

// cache miss
let file = std::fs::File::open(&self.path).unwrap();

@@ -160,8 +146,7 @@ impl Executor for ParquetExec {
)?;

if self.cache {
let mut guard = cache.lock().unwrap();
guard.insert(cache_key, df.clone());
state.store_cache(cache_key, df.clone())
}
if std::env::var(POLARS_VERBOSE).is_ok() {
println!("parquet {:?} read", self.path);
@@ -217,18 +202,15 @@ impl CsvExec {
}

impl Executor for CsvExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let cache_key = match &self.predicate {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let state_key = match &self.predicate {
Some(predicate) => format!("{}{:?}", self.path, predicate.as_expression()),
None => self.path.to_string(),
};
if self.cache {
let guard = cache.lock().unwrap();
// cache hit
if let Some(df) = guard.get(&cache_key) {
return Ok(df.clone());
if let Some(df) = state.cache_hit(&state_key) {
return Ok(df);
}
drop(guard);
}

// cache miss
@@ -265,8 +247,7 @@ impl Executor for CsvExec {
let df = reader.finish_with_scan_ops(self.predicate.clone(), aggregate)?;

if self.cache {
let mut guard = cache.lock().unwrap();
guard.insert(cache_key, df.clone());
state.store_cache(state_key, df.clone());
}
if std::env::var(POLARS_VERBOSE).is_ok() {
println!("csv {:?} read", self.path);
@@ -288,8 +269,8 @@ impl FilterExec {
}

impl Executor for FilterExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
let s = self.predicate.evaluate(&df)?;
let mask = s.bool().expect("filter predicate wasn't of type boolean");
let df = df.filter(mask)?;
@@ -321,7 +302,7 @@ impl DataFrameExec {
}

impl Executor for DataFrameExec {
fn execute(&mut self, _: &Cache) -> Result<DataFrame> {
fn execute(&mut self, _: &ExecutionState) -> Result<DataFrame> {
let df = mem::take(&mut self.df);
let mut df = Arc::try_unwrap(df).unwrap_or_else(|df| (*df).clone());

@@ -400,8 +381,8 @@ pub(crate) fn evaluate_physical_expressions(
}

impl Executor for StandardExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;

let df = evaluate_physical_expressions(&df, &self.expr);
if std::env::var(POLARS_VERBOSE).is_ok() {
@@ -417,8 +398,8 @@ pub(crate) struct ExplodeExec {
}

impl Executor for ExplodeExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
df.explode(&self.columns)
}
}
@@ -430,8 +411,8 @@ pub(crate) struct SortExec {
}

impl Executor for SortExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
df.sort(&self.by_column, self.reverse)
}
}
@@ -443,8 +424,8 @@ pub(crate) struct DropDuplicatesExec {
}

impl Executor for DropDuplicatesExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
df.drop_duplicates(
self.maintain_order,
self.subset.as_ref().map(|v| v.as_ref()),
@@ -518,8 +499,8 @@ fn groupby_helper(
}

impl Executor for GroupByExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
let keys = self
.keys
.iter()
@@ -554,8 +535,8 @@ impl PartitionGroupByExec {
}

impl Executor for PartitionGroupByExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let original_df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let original_df = self.input.execute(state)?;

// Get the keys up front. This is the very last-minute decision on which groupby method we choose.
// If the column is categorical, we know the number of groups we have and can decide to continue
@@ -717,28 +698,28 @@ impl JoinExec {
}

impl Executor for JoinExec {
fn execute<'a>(&'a mut self, cache: &'a Cache) -> Result<DataFrame> {
fn execute<'a>(&'a mut self, state: &'a ExecutionState) -> Result<DataFrame> {
let mut input_left = self.input_left.take().unwrap();
let mut input_right = self.input_right.take().unwrap();

let (df_left, df_right) = if self.parallel {
let cache_left = cache.clone();
let cache_right = cache.clone();
let state_left = state.clone();
let state_right = state.clone();
// propagate the fetch_rows static value to the spawning threads.
let fetch_rows = FETCH_ROWS.with(|fetch_rows| fetch_rows.get());

POOL.join(
move || {
FETCH_ROWS.with(|fr| fr.set(fetch_rows));
input_left.execute(&cache_left)
input_left.execute(&state_left)
},
move || {
FETCH_ROWS.with(|fr| fr.set(fetch_rows));
input_right.execute(&cache_right)
input_right.execute(&state_right)
},
)
} else {
(input_left.execute(&cache), input_right.execute(&cache))
(input_left.execute(&state), input_right.execute(&state))
};

let df_left = df_left?;
@@ -775,8 +756,8 @@ impl StackExec {
}

impl Executor for StackExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let mut df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let mut df = self.input.execute(state)?;
let height = df.height();

let res: Result<_> = self.expr.iter().try_for_each(|expr| {
@@ -808,8 +789,8 @@ pub struct SliceExec {
}

impl Executor for SliceExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
Ok(df.slice(self.offset, self.len))
}
}
@@ -820,8 +801,8 @@ pub struct MeltExec {
}

impl Executor for MeltExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
df.melt(&self.id_vars.as_slice(), &self.value_vars.as_slice())
}
}
@@ -832,8 +813,8 @@ pub(crate) struct UdfExec {
}

impl Executor for UdfExec {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame> {
let df = self.input.execute(cache)?;
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
self.function.call_udf(df)
}
}
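
The JoinExec change above clones the execution state for each parallel branch and re-seeds the FETCH_ROWS thread-local inside the rayon pool. A minimal standalone sketch of that propagation pattern, using a local stand-in for the crate's FETCH_ROWS, toy payloads instead of polars DataFrames, and assuming rayon as a dependency:

use std::cell::Cell;

thread_local! {
    // Stand-in for FETCH_ROWS: an optional row limit set by `fetch`.
    static FETCH_ROWS: Cell<Option<usize>> = Cell::new(None);
}

fn run_branch(name: &str) -> String {
    // Each branch reads the limit even though it may run on a pool worker thread.
    let limit = FETCH_ROWS.with(|fr| fr.get());
    format!("{name}: fetch limit = {limit:?}")
}

fn main() {
    FETCH_ROWS.with(|fr| fr.set(Some(100)));

    // Read the thread-local once on the calling thread...
    let fetch_rows = FETCH_ROWS.with(|fr| fr.get());

    // ...and re-seed it inside each closure before any work happens there.
    let (left, right) = rayon::join(
        move || {
            FETCH_ROWS.with(|fr| fr.set(fetch_rows));
            run_branch("left")
        },
        move || {
            FETCH_ROWS.with(|fr| fr.set(fetch_rows));
            run_branch("right")
        },
    );
    println!("{left}\n{right}");
}

Reading the value once on the calling thread and setting it again inside each closure is what keeps the limit visible on whichever worker thread rayon picks.
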
21 changes: 10 additions & 11 deletions polars/polars-lazy/src/physical_plan/mod.rs
@@ -1,19 +1,17 @@
pub mod executors;
pub mod expressions;
pub mod planner;
pub(crate) mod state;

use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use ahash::RandomState;
use polars_core::prelude::*;
use polars_io::PhysicalIoExpr;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

pub enum ExprVal {
Series(Series),
Column(Vec<String>),
}

/// A type that implements this transforms a LogicalPlan to a physical plan.
///
/// We could produce different physical plans with different goals in mind, e.g. memory optimized,
/// performance optimized, out of core, etc.
pub trait PhysicalPlanner {
fn create_physical_plan(
&self,
@@ -27,8 +25,9 @@ pub trait PhysicalPlanner {
// combine physical expressions, which produce Series.

/// Executors will evaluate physical expressions and collect them in a DataFrame.
///
/// Executors have other executors as input. By having a tree of executors we can execute the
/// physical plan until the last executor is evaluated.
pub trait Executor: Send + Sync {
fn execute(&mut self, cache: &Cache) -> Result<DataFrame>;
fn execute(&mut self, cache: &ExecutionState) -> Result<DataFrame>;
}

pub(crate) type Cache = Arc<Mutex<HashMap<String, DataFrame, RandomState>>>;
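
The doc comment above describes executors as a tree: each node executes its input before doing its own work, and the execution state is threaded through every execute call. A toy mirror of that shape (placeholder types, not the polars-lazy API):

// Toy mirror of the executor tree: an inner node owns its input as a boxed
// trait object and passes the shared state down when `execute` is called on the root.
type DataFrame = Vec<i64>; // stand-in payload
type Result<T> = std::result::Result<T, String>;

/// Placeholder for the shared state/cache threaded through the tree.
#[derive(Clone, Default)]
struct ExecutionState;

trait Executor {
    fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame>;
}

/// Leaf node: produces a DataFrame directly.
struct SourceExec(DataFrame);

impl Executor for SourceExec {
    fn execute(&mut self, _state: &ExecutionState) -> Result<DataFrame> {
        Ok(self.0.clone())
    }
}

/// Inner node: executes its input first, then applies its own operation.
struct FilterExec {
    input: Box<dyn Executor>,
}

impl Executor for FilterExec {
    fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
        let df = self.input.execute(state)?;
        Ok(df.into_iter().filter(|v| *v > 0).collect())
    }
}

fn main() -> Result<()> {
    // Root of the tree: FilterExec -> SourceExec.
    let mut root = FilterExec {
        input: Box::new(SourceExec(vec![-1, 2, 3])),
    };
    let state = ExecutionState::default();
    println!("{:?}", root.execute(&state)?); // [2, 3]
    Ok(())
}
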
39 changes: 39 additions & 0 deletions polars/polars-lazy/src/physical_plan/state.rs
@@ -0,0 +1,39 @@
use ahash::RandomState;
use polars_core::frame::groupby::GroupTuples;
use polars_core::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// State/cache that is maintained during the execution of the physical plan.
#[derive(Clone)]
pub struct ExecutionState {
df_cache: Arc<Mutex<HashMap<String, DataFrame, RandomState>>>,
gb_cache: Arc<Mutex<HashMap<String, GroupTuples, RandomState>>>,
}

impl ExecutionState {
pub fn new() -> Self {
Self {
df_cache: Arc::new(Mutex::new(HashMap::with_hasher(RandomState::default()))),
gb_cache: Arc::new(Mutex::new(HashMap::with_hasher(RandomState::default()))),
}
}

/// Check if we have a DataFrame in the cache.
pub fn cache_hit(&self, key: &str) -> Option<DataFrame> {
let guard = self.df_cache.lock().unwrap();
guard.get(key).cloned()
}

/// Store a DataFrame in the cache.
pub fn store_cache(&self, key: String, df: DataFrame) {
let mut guard = self.df_cache.lock().unwrap();
guard.insert(key, df);
}
}

impl Default for ExecutionState {
fn default() -> Self {
ExecutionState::new()
}
}
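
ExecutionState derives Clone because both maps sit behind Arc<Mutex<..>>: a clone handed to a parallel branch (as in JoinExec above) is only a new handle onto the same caches. A minimal sketch with String payloads standing in for DataFrames:

// Cloning the state shares the cache: the clone and the original point at the
// same Arc<Mutex<HashMap>>, so a store through one handle is a hit through the other.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
struct ExecutionState {
    df_cache: Arc<Mutex<HashMap<String, String>>>,
}

impl ExecutionState {
    fn cache_hit(&self, key: &str) -> Option<String> {
        self.df_cache.lock().unwrap().get(key).cloned()
    }

    fn store_cache(&self, key: String, df: String) {
        self.df_cache.lock().unwrap().insert(key, df);
    }
}

fn main() {
    let state = ExecutionState::default();
    let branch = state.clone(); // cheap: bumps the Arc refcount only

    branch.store_cache("scan:foo.csv".into(), "df contents".into());

    // The original handle sees the entry stored through the clone.
    assert_eq!(state.cache_hit("scan:foo.csv").as_deref(), Some("df contents"));
    println!("cache shared across clones: ok");
}
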
