Skip to content

Commit

Permalink
refactor recursion to iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 5, 2021
1 parent e2ca388 commit 453dc0f
Showing 1 changed file with 15 additions and 59 deletions.
74 changes: 15 additions & 59 deletions polars/polars-lazy/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::logical_plan::iterator::ArenaExprIter;
use crate::logical_plan::iterator::{ArenaExprIter, ArenaLpIter};
use crate::logical_plan::Context;
use crate::prelude::*;
use ahash::RandomState;
Expand All @@ -7,14 +7,12 @@ use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;

#[cfg(feature = "private")]
pub(crate) fn equal_aexprs(left: &[Node], right: &[Node], expr_arena: &Arena<AExpr>) -> bool {
left.iter()
.zip(right.iter())
.all(|(l, r)| AExpr::eq(*l, *r, expr_arena))
}

#[cfg(feature = "private")]
pub(crate) fn remove_duplicate_aexprs(exprs: &[Node], expr_arena: &Arena<AExpr>) -> Vec<Node> {
let mut unique = HashSet::with_capacity_and_hasher(exprs.len(), RandomState::new());
let mut new = Vec::with_capacity(exprs.len());
Expand Down Expand Up @@ -204,6 +202,7 @@ pub(crate) fn rename_expr_root_name(expr: &Expr, new_name: Arc<String>) -> Resul
Ok(node_to_exp(root, &arena))
}

/// Take a list of expressions and a schema and determine the output schema.
pub(crate) fn expressions_to_schema(expr: &[Expr], schema: &Schema, ctxt: Context) -> Schema {
let fields = expr
.iter()
Expand All @@ -219,63 +218,20 @@ pub(crate) fn agg_source_paths(
paths: &mut HashSet<PathBuf, RandomState>,
lp_arena: &Arena<ALogicalPlan>,
) {
use ALogicalPlan::*;
let logical_plan = lp_arena.get(root_lp);
match logical_plan {
Slice { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Selection { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Cache { input } => {
agg_source_paths(*input, paths, lp_arena);
}
#[cfg(feature = "csv-file")]
CsvScan { path, .. } => {
paths.insert(path.clone());
}
#[cfg(feature = "parquet")]
ParquetScan { path, .. } => {
paths.insert(path.clone());
}
DataFrameScan { .. } => (),
Projection { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
LocalProjection { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Sort { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Explode { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Distinct { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Aggregate { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Join {
input_left,
input_right,
..
} => {
agg_source_paths(*input_left, paths, lp_arena);
agg_source_paths(*input_right, paths, lp_arena);
}
HStack { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Melt { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
}
Udf { input, .. } => {
agg_source_paths(*input, paths, lp_arena);
lp_arena.iter(root_lp).for_each(|(_, lp)| {
use ALogicalPlan::*;
match lp {
#[cfg(feature = "csv-file")]
CsvScan { path, .. } => {
paths.insert(path.clone());
}
#[cfg(feature = "parquet")]
ParquetScan { path, .. } => {
paths.insert(path.clone());
}
_ => {}
}
}
})
}

pub(crate) fn try_path_to_str(path: &Path) -> Result<&str> {
Expand Down

0 comments on commit 453dc0f

Please sign in to comment.