Skip to content

Commit

Permalink
fix[rust]: fix cache on duplicate schemas (#4479)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 18, 2022
1 parent a50971a commit 08c7ceb
Show file tree
Hide file tree
Showing 17 changed files with 62 additions and 74 deletions.
7 changes: 5 additions & 2 deletions polars/polars-lazy/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,11 @@ impl LogicalPlan {
}
Ok(())
}
Cache { input } => {
let current_node = format!("CACHE [{:?}]", (branch, id));
Cache {
input,
id: cache_id,
} => {
let current_node = format!("CACHE [{:?}]", (branch, id, cache_id));
self.write_dot(acc_str, prev_node, &current_node, id)?;
input.dot(acc_str, (branch, id + 1), &current_node)
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/dsl/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Expr {
let expr = expr.clone();
let mut arena = Arena::with_capacity(10);
let aexpr = to_aexpr(expr, &mut arena);
let planner = DefaultPlanner::default();
let planner = PhysicalPlanner::default();
let phys_expr = planner.create_physical_expr(aexpr, Context::Default, &mut arena)?;

let state = ExecutionState::new();
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/dsl/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl ListNameSpace {
let lp = lp_arena.get(optimized);
let aexpr = lp.get_exprs().pop().unwrap();

let planner = DefaultPlanner::default();
let planner = PhysicalPlanner::default();
let phys_expr =
planner.create_physical_expr(aexpr, Context::Default, &mut expr_arena)?;

Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ impl LazyFrame {
None
};

let planner = DefaultPlanner::default();
let planner = PhysicalPlanner::default();
let mut physical_plan =
planner.create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;

Expand Down
8 changes: 6 additions & 2 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub enum ALogicalPlan {
},
Cache {
input: Node,
id: usize,
},
Aggregate {
input: Node,
Expand Down Expand Up @@ -185,7 +186,7 @@ impl ALogicalPlan {
#[cfg(feature = "python")]
PythonScan { options } => &options.schema,
Union { inputs, .. } => return arena.get(inputs[0]).schema(arena),
Cache { input } => return arena.get(*input).schema(arena),
Cache { input, .. } => return arena.get(*input).schema(arena),
Sort { input, .. } => return arena.get(*input).schema(arena),
Explode { schema, .. } => schema,
#[cfg(feature = "parquet")]
Expand Down Expand Up @@ -319,7 +320,10 @@ impl ALogicalPlan {
columns: columns.clone(),
schema: schema.clone(),
},
Cache { .. } => Cache { input: inputs[0] },
Cache { id, .. } => Cache {
input: inputs[0],
id: *id,
},
Distinct { options, .. } => Distinct {
input: inputs[0],
options: options.clone(),
Expand Down
7 changes: 3 additions & 4 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,9 @@ impl LogicalPlanBuilder {
}

pub fn cache(self) -> Self {
LogicalPlan::Cache {
input: Box::new(self.0),
}
.into()
let input = Box::new(self.0);
let id = input.as_ref() as *const LogicalPlan as usize;
LogicalPlan::Cache { input, id }.into()
}

pub fn project(self, exprs: Vec<Expr>) -> Self {
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,9 @@ pub(crate) fn to_alp(
schema,
}
}
LogicalPlan::Cache { input } => {
LogicalPlan::Cache { input, id } => {
let input = to_alp(*input, expr_arena, lp_arena)?;
ALogicalPlan::Cache { input }
ALogicalPlan::Cache { input, id }
}
LogicalPlan::Aggregate {
input,
Expand Down Expand Up @@ -832,9 +832,9 @@ pub(crate) fn node_to_lp(
schema,
}
}
ALogicalPlan::Cache { input } => {
ALogicalPlan::Cache { input, id } => {
let input = Box::new(node_to_lp(input, expr_arena, lp_arena));
LogicalPlan::Cache { input }
LogicalPlan::Cache { input, id }
}
ALogicalPlan::Aggregate {
input,
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl fmt::Debug for LogicalPlan {
)
}
Union { inputs, .. } => write!(f, "UNION {:?}", inputs),
Cache { input } => write!(f, "CACHE {:?}", input),
Cache { input, .. } => write!(f, "CACHE {:?}", input),
#[cfg(feature = "parquet")]
ParquetScan {
path,
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub enum LogicalPlan {
predicate: Expr,
},
/// Cache the input at this point in the LP
Cache { input: Box<LogicalPlan> },
Cache { input: Box<LogicalPlan>, id: usize },
/// Scan a CSV file
#[cfg(feature = "csv-file")]
CsvScan {
Expand Down Expand Up @@ -226,7 +226,7 @@ impl LogicalPlan {
#[cfg(feature = "python")]
PythonScan { options } => Ok(Cow::Borrowed(&options.schema)),
Union { inputs, .. } => inputs[0].schema(),
Cache { input } => input.schema(),
Cache { input, .. } => input.schema(),
Sort { input, .. } => input.schema(),
Explode { schema, .. } => Ok(Cow::Borrowed(schema)),
#[cfg(feature = "parquet")]
Expand Down
13 changes: 5 additions & 8 deletions polars/polars-lazy/src/physical_plan/executors/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,24 @@ use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

pub struct CacheExec {
pub key: String,
pub id: usize,
pub input: Box<dyn Executor>,
}

impl Executor for CacheExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if let Some(df) = state.cache_hit(&self.id) {
if state.verbose() {
println!("run Cache")
println!("CACHE HIT: cache id: {}", self.id);
}
}
if let Some(df) = state.cache_hit(&self.key) {
return Ok(df);
}

// cache miss
let df = self.input.execute(state)?;
state.store_cache(std::mem::take(&mut self.key), df.clone());
state.store_cache(self.id, df.clone());
if state.verbose() {
println!("cache set {:?}", self.key);
println!("CACHE SET: cache id: {}", self.id);
}
Ok(df)
}
Expand Down
13 changes: 0 additions & 13 deletions polars/polars-lazy/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,6 @@ use polars_io::predicates::PhysicalIoExpr;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

/// A type that implements this transforms a LogicalPlan to a physical plan.
///
/// We could produce different physical plans with different goals in mind, e.g. memory optimized,
/// performance optimized, out of core, etc.
pub trait PhysicalPlanner {
fn create_physical_plan(
&self,
root: Node,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<Box<dyn Executor>>;
}

// Executors are the executors of the physical plan and produce DataFrames. They
// combine physical expressions, which produce Series.

Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/planner/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use polars_core::utils::parallel_op_series;
use super::super::expressions as phys_expr;
use crate::prelude::*;

impl DefaultPlanner {
impl PhysicalPlanner {
pub fn create_physical_expr(
&self,
expression: Node,
Expand Down
20 changes: 4 additions & 16 deletions polars/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ fn aggregate_expr_to_scan_agg(
}

#[derive(Default)]
pub struct DefaultPlanner {}
pub struct PhysicalPlanner {}

impl DefaultPlanner {
impl PhysicalPlanner {
pub fn create_physical_expressions(
&self,
exprs: &[Node],
Expand Down Expand Up @@ -260,21 +260,9 @@ impl DefaultPlanner {
let input = self.create_physical_plan(input, lp_arena, expr_arena)?;
Ok(Box::new(executors::ExplodeExec { input, columns }))
}
Cache { input } => {
let schema = lp_arena.get(input).schema(lp_arena);
// todo! fix the unique constraint in the schema. Probably in projection pushdown at joins
let mut unique = PlHashSet::with_capacity(schema.len());
// assumption of 80 characters per column name
let mut key = String::with_capacity(schema.len() * 80);
for name in schema.iter_names() {
if unique.insert(name) {
key.push_str(name)
}
}
// mutable borrow otherwise
drop(unique);
Cache { input, id } => {
let input = self.create_physical_plan(input, lp_arena, expr_arena)?;
Ok(Box::new(executors::CacheExec { key, input }))
Ok(Box::new(executors::CacheExec { id, input }))
}
Distinct { input, options } => {
let input = self.create_physical_plan(input, lp_arena, expr_arena)?;
Expand Down
14 changes: 0 additions & 14 deletions polars/polars-lazy/src/physical_plan/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,3 @@ mod lp;

pub use expr::*;
pub use lp::*;
use polars_core::prelude::*;

use crate::prelude::*;

impl PhysicalPlanner for DefaultPlanner {
fn create_physical_plan(
&self,
_root: Node,
_lp_arena: &mut Arena<ALogicalPlan>,
_expr_arena: &mut Arena<AExpr>,
) -> Result<Box<dyn Executor>> {
self.create_physical_plan(_root, _lp_arena, _expr_arena)
}
}
6 changes: 3 additions & 3 deletions polars/polars-lazy/src/physical_plan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl StateFlags {
/// State/ cache that is maintained during the Execution of the physical plan.
pub struct ExecutionState {
// cached by a `.cache` call and kept in memory for the duration of the plan.
df_cache: Arc<Mutex<PlHashMap<String, DataFrame>>>,
df_cache: Arc<Mutex<PlHashMap<usize, DataFrame>>>,
// cache file reads until all branches got their file, then we delete it
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
pub(super) file_cache: FileCache,
Expand Down Expand Up @@ -131,13 +131,13 @@ impl ExecutionState {
}

/// Check if we have DataFrame in cache
pub(crate) fn cache_hit(&self, key: &str) -> Option<DataFrame> {
pub(crate) fn cache_hit(&self, key: &usize) -> Option<DataFrame> {
let guard = self.df_cache.lock();
guard.get(key).cloned()
}

/// Store DataFrame in cache.
pub(crate) fn store_cache(&self, key: String, df: DataFrame) {
pub(crate) fn store_cache(&self, key: usize, df: DataFrame) {
let mut guard = self.df_cache.lock();
guard.insert(key, df);
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub use crate::{
options::*,
*,
},
physical_plan::{expressions::*, planner::DefaultPlanner, Executor, PhysicalPlanner},
physical_plan::{expressions::*, planner::PhysicalPlanner, Executor},
};
pub(crate) use crate::{
logical_plan::{aexpr::*, alp::*, conversion::*, iterator::*},
Expand Down
24 changes: 24 additions & 0 deletions py-polars/tests/test_lazy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
from typing import Any, cast

import numpy as np
Expand Down Expand Up @@ -1426,3 +1427,26 @@ def test_all_any_accept_expr() -> None:
"null_in_row": [False, True, True],
"all_null_in_row": [False, False, False],
}


def test_lazy_cache_same_key() -> None:
    """Two distinct cached plans with identical schemas must not share a cache entry."""
    base = pl.DataFrame({"a": [1, 2, 3], "b": [3, 4, 5], "c": ["x", "y", "z"]}).lazy()

    # Both nodes expose the same columns ("a", "c") but compute different values,
    # so the result cached for one must never be served for the other.
    summed = base.select([(pl.col("a") + pl.col("b")).alias("a"), pl.col("c")]).cache()
    multiplied = base.select(
        [(pl.col("a") * pl.col("b")).alias("a"), pl.col("c")]
    ).cache()

    # `multiplied` is the left side of the join, so "a" is the product and the
    # "_mult"-suffixed column is the sum: the expected values are product - sum.
    joined = multiplied.join(summed, on="c", suffix="_mult")
    difference = joined.select(
        [(pl.col("a") - pl.col("a_mult")).alias("a"), pl.col("c")]
    )
    result = difference.collect().to_dict(False)

    assert result == {"a": [-1, 2, 7], "c": ["x", "y", "z"]}


def test_lazy_cache_hit(capfd: Any) -> None:
    """Joining a cached node with itself must reuse the cached result.

    Verbose mode is switched on through ``POLARS_VERBOSE`` so that the
    "CACHE HIT" message is printed and can be captured by ``capfd``.
    """
    # Remember the previous setting so this test does not leak verbose mode
    # into every test that runs after it in the same process.
    previous_verbose = os.environ.get("POLARS_VERBOSE")
    os.environ["POLARS_VERBOSE"] = "1"
    try:
        df = pl.DataFrame(
            {"a": [1, 2, 3], "b": [3, 4, 5], "c": ["x", "y", "z"]}
        ).lazy()
        add_node = df.select(
            [(pl.col("a") + pl.col("b")).alias("a"), pl.col("c")]
        ).cache()

        # Both join inputs are the same cached node, so "a" - "a_mult" is all zeros.
        assert add_node.join(add_node, on="c", suffix="_mult").select(
            [(pl.col("a") - pl.col("a_mult")).alias("a"), pl.col("c")]
        ).collect().to_dict(False) == {"a": [0, 0, 0], "c": ["x", "y", "z"]}
    finally:
        if previous_verbose is None:
            os.environ.pop("POLARS_VERBOSE", None)
        else:
            os.environ["POLARS_VERBOSE"] = previous_verbose

    (out, _) = capfd.readouterr()
    assert "CACHE HIT" in out

0 comments on commit 08c7ceb

Please sign in to comment.