Skip to content

Commit

Permalink
use jupyter dot visualization
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 29, 2022
1 parent 185559c commit 971365d
Show file tree
Hide file tree
Showing 19 changed files with 158 additions and 103 deletions.
12 changes: 7 additions & 5 deletions polars/polars-lazy/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,18 @@ impl LogicalPlan {
}
HStack { input, exprs, .. } => {
let mut current_node = String::with_capacity(128);
current_node.push_str("STACK");
current_node.push_str("WITH COLUMNS [");
for e in exprs {
if let Expr::Alias(_, name) = e {
current_node.push_str(&format!(" {},", name));
current_node.push_str(&format!("\"{}\",", name));
} else {
for name in expr_to_root_column_names(e).iter().take(1) {
current_node.push_str(&format!(" {},", name));
current_node.push_str(&format!("\"{}\",", name));
}
}
}
current_node.pop();
current_node.push(']');
current_node.push_str(&format!(" [{:?}]", (branch, id)));
self.write_dot(acc_str, prev_node, &current_node, id)?;
input.dot(acc_str, (branch, id + 1), &current_node)
Expand Down Expand Up @@ -302,8 +304,8 @@ impl LogicalPlan {
input_left.dot(acc_str, (branch + 10, id + 1), &current_node)?;
input_right.dot(acc_str, (branch + 20, id + 1), &current_node)
}
Udf { input, .. } => {
let current_node = format!("UDF [{:?}]", (branch, id));
Udf { input, options, .. } => {
let current_node = format!("{} [{:?}]", options.fmt_str, (branch, id));
self.write_dot(acc_str, prev_node, &current_node, id)?;
input.dot(acc_str, (branch, id + 1), &current_node)
}
Expand Down
53 changes: 0 additions & 53 deletions polars/polars-lazy/src/dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,59 +194,6 @@ where
}
}

#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ApplyOptions {
/// Collect groups to a list and apply the function over the groups.
/// This can be important in aggregation context.
ApplyGroups,
// collect groups to a list and then apply
ApplyList,
// do not collect before apply
ApplyFlat,
}

#[derive(Copy, Clone, Debug, PartialEq)]
pub struct WindowOptions {
/// Explode the aggregated list and just do a hstack instead of a join
/// this requires the groups to be sorted to make any sense
pub(crate) explode: bool,
}

#[derive(Clone, Copy, PartialEq, Debug)]
pub struct FunctionOptions {
/// Collect groups to a list and apply the function over the groups.
/// This can be important in aggregation context.
pub(crate) collect_groups: ApplyOptions,
/// There can be two ways of expanding wildcards:
///
/// Say the schema is 'a', 'b' and there is a function f
/// f('*')
/// can expand to:
/// 1.
/// f('a', 'b')
/// or
/// 2.
/// f('a'), f('b')
///
/// setting this to true, will lead to behavior 1.
///
/// this also accounts for regex expansion
pub(crate) input_wildcard_expansion: bool,

/// automatically explode on unit length it ran as final aggregation.
///
/// this is the case for aggregations like sum, min, covariance etc.
/// We need to know this because we cannot see the difference between
/// the following functions based on the output type and number of elements:
///
/// x: [1, 2, 3]
///
/// head_1(x) -> [1]
/// sum(x) -> [4]
pub(crate) auto_explode: bool,
pub(crate) fmt_str: &'static str,
}

#[derive(PartialEq, Clone)]
pub enum AggExpr {
Min(Box<Expr>),
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ impl LazyFrame {
},
None,
None,
Some("RENAME"),
)
}

Expand Down Expand Up @@ -971,6 +972,7 @@ impl LazyFrame {
function: F,
optimizations: Option<AllowedOptimizations>,
schema: Option<Schema>,
name: Option<&'static str>,
) -> LazyFrame
where
F: DataFrameUdf + 'static,
Expand All @@ -982,6 +984,7 @@ impl LazyFrame {
function,
optimizations.unwrap_or_default(),
schema.map(Arc::new),
name.unwrap_or("ANONYMOUS UDF"),
)
.build();
Self::from_logical_plan(lp, opt_state)
Expand All @@ -1001,6 +1004,7 @@ impl LazyFrame {
move |df: DataFrame| df.with_row_count(&name),
Some(AllowedOptimizations::default()),
Some(new_schema),
Some("WITH ROW COUNT"),
)
}
}
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, rechunk: bool) -> Result<LazyFra
},
Some(AllowedOptimizations::default()),
None,
Some("RECHUNK"),
))
} else {
Ok(lf)
Expand Down
11 changes: 3 additions & 8 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ pub enum ALogicalPlan {
Udf {
input: Node,
function: Arc<dyn DataFrameUdf>,
/// allow predicate pushdown optimizations
predicate_pd: bool,
/// allow projection pushdown optimizations
projection_pd: bool,
options: LogicalPlanUdfOptions,
schema: Option<SchemaRef>,
},
Union {
Expand Down Expand Up @@ -370,15 +367,13 @@ impl ALogicalPlan {
}
Udf {
function,
predicate_pd,
projection_pd,
options,
schema,
..
} => Udf {
input: inputs[0],
function: function.clone(),
predicate_pd: *predicate_pd,
projection_pd: *projection_pd,
options: *options,
schema: schema.clone(),
},
}
Expand Down
11 changes: 9 additions & 2 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl LogicalPlanBuilder {
|_| Ok(DataFrame::new_no_checks(vec![])),
AllowedOptimizations::default(),
Some(Arc::new(Schema::default())),
"EMPTY PROJECTION",
)
} else {
LogicalPlan::Projection {
Expand Down Expand Up @@ -425,15 +426,21 @@ impl LogicalPlanBuilder {
function: F,
optimizations: AllowedOptimizations,
schema: Option<SchemaRef>,
name: &'static str,
) -> Self
where
F: DataFrameUdf + 'static,
{
let options = LogicalPlanUdfOptions {
predicate_pd: optimizations.predicate_pushdown,
projection_pd: optimizations.projection_pushdown,
fmt_str: name,
};

LogicalPlan::Udf {
input: Box::new(self.0),
function: Arc::new(function),
predicate_pd: optimizations.predicate_pushdown,
projection_pd: optimizations.projection_pushdown,
options,
schema,
}
.into()
Expand Down
12 changes: 4 additions & 8 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,14 @@ pub(crate) fn to_alp(
LogicalPlan::Udf {
input,
function,
projection_pd,
predicate_pd,
options,
schema,
} => {
let input = to_alp(*input, expr_arena, lp_arena);
ALogicalPlan::Udf {
input,
function,
projection_pd,
predicate_pd,
options,
schema,
}
}
Expand Down Expand Up @@ -817,16 +815,14 @@ pub(crate) fn node_to_lp(
ALogicalPlan::Udf {
input,
function,
predicate_pd,
projection_pd,
options,
schema,
} => {
let input = Box::new(node_to_lp(input, expr_arena, lp_arena));
LogicalPlan::Udf {
input,
function,
predicate_pd,
projection_pd,
options,
schema,
}
}
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ FROM
Slice { input, offset, len } => {
write!(f, "{:?}\nSLICE[offset: {}, len: {}]", input, offset, len)
}
Udf { input, .. } => write!(f, "UDF \n{:?}", input),
Udf { input, options, .. } => write!(f, "{} \n{:?}", options.fmt_str, input),
}
}
}
Expand All @@ -177,7 +177,7 @@ impl fmt::Debug for Expr {
Explode(expr) => write!(f, "{:?}.explode()", expr),
Duplicated(expr) => write!(f, "{:?}.is_duplicate()", expr),
Reverse(expr) => write!(f, "{:?}.reverse()", expr),
Alias(expr, name) => write!(f, "{:?}.alias({})", expr, name),
Alias(expr, name) => write!(f, "{:?}.alias(\"{}\")", expr, name),
Column(name) => write!(f, "col(\"{}\")", name),
Literal(v) => {
match v {
Expand Down
5 changes: 1 addition & 4 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,7 @@ pub enum LogicalPlan {
Udf {
input: Box<LogicalPlan>,
function: Arc<dyn DataFrameUdf>,
/// allow predicate pushdown optimizations
predicate_pd: bool,
/// allow projection pushdown optimizations
projection_pd: bool,
options: LogicalPlanUdfOptions,
schema: Option<SchemaRef>,
},
Union {
Expand Down
11 changes: 8 additions & 3 deletions polars/polars-lazy/src/logical_plan/optimizer/drop_nulls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,17 @@ impl OptimizationRule for ReplaceDropNulls {

let function = move |df: DataFrame| df.drop_nulls(Some(&subset));

Some(ALogicalPlan::Udf {
input: *input,
function: Arc::new(function),
let options = LogicalPlanUdfOptions {
// does not matter as this runs after pushdowns have occurred
predicate_pd: true,
projection_pd: true,
fmt_str: "DROP NULLS",
};

Some(ALogicalPlan::Udf {
input: *input,
function: Arc::new(function),
options,
schema: None,
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ fn impl_fast_projection(
if columns.len() == expr.len() {
let function = move |df: DataFrame| df.select(&columns);

let options = LogicalPlanUdfOptions {
predicate_pd: true,
projection_pd: true,
fmt_str: "FAST PROJECTION",
};

let lp = ALogicalPlan::Udf {
input,
function: Arc::new(function),
predicate_pd: true,
projection_pd: true,
schema,
options,
};

Some(lp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,9 @@ impl PredicatePushDown {

lp @ Udf { .. } => {
if let ALogicalPlan::Udf {
predicate_pd: true, ..
options: LogicalPlanUdfOptions {
predicate_pd: true, ..
}, ..
} = lp
{
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,11 +802,10 @@ impl ProjectionPushDown {
Udf {
input,
function,
predicate_pd,
projection_pd,
options,
schema,
} => {
if projection_pd {
if options.projection_pd {
self.pushdown_and_assign(
input,
acc_projections,
Expand All @@ -819,8 +818,7 @@ impl ProjectionPushDown {
Ok(Udf {
input,
function,
predicate_pd,
projection_pd,
options,
schema,
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl SlicePushDown {
| m @ (Explode {..}, _)
| m @ (Cache {..}, _)
| m @ (Distinct {..}, _)
| m @ (Udf {predicate_pd: false, ..}, _)
| m @ (Udf {options: LogicalPlanUdfOptions{ predicate_pd: false, ..}, ..}, _)
| m @ (HStack {..},_)
=> {
let (lp, state) = m;
Expand All @@ -231,7 +231,7 @@ impl SlicePushDown {
// [Pushdown]
// these nodes will be pushed down.
m @ (Melt { .. },_)
| m @(Udf{predicate_pd: true, ..}, _)
| m @(Udf{options: LogicalPlanUdfOptions{ predicate_pd: true, ..}, .. }, _)

=> {
let (lp, state) = m;
Expand Down

0 comments on commit 971365d

Please sign in to comment.