From f62c51b13fa7409e4f4e3747ea7fe9c89e7642cd Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Sat, 13 Apr 2024 14:38:50 +0200
Subject: [PATCH] fix: Recompute RowIndex schema after projection pd (#15625)

---
 crates/polars-lazy/src/frame/mod.rs           |   8 +-
 .../src/physical_plan/executors/cache.rs      |   4 +-
 .../physical_plan/executors/ext_context.rs    |   2 +-
 .../src/physical_plan/executors/filter.rs     |   2 +-
 .../src/physical_plan/executors/group_by.rs   |   2 +-
 .../executors/group_by_dynamic.rs             |   2 +-
 .../executors/group_by_partitioned.rs         |   2 +-
 .../executors/group_by_rolling.rs             |   2 +-
 .../src/physical_plan/executors/hconcat.rs    |   6 +-
 .../src/physical_plan/executors/join.rs       |   2 +-
 .../src/physical_plan/executors/projection.rs |   4 +-
 .../physical_plan/executors/python_scan.rs    |   2 +-
 .../src/physical_plan/executors/slice.rs      |   2 +-
 .../src/physical_plan/executors/sort.rs       |   2 +-
 .../src/physical_plan/executors/stack.rs      |   4 +-
 .../src/physical_plan/executors/udf.rs        |   2 +-
 .../src/physical_plan/executors/union.rs      |   8 +-
 .../src/physical_plan/executors/unique.rs     |   2 +-
 .../polars-plan/src/logical_plan/builder.rs   |   6 +-
 .../src/logical_plan/builder_alp.rs           |   6 +-
 .../src/logical_plan/builder_functions.rs     |   4 -
 .../src/logical_plan/functions/mod.rs         |  86 ++------------
 .../src/logical_plan/functions/schema.rs      | 110 ++++++++++++++++++
 .../projection_pushdown/functions/mod.rs      |   4 +
 py-polars/tests/unit/test_cse.py              |  19 ++-
 py-polars/tests/unit/test_lazy.py             |   4 +-
 26 files changed, 169 insertions(+), 128 deletions(-)
 create mode 100644 crates/polars-plan/src/logical_plan/functions/schema.rs

diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs
index e42d37efec28..9b8dc7e21268 100644
--- a/crates/polars-lazy/src/frame/mod.rs
+++ b/crates/polars-lazy/src/frame/mod.rs
@@ -36,7 +36,6 @@ pub use polars_plan::frame::{AllowedOptimizations, OptState};
 use polars_plan::global::FETCH_ROWS;
 use smartstring::alias::String as SmartString;

-use crate::fallible;
 use crate::physical_plan::executors::Executor;
 use crate::physical_plan::planner::{create_physical_expr, create_physical_plan};
 use crate::physical_plan::state::ExecutionState;
@@ -1688,15 +1687,10 @@ impl LazyFrame {
         };

         if add_row_index_in_map {
-            let schema = fallible!(self.schema(), &self);
-            let schema = schema
-                .new_inserting_at_index(0, name.into(), IDX_DTYPE)
-                .unwrap();
-
             self.map_private(FunctionNode::RowIndex {
                 name: Arc::from(name),
                 offset,
-                schema: Arc::new(schema),
+                schema: Default::default(),
             })
         } else {
             self
diff --git a/crates/polars-lazy/src/physical_plan/executors/cache.rs b/crates/polars-lazy/src/physical_plan/executors/cache.rs
index b80359825fd4..95ca6897cf9c 100644
--- a/crates/polars-lazy/src/physical_plan/executors/cache.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/cache.rs
@@ -27,9 +27,9 @@ impl Executor for CacheExec {

         if state.verbose() {
             if cache_hit {
-                println!("CACHE HIT: cache id: {:x}", self.id);
+                eprintln!("CACHE HIT: cache id: {:x}", self.id);
             } else {
-                println!("CACHE SET: cache id: {:x}", self.id);
+                eprintln!("CACHE SET: cache id: {:x}", self.id);
             }
         }

diff --git a/crates/polars-lazy/src/physical_plan/executors/ext_context.rs b/crates/polars-lazy/src/physical_plan/executors/ext_context.rs
index 05d09f82b9a0..89fee49cc10b 100644
--- a/crates/polars-lazy/src/physical_plan/executors/ext_context.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/ext_context.rs
@@ -10,7 +10,7 @@ impl Executor for ExternalContext {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run ExternalContext")
+                eprintln!("run ExternalContext")
             }
         }
         // we evaluate contexts first as input may has pushed exprs.
diff --git a/crates/polars-lazy/src/physical_plan/executors/filter.rs b/crates/polars-lazy/src/physical_plan/executors/filter.rs
index 0274a71846f7..76a456c5882b 100644
--- a/crates/polars-lazy/src/physical_plan/executors/filter.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/filter.rs
@@ -68,7 +68,7 @@ impl Executor for FilterExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run FilterExec")
+                eprintln!("run FilterExec")
             }
         }
         let df = self.input.execute(state)?;
diff --git a/crates/polars-lazy/src/physical_plan/executors/group_by.rs b/crates/polars-lazy/src/physical_plan/executors/group_by.rs
index f16364c3e031..8542f9fbb338 100644
--- a/crates/polars-lazy/src/physical_plan/executors/group_by.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/group_by.rs
@@ -119,7 +119,7 @@ impl Executor for GroupByExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run GroupbyExec")
+                eprintln!("run GroupbyExec")
             }
         }
         if state.verbose() {
diff --git a/crates/polars-lazy/src/physical_plan/executors/group_by_dynamic.rs b/crates/polars-lazy/src/physical_plan/executors/group_by_dynamic.rs
index 8d758ffc9f4e..5fe6dca17015 100644
--- a/crates/polars-lazy/src/physical_plan/executors/group_by_dynamic.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/group_by_dynamic.rs
@@ -81,7 +81,7 @@ impl Executor for GroupByDynamicExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run GroupbyDynamicExec")
+                eprintln!("run GroupbyDynamicExec")
             }
         }
         let df = self.input.execute(state)?;
diff --git a/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs b/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs
index ad654b22648e..2e77383cc500 100644
--- a/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs
@@ -348,7 +348,7 @@ impl Executor for PartitionGroupByExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run PartitionGroupbyExec")
+                eprintln!("run PartitionGroupbyExec")
             }
         }
         let original_df = self.input.execute(state)?;
diff --git a/crates/polars-lazy/src/physical_plan/executors/group_by_rolling.rs b/crates/polars-lazy/src/physical_plan/executors/group_by_rolling.rs
index 5ed8262ddd3d..437976b103a3 100644
--- a/crates/polars-lazy/src/physical_plan/executors/group_by_rolling.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/group_by_rolling.rs
@@ -100,7 +100,7 @@ impl Executor for GroupByRollingExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run GroupbyRollingExec")
+                eprintln!("run GroupbyRollingExec")
             }
         }
         let df = self.input.execute(state)?;
diff --git a/crates/polars-lazy/src/physical_plan/executors/hconcat.rs b/crates/polars-lazy/src/physical_plan/executors/hconcat.rs
index a61c9f49e02a..0a755f694cfe 100644
--- a/crates/polars-lazy/src/physical_plan/executors/hconcat.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/hconcat.rs
@@ -12,14 +12,14 @@ impl Executor for HConcatExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run HConcatExec")
+                eprintln!("run HConcatExec")
             }
         }
         let mut inputs = std::mem::take(&mut self.inputs);

         let dfs = if !self.options.parallel {
             if state.verbose() {
-                println!("HCONCAT: `parallel=false` hconcat is run sequentially")
+                eprintln!("HCONCAT: `parallel=false` hconcat is run sequentially")
             }
             let mut dfs = Vec::with_capacity(inputs.len());
             for (idx, mut input) in inputs.into_iter().enumerate() {
@@ -33,7 +33,7 @@ impl Executor for HConcatExec {
             dfs
         } else {
             if state.verbose() {
-                println!("HCONCAT: hconcat is run in parallel")
+                eprintln!("HCONCAT: hconcat is run in parallel")
             }
             // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
             // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
diff --git a/crates/polars-lazy/src/physical_plan/executors/join.rs b/crates/polars-lazy/src/physical_plan/executors/join.rs
index 75587a46d5eb..df3953a5ecb3 100644
--- a/crates/polars-lazy/src/physical_plan/executors/join.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/join.rs
@@ -38,7 +38,7 @@ impl Executor for JoinExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run JoinExec")
+                eprintln!("run JoinExec")
             }
         }
         if state.verbose() {
diff --git a/crates/polars-lazy/src/physical_plan/executors/projection.rs b/crates/polars-lazy/src/physical_plan/executors/projection.rs
index 7147b1094c59..e6e5f8799d7c 100644
--- a/crates/polars-lazy/src/physical_plan/executors/projection.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/projection.rs
@@ -86,9 +86,9 @@ impl Executor for ProjectionExec {
         {
             if state.verbose() {
                 if self.cse_exprs.is_empty() {
-                    println!("run ProjectionExec");
+                    eprintln!("run ProjectionExec");
                 } else {
-                    println!("run ProjectionExec with {} CSE", self.cse_exprs.len())
+                    eprintln!("run ProjectionExec with {} CSE", self.cse_exprs.len())
                 };
             }
         }
diff --git a/crates/polars-lazy/src/physical_plan/executors/python_scan.rs b/crates/polars-lazy/src/physical_plan/executors/python_scan.rs
index c7a2c028cb98..d48f7b14fb96 100644
--- a/crates/polars-lazy/src/physical_plan/executors/python_scan.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/python_scan.rs
@@ -13,7 +13,7 @@ impl Executor for PythonScanExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run PythonScanExec")
+                eprintln!("run PythonScanExec")
             }
         }
         let with_columns = self.options.with_columns.take();
diff --git a/crates/polars-lazy/src/physical_plan/executors/slice.rs b/crates/polars-lazy/src/physical_plan/executors/slice.rs
index a06045c11461..c1ff0a9667a5 100644
--- a/crates/polars-lazy/src/physical_plan/executors/slice.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/slice.rs
@@ -11,7 +11,7 @@ impl Executor for SliceExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run SliceExec")
+                eprintln!("run SliceExec")
             }
         }
         let df = self.input.execute(state)?;
diff --git a/crates/polars-lazy/src/physical_plan/executors/sort.rs b/crates/polars-lazy/src/physical_plan/executors/sort.rs
index 2d32a63d6097..709609859bde 100644
--- a/crates/polars-lazy/src/physical_plan/executors/sort.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/sort.rs
@@ -42,7 +42,7 @@ impl Executor for SortExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run SortExec")
+                eprintln!("run SortExec")
             }
         }
         let df = self.input.execute(state)?;
diff --git a/crates/polars-lazy/src/physical_plan/executors/stack.rs b/crates/polars-lazy/src/physical_plan/executors/stack.rs
index 9e3dafc3feb2..4802e1866d8b 100644
--- a/crates/polars-lazy/src/physical_plan/executors/stack.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/stack.rs
@@ -69,9 +69,9 @@ impl Executor for StackExec {
         {
             if state.verbose() {
                 if self.cse_exprs.is_empty() {
-                    println!("run StackExec");
+                    eprintln!("run StackExec");
                 } else {
-                    println!("run StackExec with {} CSE", self.cse_exprs.len());
+                    eprintln!("run StackExec with {} CSE", self.cse_exprs.len());
                 };
             }
         }
diff --git a/crates/polars-lazy/src/physical_plan/executors/udf.rs b/crates/polars-lazy/src/physical_plan/executors/udf.rs
index 7453752ac158..0916a5493ace 100644
--- a/crates/polars-lazy/src/physical_plan/executors/udf.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/udf.rs
@@ -11,7 +11,7 @@ impl Executor for UdfExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run UdfExec")
+                eprintln!("run UdfExec")
             }
         }
         let df = self.input.execute(state)?;
diff --git a/crates/polars-lazy/src/physical_plan/executors/union.rs b/crates/polars-lazy/src/physical_plan/executors/union.rs
index 7b8ed9a497ac..54be89a6047a 100644
--- a/crates/polars-lazy/src/physical_plan/executors/union.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/union.rs
@@ -14,7 +14,7 @@ impl Executor for UnionExec {
         #[cfg(debug_assertions)]
        {
             if state.verbose() {
-                println!("run UnionExec")
+                eprintln!("run UnionExec")
             }
         }
         // keep scans thread local if 'fetch' is used.
@@ -32,9 +32,9 @@ impl Executor for UnionExec {
         if !self.options.parallel || sliced_path {
             if state.verbose() {
                 if !self.options.parallel {
-                    println!("UNION: `parallel=false` union is run sequentially")
+                    eprintln!("UNION: `parallel=false` union is run sequentially")
                 } else {
-                    println!("UNION: `slice is set` union is run sequentially")
+                    eprintln!("UNION: `slice is set` union is run sequentially")
                 }
             }

@@ -80,7 +80,7 @@ impl Executor for UnionExec {
             concat_df(&dfs)
         } else {
             if state.verbose() {
-                println!("UNION: union is run in parallel")
+                eprintln!("UNION: union is run in parallel")
             }

             // we don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
diff --git a/crates/polars-lazy/src/physical_plan/executors/unique.rs b/crates/polars-lazy/src/physical_plan/executors/unique.rs
index 7147132eb867..0b2c13dca65e 100644
--- a/crates/polars-lazy/src/physical_plan/executors/unique.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/unique.rs
@@ -11,7 +11,7 @@ impl Executor for UniqueExec {
         #[cfg(debug_assertions)]
         {
             if state.verbose() {
-                println!("run UniqueExec")
+                eprintln!("run UniqueExec")
             }
         }
         let df = self.input.execute(state)?;
diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs
index 7129be8534e5..150ad6f16998 100644
--- a/crates/polars-plan/src/logical_plan/builder.rs
+++ b/crates/polars-plan/src/logical_plan/builder.rs
@@ -867,16 +867,12 @@ impl LogicalPlanBuilder {
     }

     pub fn row_index(self, name: &str, offset: Option<IdxSize>) -> Self {
-        let mut schema = try_delayed!(self.0.schema(), &self.0, into).into_owned();
-        let schema_mut = Arc::make_mut(&mut schema);
-        row_index_schema(schema_mut, name);
-
         LogicalPlan::MapFunction {
             input: Arc::new(self.0),
             function: FunctionNode::RowIndex {
                 name: ColumnName::from(name),
                 offset,
-                schema,
+                schema: Default::default(),
             },
         }
         .into()
diff --git a/crates/polars-plan/src/logical_plan/builder_alp.rs b/crates/polars-plan/src/logical_plan/builder_alp.rs
index d3c96d245f3e..c5cf3f304874 100644
--- a/crates/polars-plan/src/logical_plan/builder_alp.rs
+++ b/crates/polars-plan/src/logical_plan/builder_alp.rs
@@ -311,16 +311,12 @@ impl<'a> IRBuilder<'a> {
     }

     pub fn row_index(self, name: Arc<str>, offset: Option<IdxSize>) -> Self {
-        let mut schema = self.schema().into_owned();
-        let schema_mut = Arc::make_mut(&mut schema);
-        row_index_schema(schema_mut, name.as_ref());
-
         let lp = IR::MapFunction {
             input: self.root,
             function: FunctionNode::RowIndex {
                 name,
                 offset,
-                schema,
+                schema: Default::default(),
             },
         };
         self.add_alp(lp)
diff --git a/crates/polars-plan/src/logical_plan/builder_functions.rs b/crates/polars-plan/src/logical_plan/builder_functions.rs
index 8badf1b3c2a3..1d9ef90e2d58 100644
--- a/crates/polars-plan/src/logical_plan/builder_functions.rs
+++ b/crates/polars-plan/src/logical_plan/builder_functions.rs
@@ -54,7 +54,3 @@ pub(super) fn det_melt_schema(args: &MeltArgs, input_schema: &Schema) -> SchemaR
     new_schema.with_column(value_name, supertype);
     Arc::new(new_schema)
 }
-
-pub(super) fn row_index_schema(schema: &mut Schema, name: &str) {
-    schema.insert_at_index(0, name.into(), IDX_DTYPE).unwrap();
-}
diff --git a/crates/polars-plan/src/logical_plan/functions/mod.rs b/crates/polars-plan/src/logical_plan/functions/mod.rs
index c9995e03d2dc..13c8b4be3d6e 100644
--- a/crates/polars-plan/src/logical_plan/functions/mod.rs
+++ b/crates/polars-plan/src/logical_plan/functions/mod.rs
@@ -4,11 +4,13 @@ mod merge_sorted;
 #[cfg(feature = "python")]
 mod python_udf;
 mod rename;
+mod schema;
+
 use std::borrow::Cow;
 use std::fmt::{Debug, Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};

 use polars_core::prelude::*;
 #[cfg(feature = "serde")]
@@ -21,6 +23,8 @@ use crate::dsl::python_udf::PythonFunction;
 use crate::logical_plan::functions::merge_sorted::merge_sorted;
 use crate::prelude::*;

+type CachedSchema = Arc<Mutex<Option<SchemaRef>>>;
+
 #[derive(Clone)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum FunctionNode {
@@ -91,7 +95,8 @@ pub enum FunctionNode {
     },
     RowIndex {
         name: Arc<str>,
-        schema: SchemaRef,
+        // Might be cached.
+        schema: CachedSchema,
         offset: Option<IdxSize>,
     },
 }
@@ -201,83 +206,6 @@ impl FunctionNode {
         }
     }

-    pub(crate) fn schema<'a>(
-        &self,
-        input_schema: &'a SchemaRef,
-    ) -> PolarsResult<Cow<'a, SchemaRef>> {
-        use FunctionNode::*;
-        match self {
-            Opaque { schema, .. } => match schema {
-                None => Ok(Cow::Borrowed(input_schema)),
-                Some(schema_fn) => {
-                    let output_schema = schema_fn.get_schema(input_schema)?;
-                    Ok(Cow::Owned(output_schema))
-                },
-            },
-            #[cfg(feature = "python")]
-            OpaquePython { schema, .. } => Ok(schema
-                .as_ref()
-                .map(|schema| Cow::Owned(schema.clone()))
-                .unwrap_or_else(|| Cow::Borrowed(input_schema))),
-            Pipeline { schema, .. } => Ok(Cow::Owned(schema.clone())),
-            DropNulls { .. } => Ok(Cow::Borrowed(input_schema)),
-            Count { alias, .. } => {
-                let mut schema: Schema = Schema::with_capacity(1);
-                let name = SmartString::from(
-                    alias
-                        .as_ref()
-                        .map(|alias| alias.as_ref())
-                        .unwrap_or(crate::constants::LEN),
-                );
-                schema.insert_at_index(0, name, IDX_DTYPE)?;
-                Ok(Cow::Owned(Arc::new(schema)))
-            },
-            Rechunk => Ok(Cow::Borrowed(input_schema)),
-            Unnest { columns: _columns } => {
-                #[cfg(feature = "dtype-struct")]
-                {
-                    let mut new_schema = Schema::with_capacity(input_schema.len() * 2);
-                    for (name, dtype) in input_schema.iter() {
-                        if _columns.iter().any(|item| item.as_ref() == name.as_str()) {
-                            match dtype {
-                                DataType::Struct(flds) => {
-                                    for fld in flds {
-                                        new_schema.with_column(
-                                            fld.name().clone(),
-                                            fld.data_type().clone(),
-                                        );
-                                    }
-                                },
-                                DataType::Unknown => {
-                                    // pass through unknown
-                                },
-                                _ => {
-                                    polars_bail!(
-                                        SchemaMismatch: "expected struct dtype, got: `{}`", dtype
-                                    );
-                                },
-                            }
-                        } else {
-                            new_schema.with_column(name.clone(), dtype.clone());
-                        }
-                    }
-
-                    Ok(Cow::Owned(Arc::new(new_schema)))
-                }
-                #[cfg(not(feature = "dtype-struct"))]
-                {
-                    panic!("activate feature 'dtype-struct'")
-                }
-            },
-            #[cfg(feature = "merge_sorted")]
-            MergeSorted { .. } => Ok(Cow::Borrowed(input_schema)),
-            Rename { existing, new, .. } => rename::rename_schema(input_schema, existing, new),
-            Explode { schema, .. } | RowIndex { schema, .. } | Melt { schema, .. } => {
-                Ok(Cow::Owned(schema.clone()))
-            },
-        }
-    }
-
     pub(crate) fn allow_predicate_pd(&self) -> bool {
         use FunctionNode::*;
         match self {
diff --git a/crates/polars-plan/src/logical_plan/functions/schema.rs b/crates/polars-plan/src/logical_plan/functions/schema.rs
new file mode 100644
index 000000000000..a722e48887a4
--- /dev/null
+++ b/crates/polars-plan/src/logical_plan/functions/schema.rs
@@ -0,0 +1,110 @@
+use super::*;
+
+impl FunctionNode {
+    pub(crate) fn clear_cached_schema(&self) {
+        use FunctionNode::*;
+        // We will likely add more branches later
+        #[allow(clippy::single_match)]
+        match self {
+            RowIndex { schema, .. } => {
+                let mut guard = schema.lock().unwrap();
+                *guard = None;
+            },
+            _ => {},
+        }
+    }
+
+    pub(crate) fn schema<'a>(
+        &self,
+        input_schema: &'a SchemaRef,
+    ) -> PolarsResult<Cow<'a, SchemaRef>> {
+        use FunctionNode::*;
+        match self {
+            Opaque { schema, .. } => match schema {
+                None => Ok(Cow::Borrowed(input_schema)),
+                Some(schema_fn) => {
+                    let output_schema = schema_fn.get_schema(input_schema)?;
+                    Ok(Cow::Owned(output_schema))
+                },
+            },
+            #[cfg(feature = "python")]
+            OpaquePython { schema, .. } => Ok(schema
+                .as_ref()
+                .map(|schema| Cow::Owned(schema.clone()))
+                .unwrap_or_else(|| Cow::Borrowed(input_schema))),
+            Pipeline { schema, .. } => Ok(Cow::Owned(schema.clone())),
+            DropNulls { .. } => Ok(Cow::Borrowed(input_schema)),
+            Count { alias, .. } => {
+                let mut schema: Schema = Schema::with_capacity(1);
+                let name = SmartString::from(
+                    alias
+                        .as_ref()
+                        .map(|alias| alias.as_ref())
+                        .unwrap_or(crate::constants::LEN),
+                );
+                schema.insert_at_index(0, name, IDX_DTYPE)?;
+                Ok(Cow::Owned(Arc::new(schema)))
+            },
+            Rechunk => Ok(Cow::Borrowed(input_schema)),
+            Unnest { columns: _columns } => {
+                #[cfg(feature = "dtype-struct")]
+                {
+                    let mut new_schema = Schema::with_capacity(input_schema.len() * 2);
+                    for (name, dtype) in input_schema.iter() {
+                        if _columns.iter().any(|item| item.as_ref() == name.as_str()) {
+                            match dtype {
+                                DataType::Struct(flds) => {
+                                    for fld in flds {
+                                        new_schema.with_column(
+                                            fld.name().clone(),
+                                            fld.data_type().clone(),
+                                        );
+                                    }
+                                },
+                                DataType::Unknown => {
+                                    // pass through unknown
+                                },
+                                _ => {
+                                    polars_bail!(
+                                        SchemaMismatch: "expected struct dtype, got: `{}`", dtype
+                                    );
+                                },
+                            }
+                        } else {
+                            new_schema.with_column(name.clone(), dtype.clone());
+                        }
+                    }
+
+                    Ok(Cow::Owned(Arc::new(new_schema)))
+                }
+                #[cfg(not(feature = "dtype-struct"))]
+                {
+                    panic!("activate feature 'dtype-struct'")
+                }
+            },
+            #[cfg(feature = "merge_sorted")]
+            MergeSorted { .. } => Ok(Cow::Borrowed(input_schema)),
+            Rename { existing, new, .. } => rename::rename_schema(input_schema, existing, new),
+            RowIndex { schema, name, .. } => {
+                Ok(Cow::Owned(row_index_schema(schema, input_schema, name)))
+            },
+            Explode { schema, .. } | Melt { schema, .. } => Ok(Cow::Owned(schema.clone())),
+        }
+    }
+}
+
+fn row_index_schema(
+    cached_schema: &CachedSchema,
+    input_schema: &SchemaRef,
+    name: &str,
+) -> SchemaRef {
+    let mut guard = cached_schema.lock().unwrap();
+    if let Some(schema) = &*guard {
+        return schema.clone();
+    }
+    let mut schema = (**input_schema).clone();
+    schema.insert_at_index(0, name.into(), IDX_DTYPE).unwrap();
+    let schema_ref = Arc::new(schema);
+    *guard = Some(schema_ref.clone());
+    schema_ref
+}
diff --git a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/functions/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/functions/mod.rs
index d1ff5dfdcf89..f0b71e3ff42f 100644
--- a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/functions/mod.rs
+++ b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/functions/mod.rs
@@ -106,6 +106,10 @@ pub(super) fn process_functions(
                 expr_arena,
                 expands_schema,
             )?;
+
+            // Remove the cached schema
+            function.clear_cached_schema();
+
             if local_projections.is_empty() {
                 Ok(lp)
             } else {
diff --git a/py-polars/tests/unit/test_cse.py b/py-polars/tests/unit/test_cse.py
index cec9c394009b..9be52df60ef9 100644
--- a/py-polars/tests/unit/test_cse.py
+++ b/py-polars/tests/unit/test_cse.py
@@ -218,7 +218,7 @@ def test_cse_expr_selection_context(monkeypatch: Any, capfd: Any) -> None:
     )
     assert_frame_equal(result, expected)

-    out = capfd.readouterr().out
+    out = capfd.readouterr().err
     assert "run ProjectionExec with 2 CSE" in out
     assert "run StackExec with 2 CSE" in out

@@ -667,3 +667,20 @@ def test_cse_15548() -> None:

     assert len(ldf3.collect(comm_subplan_elim=False)) == 4
     assert len(ldf3.collect(comm_subplan_elim=True)) == 4
+
+
+@pytest.mark.debug()
+def test_cse_and_schema_update_projection_pd(capfd: Any, monkeypatch: Any) -> None:
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+    df = pl.LazyFrame({"a": [1, 2], "b": [99, 99]})
+
+    assert df.lazy().with_row_index().select(
+        pl.when(pl.col("b") < 10)
+        .then(0.1 * pl.col("b"))
+        .when(pl.col("b") < 100)
+        .then(0.2 * pl.col("b"))
+    ).collect(comm_subplan_elim=False).to_dict(as_series=False) == {
+        "literal": [19.8, 19.8]
+    }
+    captured = capfd.readouterr().err
+    assert "1 CSE" in captured
diff --git a/py-polars/tests/unit/test_lazy.py b/py-polars/tests/unit/test_lazy.py
index 42124a2b55a6..082f9312c94b 100644
--- a/py-polars/tests/unit/test_lazy.py
+++ b/py-polars/tests/unit/test_lazy.py
@@ -1189,8 +1189,8 @@ def test_lazy_cache_hit(monkeypatch: Any, capfd: Any) -> None:
     expected = pl.LazyFrame({"a": [0, 0, 0], "c": ["x", "y", "z"]})
     assert_frame_equal(result, expected)

-    (out, _) = capfd.readouterr()
-    assert "CACHE HIT" in out
+    (_, err) = capfd.readouterr()
+    assert "CACHE HIT" in err


 def test_lazy_cache_parallel() -> None:
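
The heart of the patch is a small pattern change: FunctionNode::RowIndex no longer carries an eagerly computed SchemaRef, but a CachedSchema (Arc<Mutex<Option<SchemaRef>>>) that FunctionNode::schema fills lazily from the current input schema; projection pushdown calls clear_cached_schema after pruning columns, so the next lookup recomputes the schema instead of reporting columns that no longer exist. Below is a minimal, self-contained sketch of that lazily-filled, clearable cache. It compiles on its own: Schema, RowIndexNode, and the column-vector representation are simplified stand-ins chosen for illustration, not the polars types.

use std::sync::{Arc, Mutex};

// Simplified stand-in for a plan schema: just an ordered list of column names.
#[derive(Clone, Debug)]
struct Schema {
    columns: Vec<String>,
}

type SchemaRef = Arc<Schema>;
// Same shape as the patch's `CachedSchema`: empty until first use, clearable.
type CachedSchema = Arc<Mutex<Option<SchemaRef>>>;

struct RowIndexNode {
    name: String,
    cached: CachedSchema,
}

impl RowIndexNode {
    fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            cached: Default::default(),
        }
    }

    // Compute the output schema from the *current* input schema,
    // caching the result so repeated lookups are cheap.
    fn schema(&self, input: &SchemaRef) -> SchemaRef {
        let mut guard = self.cached.lock().unwrap();
        if let Some(schema) = &*guard {
            return schema.clone();
        }
        let mut out = (**input).clone();
        out.columns.insert(0, self.name.clone());
        let out = Arc::new(out);
        *guard = Some(out.clone());
        out
    }

    // Called when an optimization (e.g. projection pushdown) changes the
    // input schema: drop the stale cache so the next lookup recomputes.
    fn clear_cached_schema(&self) {
        *self.cached.lock().unwrap() = None;
    }
}

fn main() {
    let node = RowIndexNode::new("index");
    let input = Arc::new(Schema {
        columns: vec!["a".into(), "b".into()],
    });
    assert_eq!(node.schema(&input).columns, ["index", "a", "b"]);

    // A projection drops "a"; without invalidation the cache would still
    // report ["index", "a", "b"] — the stale-schema bug this patch fixes.
    let projected = Arc::new(Schema {
        columns: vec!["b".into()],
    });
    node.clear_cached_schema();
    assert_eq!(node.schema(&projected).columns, ["index", "b"]);
    println!("schema recomputed after projection pushdown");
}

Because the cache sits behind an Arc, clones of the node share the same slot: the plan node stays cheap to clone, and an invalidation through any clone is seen by all of them. The Mutex is only locked on plan-schema lookups, not in the per-row execution path.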