Skip to content

Commit

Permalink
refactor: remove 'fast-projection' node (#15253)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 23, 2024
1 parent 6793954 commit 07538dd
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 99 deletions.
11 changes: 0 additions & 11 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,17 +643,6 @@ where
let op = operators::FilterOperator { predicate };
Box::new(op) as Box<dyn Operator>
},
MapFunction {
function: FunctionNode::FastProjection { columns, .. },
input,
} => {
let input_schema = lp_arena.get(*input).schema(lp_arena);
let op = operators::SimpleProjectionOperator::new(
columns.clone(),
input_schema.into_owned(),
);
Box::new(op) as Box<dyn Operator>
},
MapFunction { function, .. } => {
let op = operators::FunctionOperator::new(function.clone());
Box::new(op) as Box<dyn Operator>
Expand Down
54 changes: 5 additions & 49 deletions crates/polars-plan/src/logical_plan/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ pub enum FunctionNode {
Unnest {
columns: Arc<[Arc<str>]>,
},
FastProjection {
columns: Arc<[SmartString]>,
duplicate_check: bool,
},
DropNulls {
subset: Arc<[Arc<str>]>,
},
Expand Down Expand Up @@ -103,16 +99,6 @@ impl PartialEq for FunctionNode {
fn eq(&self, other: &Self) -> bool {
use FunctionNode::*;
match (self, other) {
(
FastProjection {
columns: l,
duplicate_check: dl,
},
FastProjection {
columns: r,
duplicate_check: dr,
},
) => l == r && dl == dr,
(DropNulls { subset: l }, DropNulls { subset: r }) => l == r,
(Rechunk, Rechunk) => true,
(Count { paths: paths_l, .. }, Count { paths: paths_r, .. }) => paths_l == paths_r,
Expand Down Expand Up @@ -144,12 +130,9 @@ impl FunctionNode {
Rechunk | Pipeline { .. } => false,
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => false,
DropNulls { .. }
| FastProjection { .. }
| Count { .. }
| Unnest { .. }
| Rename { .. }
| Explode { .. } => true,
DropNulls { .. } | Count { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => {
true
},
Melt { args, .. } => args.streamable,
Opaque { streamable, .. } => *streamable,
#[cfg(feature = "python")]
Expand Down Expand Up @@ -188,16 +171,6 @@ impl FunctionNode {
.map(|schema| Cow::Owned(schema.clone()))
.unwrap_or_else(|| Cow::Borrowed(input_schema))),
Pipeline { schema, .. } => Ok(Cow::Owned(schema.clone())),
FastProjection { columns, .. } => {
let schema = columns
.iter()
.map(|name| {
let name = name.as_ref();
input_schema.try_get_field(name)
})
.collect::<PolarsResult<Schema>>()?;
Ok(Cow::Owned(Arc::new(schema)))
},
DropNulls { .. } => Ok(Cow::Borrowed(input_schema)),
Count { alias, .. } => {
let mut schema: Schema = Schema::with_capacity(1);
Expand Down Expand Up @@ -262,8 +235,7 @@ impl FunctionNode {
Opaque { predicate_pd, .. } => *predicate_pd,
#[cfg(feature = "python")]
OpaquePython { predicate_pd, .. } => *predicate_pd,
FastProjection { .. }
| DropNulls { .. }
DropNulls { .. }
| Rechunk
| Unnest { .. }
| Rename { .. }
Expand All @@ -282,8 +254,7 @@ impl FunctionNode {
Opaque { projection_pd, .. } => *projection_pd,
#[cfg(feature = "python")]
OpaquePython { projection_pd, .. } => *projection_pd,
FastProjection { .. }
| DropNulls { .. }
DropNulls { .. }
| Rechunk
| Count { .. }
| Unnest { .. }
Expand Down Expand Up @@ -319,16 +290,6 @@ impl FunctionNode {
schema,
..
} => python_udf::call_python_udf(function, df, *validate_output, schema.as_deref()),
FastProjection {
columns,
duplicate_check,
} => {
if *duplicate_check {
df._select_impl(columns.as_ref())
} else {
df._select_impl_unchecked(columns.as_ref())
}
},
DropNulls { subset } => df.drop_nulls(Some(subset.as_ref())),
Count {
paths, scan_type, ..
Expand Down Expand Up @@ -386,11 +347,6 @@ impl Display for FunctionNode {
Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),
#[cfg(feature = "python")]
OpaquePython { .. } => write!(f, "python dataframe udf"),
FastProjection { columns, .. } => {
write!(f, "FAST_PROJECT: ")?;
let columns = columns.as_ref();
fmt_column_delimited(f, columns, "[", "]")
},
DropNulls { subset } => {
write!(f, "DROP_NULLS by: ")?;
let subset = subset.as_ref();
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-plan/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ mod collect_members;
mod count_star;
#[cfg(feature = "cse")]
mod cse_expr;
mod fast_projection;
#[cfg(any(
feature = "ipc",
feature = "parquet",
Expand All @@ -26,6 +25,7 @@ mod flatten_union;
mod fused;
mod predicate_pushdown;
mod projection_pushdown;
mod simple_projection;
mod simplify_expr;
mod simplify_functions;
mod slice_pushdown_expr;
Expand All @@ -35,10 +35,10 @@ mod type_coercion;

use delay_rechunk::DelayRechunk;
use drop_nulls::ReplaceDropNulls;
use fast_projection::FastProjectionAndCollapse;
use polars_io::predicates::PhysicalIoExpr;
pub use predicate_pushdown::PredicatePushDown;
pub use projection_pushdown::ProjectionPushDown;
use simple_projection::SimpleProjectionAndCollapse;
pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
use slice_pushdown_lp::SlicePushDown;
pub use stack_opt::{OptimizationRule, StackOptimizer};
Expand Down Expand Up @@ -156,7 +156,7 @@ pub fn optimize(

// make sure its before slice pushdown.
if fast_projection {
rules.push(Box::new(FastProjectionAndCollapse::new(eager)));
rules.push(Box::new(SimpleProjectionAndCollapse::new(eager)));
}

if !eager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use super::*;
/// It is important that this optimization is ran after projection pushdown.
///
/// The schema reported after this optimization is also
pub(super) struct FastProjectionAndCollapse {
pub(super) struct SimpleProjectionAndCollapse {
/// keep track of nodes that are already processed when they
/// can be expensive. Schema materialization can be for instance.
processed: BTreeSet<Node>,
eager: bool,
}

impl FastProjectionAndCollapse {
impl SimpleProjectionAndCollapse {
pub(super) fn new(eager: bool) -> Self {
Self {
processed: Default::default(),
Expand All @@ -27,7 +27,7 @@ impl FastProjectionAndCollapse {
}
}

impl OptimizationRule for FastProjectionAndCollapse {
impl OptimizationRule for SimpleProjectionAndCollapse {
fn optimize_plan(
&mut self,
lp_arena: &mut Arena<ALogicalPlan>,
Expand Down Expand Up @@ -93,39 +93,6 @@ impl OptimizationRule for FastProjectionAndCollapse {
_ => None,
}
},
MapFunction {
input,
function: FunctionNode::FastProjection { columns, .. },
} if !self.eager => {
// if there are 2 subsequent fast projections, flatten them and only take the last
match lp_arena.get(*input) {
MapFunction {
function: FunctionNode::FastProjection { .. },
input: prev_input,
} => Some(MapFunction {
input: *prev_input,
function: FunctionNode::FastProjection {
columns: columns.clone(),
duplicate_check: true,
},
}),
// cleanup projections set in projection pushdown just above caches
// they are not needed.
cache_lp @ Cache { .. } if self.processed.insert(node) => {
let cache_schema = cache_lp.schema(lp_arena);
if cache_schema.len() == columns.len()
&& cache_schema.iter_names().zip(columns.iter()).all(
|(left_name, right_name)| left_name.as_str() == right_name.as_str(),
)
{
Some(cache_lp.clone())
} else {
None
}
},
_ => None,
}
},
// if there are 2 subsequent caches, flatten them and only take the inner
Cache {
input,
Expand Down

0 comments on commit 07538dd

Please sign in to comment.