diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index b37d76f0c762..96ec54896e83 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -232,18 +232,30 @@ impl LazyFrame { Ok(self.clone().to_alp()?.describe_tree_format()) } + // @NOTE: this is used because we want to set the `enable_fmt` flag of `optimize_with_scratch` + // to `true` for describe. + fn _describe_to_alp_optimized(mut self) -> PolarsResult { + let (mut lp_arena, mut expr_arena) = self.get_arenas(); + let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], true)?; + + Ok(IRPlan::new(node, lp_arena, expr_arena)) + } + /// Return a String describing the optimized logical plan. /// /// Returns `Err` if optimizing the logical plan fails. pub fn describe_optimized_plan(&self) -> PolarsResult { - Ok(self.clone().to_alp_optimized()?.describe()) + Ok(self.clone()._describe_to_alp_optimized()?.describe()) } /// Return a String describing the optimized logical plan in tree format. /// /// Returns `Err` if optimizing the logical plan fails. pub fn describe_optimized_plan_tree(&self) -> PolarsResult { - Ok(self.clone().to_alp_optimized()?.describe_tree_format()) + Ok(self + .clone() + ._describe_to_alp_optimized()? + .describe_tree_format()) } /// Return a String describing the logical plan. @@ -551,7 +563,7 @@ impl LazyFrame { lp_arena: &mut Arena, expr_arena: &mut Arena, scratch: &mut Vec, - _fmt: bool, + enable_fmt: bool, ) -> PolarsResult { #[allow(unused_mut)] let mut opt_state = self.opt_state; @@ -591,16 +603,18 @@ impl LazyFrame { lp_arena, expr_arena, scratch, - _fmt, + enable_fmt, true, opt_state.row_estimate, )?; } #[cfg(not(feature = "streaming"))] { + _ = enable_fmt; panic!("activate feature 'streaming'") } } + Ok(lp_top) } diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 6b906126aeeb..9b63808129ab 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -212,7 +212,7 @@ pub(super) fn construct( }; // keep the original around for formatting purposes let original_lp = if fmt { - let original_lp = node_to_lp_cloned(insertion_location, expr_arena, lp_arena); + let original_lp = IRPlan::new(insertion_location, lp_arena.clone(), expr_arena.clone()); Some(original_lp) } else { None @@ -233,7 +233,7 @@ fn get_pipeline_node( lp_arena: &mut Arena, mut pipelines: Vec, schema: SchemaRef, - original_lp: Option, + original_lp: Option, ) -> IR { // create a dummy input as the map function will call the input // so we just create a scan that returns an empty df diff --git a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs index 03d8a5c6617e..7dbce5e69ee6 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs @@ -429,6 +429,7 @@ pub(crate) fn insert_streaming_nodes( }, } } + let mut inserted = false; for tree in pipeline_trees { if is_valid_tree(&tree) diff --git a/crates/polars-plan/src/logical_plan/alp/dot.rs b/crates/polars-plan/src/logical_plan/alp/dot.rs index a0692b7ef9d6..ab1070619cd4 100644 --- a/crates/polars-plan/src/logical_plan/alp/dot.rs +++ b/crates/polars-plan/src/logical_plan/alp/dot.rs @@ -6,7 +6,7 @@ use crate::constants::UNLIMITED_CACHE; use crate::prelude::alp::format::ColumnsDisplay; use crate::prelude::*; -pub struct IRDotDisplay<'a>(pub(crate) IRPlanRef<'a>); +pub struct IRDotDisplay<'a>(IRPlanRef<'a>); const INDENT: &str = " "; @@ -43,6 +43,14 @@ fn write_label<'a, 'b>( } impl<'a> IRDotDisplay<'a> { + pub fn new(lp: IRPlanRef<'a>) -> Self { + if let Some(streaming_lp) = lp.extract_streaming_plan() { + return Self(streaming_lp); + } + + Self(lp) + } + fn with_root(&self, root: Node) -> Self { Self(self.0.with_root(root)) } diff --git a/crates/polars-plan/src/logical_plan/alp/format.rs b/crates/polars-plan/src/logical_plan/alp/format.rs index 5de2006ed602..d5801b6179a5 100644 --- a/crates/polars-plan/src/logical_plan/alp/format.rs +++ b/crates/polars-plan/src/logical_plan/alp/format.rs @@ -9,7 +9,10 @@ use recursive::recursive; use crate::prelude::*; -pub struct IRDisplay<'a>(pub(crate) IRPlanRef<'a>); +pub struct IRDisplay<'a> { + is_streaming: bool, + lp: IRPlanRef<'a>, +} #[derive(Clone, Copy)] pub struct ExprIRDisplay<'a> { @@ -91,13 +94,62 @@ fn write_scan( } impl<'a> IRDisplay<'a> { + pub fn new(lp: IRPlanRef<'a>) -> Self { + if let Some(streaming_lp) = lp.extract_streaming_plan() { + return Self { + is_streaming: true, + lp: streaming_lp, + }; + } + + Self { + is_streaming: false, + lp, + } + } + + fn root(&self) -> &IR { + self.lp.root() + } + + fn with_root(&self, root: Node) -> Self { + Self { + is_streaming: false, + lp: self.lp.with_root(root), + } + } + + fn display_expr(&self, root: &'a ExprIR) -> ExprIRDisplay<'a> { + ExprIRDisplay { + node: root.node(), + output_name: root.output_name_inner(), + expr_arena: self.lp.expr_arena, + } + } + + fn display_expr_slice(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> { + ExprIRSliceDisplay { + exprs, + expr_arena: self.lp.expr_arena, + } + } + #[recursive] fn _format(&self, f: &mut Formatter, indent: usize) -> fmt::Result { if indent != 0 { writeln!(f)?; } + + let indent = if self.is_streaming { + writeln!(f, "STREAMING:")?; + indent + 2 + } else { + indent + }; + let sub_indent = indent + 2; use IR::*; + match self.root() { #[cfg(feature = "python")] PythonScan { options, predicate } => { @@ -293,8 +345,7 @@ impl<'a> IRDisplay<'a> { MapFunction { input, function, .. } => { - let function_fmt = format!("{function}"); - write!(f, "{:indent$}{function_fmt}", "")?; + write!(f, "{:indent$}{function}", "")?; self.with_root(*input)._format(f, sub_indent) }, ExtContext { input, .. } => { @@ -313,7 +364,7 @@ impl<'a> IRDisplay<'a> { }, SimpleProjection { input, columns } => { let num_columns = columns.as_ref().len(); - let total_columns = self.0.lp_arena.get(*input).schema(self.0.lp_arena).len(); + let total_columns = self.lp.lp_arena.get(*input).schema(self.lp.lp_arena).len(); let columns = ColumnsDisplay(columns.as_ref()); write!( @@ -329,31 +380,6 @@ impl<'a> IRDisplay<'a> { } } -impl<'a> IRDisplay<'a> { - fn root(&self) -> &IR { - self.0.root() - } - - fn with_root(&self, root: Node) -> Self { - Self(self.0.with_root(root)) - } - - fn display_expr(&self, root: &'a ExprIR) -> ExprIRDisplay<'a> { - ExprIRDisplay { - node: root.node(), - output_name: root.output_name_inner(), - expr_arena: self.0.expr_arena, - } - } - - fn display_expr_slice(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> { - ExprIRSliceDisplay { - exprs, - expr_arena: self.0.expr_arena, - } - } -} - impl<'a> ExprIRDisplay<'a> { fn with_slice(&self, exprs: &'a [T]) -> ExprIRSliceDisplay<'a, T> { ExprIRSliceDisplay { diff --git a/crates/polars-plan/src/logical_plan/alp/mod.rs b/crates/polars-plan/src/logical_plan/alp/mod.rs index 34838498de13..7ba86a11ce52 100644 --- a/crates/polars-plan/src/logical_plan/alp/mod.rs +++ b/crates/polars-plan/src/logical_plan/alp/mod.rs @@ -171,6 +171,11 @@ impl IRPlan { } } + /// Extract the original logical plan if the plan is for the Streaming Engine + pub fn extract_streaming_plan(&self) -> Option { + self.as_ref().extract_streaming_plan() + } + pub fn describe(&self) -> String { self.as_ref().describe() } @@ -180,11 +185,11 @@ impl IRPlan { } pub fn display(&self) -> format::IRDisplay { - format::IRDisplay(self.as_ref()) + self.as_ref().display() } pub fn display_dot(&self) -> dot::IRDotDisplay { - dot::IRDotDisplay(self.as_ref()) + self.as_ref().display_dot() } } @@ -201,12 +206,28 @@ impl<'a> IRPlanRef<'a> { } } + /// Extract the original logical plan if the plan is for the Streaming Engine + pub fn extract_streaming_plan(self) -> Option> { + // @NOTE: the streaming engine replaces the whole tree with a MapFunction { Pipeline, .. } + // and puts the original plan somewhere in there. This is how we extract it. Disguisting, I + // know. + let IR::MapFunction { input: _, function } = self.root() else { + return None; + }; + + let FunctionNode::Pipeline { original, .. } = function else { + return None; + }; + + Some(original.as_ref()?.as_ref().as_ref()) + } + pub fn display(self) -> format::IRDisplay<'a> { - format::IRDisplay(self) + format::IRDisplay::new(self) } pub fn display_dot(self) -> dot::IRDotDisplay<'a> { - dot::IRDotDisplay(self) + dot::IRDotDisplay::new(self) } pub fn describe(self) -> String { diff --git a/crates/polars-plan/src/logical_plan/alp/tree_format.rs b/crates/polars-plan/src/logical_plan/alp/tree_format.rs index 0c1761fc95c0..57ab2773304c 100644 --- a/crates/polars-plan/src/logical_plan/alp/tree_format.rs +++ b/crates/polars-plan/src/logical_plan/alp/tree_format.rs @@ -102,6 +102,15 @@ fn multiline_expression(expr: &str) -> std::borrow::Cow<'_, str> { impl<'a> TreeFmtNode<'a> { pub fn root_logical_plan(lp: IRPlanRef<'a>) -> Self { + if let Some(streaming_lp) = lp.extract_streaming_plan() { + return Self { + h: Some("Streaming".to_string()), + content: TreeFmtNodeContent::LogicalPlan(streaming_lp.lp_top), + + lp: streaming_lp, + }; + } + Self { h: None, content: TreeFmtNodeContent::LogicalPlan(lp.lp_top), diff --git a/crates/polars-plan/src/logical_plan/functions/mod.rs b/crates/polars-plan/src/logical_plan/functions/mod.rs index 8c12cf8eae8b..fb9485fae1fd 100644 --- a/crates/polars-plan/src/logical_plan/functions/mod.rs +++ b/crates/polars-plan/src/logical_plan/functions/mod.rs @@ -61,7 +61,7 @@ pub enum FunctionNode { Pipeline { function: Arc, schema: SchemaRef, - original: Option>, + original: Option>, }, Unnest { columns: Arc<[Arc]>, @@ -327,8 +327,7 @@ impl Display for FunctionNode { MergeSorted { .. } => write!(f, "MERGE SORTED"), Pipeline { original, .. } => { if let Some(original) = original { - let ir_plan = original.as_ref().clone().to_alp().unwrap(); - let ir_display = ir_plan.display(); + let ir_display = original.as_ref().display(); writeln!(f, "--- STREAMING")?; write!(f, "{ir_display}")?; diff --git a/crates/polars-utils/src/arena.rs b/crates/polars-utils/src/arena.rs index 4a0021c2a9e8..06741ff454fe 100644 --- a/crates/polars-utils/src/arena.rs +++ b/crates/polars-utils/src/arena.rs @@ -31,7 +31,7 @@ impl Default for Node { static ARENA_VERSION: AtomicU32 = AtomicU32::new(0); -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Arena { version: u32, items: Vec,