Skip to content

Commit

Permalink
Make OptimizerConfig a trait (apache#4631) (apache#4638)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Dec 15, 2022
1 parent 5c558e9 commit 1f929f0
Show file tree
Hide file tree
Showing 28 changed files with 384 additions and 439 deletions.
4 changes: 2 additions & 2 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ impl OptimizerRule for MyRule {
fn optimize(
&self,
plan: &LogicalPlan,
_config: &mut OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
// recurse down and optimize children first
let plan = utils::optimize_children(self, plan, _config)?;
let plan = utils::optimize_children(self, plan, config)?;

match plan {
LogicalPlan::Filter(filter) => {
Expand Down
58 changes: 27 additions & 31 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::logical_expr::{
CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
SetVariable, TableSource, TableType, UNNAMED_TABLE,
};
use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use crate::optimizer::{OptimizerContext, OptimizerRule};
use datafusion_sql::{ResolvedTableReference, TableReference};

use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
Expand Down Expand Up @@ -1557,14 +1557,6 @@ impl SessionState {
.register_catalog(config.default_catalog.clone(), default_catalog);
}

let optimizer_config = OptimizerConfig::new().filter_null_keys(
config
.config_options
.read()
.get_bool(OPT_FILTER_NULL_JOIN_KEYS)
.unwrap_or_default(),
);

let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
Arc::new(JoinSelection::new()),
Expand Down Expand Up @@ -1593,7 +1585,7 @@ impl SessionState {

SessionState {
session_id,
optimizer: Optimizer::new(&optimizer_config),
optimizer: Optimizer::new(),
physical_optimizers,
query_planner: Arc::new(DefaultQueryPlanner {}),
catalog_list,
Expand Down Expand Up @@ -1741,32 +1733,37 @@ impl SessionState {

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let mut optimizer_config = OptimizerConfig::new()
.with_skip_failing_rules(
self.config
.config_options
.read()
.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES)
.unwrap_or_default(),
)
.with_max_passes(
self.config
.config_options
.read()
.get_u64(OPT_OPTIMIZER_MAX_PASSES)
.unwrap_or_default() as u8,
)
.with_query_execution_start_time(
self.execution_props.query_execution_start_time,
);
// TODO: Implement OptimizerContext directly on DataFrame (#4631) (#4626)
let config = {
let config_options = self.config.config_options.read();
OptimizerContext::new()
.with_skip_failing_rules(
config_options
.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES)
.unwrap_or_default(),
)
.with_max_passes(
config_options
.get_u64(OPT_OPTIMIZER_MAX_PASSES)
.unwrap_or_default() as u8,
)
.with_query_execution_start_time(
self.execution_props.query_execution_start_time,
)
.filter_null_keys(
config_options
.get_bool(OPT_FILTER_NULL_JOIN_KEYS)
.unwrap_or_default(),
)
};

if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
e.plan.as_ref(),
&mut optimizer_config,
&config,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
Expand All @@ -1781,8 +1778,7 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
self.optimizer
.optimize(plan, &mut optimizer_config, |_, _| {})
self.optimizer.optimize(plan, &config, |_, _| {})
}
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
Expand All @@ -307,7 +307,7 @@ impl OptimizerRule for TopKOptimizerRule {
return Ok(LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode {
k: *fetch,
input: self.optimize(input.as_ref(), optimizer_config)?,
input: self.optimize(input.as_ref(), config)?,
expr: expr[0].clone(),
}),
}));
Expand All @@ -317,7 +317,7 @@ impl OptimizerRule for TopKOptimizerRule {

// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
optimize_children(self, plan, optimizer_config)
optimize_children(self, plan, config)
}

fn name(&self) -> &str {
Expand Down
64 changes: 29 additions & 35 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

//! Eliminate common sub-expression.

use crate::{utils, OptimizerConfig, OptimizerRule};
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;

use arrow::datatypes::DataType;

use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion_expr::{
col,
Expand All @@ -27,8 +30,8 @@ use datafusion_expr::{
logical_plan::{Aggregate, Filter, LogicalPlan, Projection, Sort, Window},
Expr, ExprSchemable,
};
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;

use crate::{utils, OptimizerConfig, OptimizerRule};

/// A map from expression's identifier to tuple including
/// - the expression itself (cloned)
Expand Down Expand Up @@ -60,7 +63,7 @@ impl CommonSubexprEliminate {
arrays_list: &[&[Vec<(usize, String)>]],
input: &LogicalPlan,
expr_set: &mut ExprSet,
optimizer_config: &mut OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
let mut affected_id = BTreeSet::<Identifier>::new();

Expand All @@ -80,7 +83,7 @@ impl CommonSubexprEliminate {
.collect::<Result<Vec<_>>>()?;

let mut new_input = self
.try_optimize(input, optimizer_config)?
.try_optimize(input, config)?
.unwrap_or_else(|| input.clone());
if !affected_id.is_empty() {
new_input = build_project_plan(new_input, affected_id, expr_set)?;
Expand All @@ -94,17 +97,17 @@ impl OptimizerRule for CommonSubexprEliminate {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.try_optimize(plan, config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let mut expr_set = ExprSet::new();

Expand All @@ -117,13 +120,8 @@ impl OptimizerRule for CommonSubexprEliminate {
let input_schema = Arc::clone(input.schema());
let arrays = to_arrays(expr, input_schema, &mut expr_set)?;

let (mut new_expr, new_input) = self.rewrite_expr(
&[expr],
&[&arrays],
input,
&mut expr_set,
optimizer_config,
)?;
let (mut new_expr, new_input) =
self.rewrite_expr(&[expr], &[&arrays], input, &mut expr_set, config)?;

Ok(Some(LogicalPlan::Projection(
Projection::try_new_with_schema(
Expand All @@ -150,7 +148,7 @@ impl OptimizerRule for CommonSubexprEliminate {
&[&[id_array]],
filter.input(),
&mut expr_set,
optimizer_config,
config,
)?;

if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
Expand All @@ -177,7 +175,7 @@ impl OptimizerRule for CommonSubexprEliminate {
&[&arrays],
input,
&mut expr_set,
optimizer_config,
config,
)?;

Ok(Some(LogicalPlan::Window(Window {
Expand All @@ -202,7 +200,7 @@ impl OptimizerRule for CommonSubexprEliminate {
&[&group_arrays, &aggr_arrays],
input,
&mut expr_set,
optimizer_config,
config,
)?;
// note the reversed pop order.
let new_aggr_expr = pop_expr(&mut new_expr)?;
Expand All @@ -221,13 +219,8 @@ impl OptimizerRule for CommonSubexprEliminate {
let input_schema = Arc::clone(input.schema());
let arrays = to_arrays(expr, input_schema, &mut expr_set)?;

let (mut new_expr, new_input) = self.rewrite_expr(
&[expr],
&[&arrays],
input,
&mut expr_set,
optimizer_config,
)?;
let (mut new_expr, new_input) =
self.rewrite_expr(&[expr], &[&arrays], input, &mut expr_set, config)?;

Ok(Some(LogicalPlan::Sort(Sort {
expr: pop_expr(&mut new_expr)?,
Expand Down Expand Up @@ -259,11 +252,7 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
Expand Down Expand Up @@ -582,20 +571,25 @@ fn replace_common_expr(

#[cfg(test)]
mod test {
use super::*;
use crate::test::*;
use std::iter;

use arrow::datatypes::{Field, Schema};

use datafusion_expr::logical_plan::{table_scan, JoinType};
use datafusion_expr::{
avg, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, sum,
Operator,
};
use std::iter;

use crate::optimizer::OptimizerContext;
use crate::test::*;

use super::*;

fn assert_optimized_plan_eq(expected: &str, plan: &LogicalPlan) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
.optimize(plan, &mut OptimizerConfig::new())
.optimize(plan, &OptimizerContext::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(expected, formatted_plan);
Expand Down Expand Up @@ -839,7 +833,7 @@ mod test {
.build()
.unwrap();
let rule = CommonSubexprEliminate {};
let optimized_plan = rule.optimize(&plan, &mut OptimizerConfig::new()).unwrap();
let optimized_plan = rule.optimize(&plan, &OptimizerContext::new()).unwrap();

let schema = optimized_plan.schema();
let fields_with_datatypes: Vec<_> = schema
Expand Down
20 changes: 8 additions & 12 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl DecorrelateWhereExists {
fn extract_subquery_exprs(
&self,
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let filters = split_conjunction(predicate);

Expand All @@ -58,7 +58,7 @@ impl DecorrelateWhereExists {
match it {
Expr::Exists { subquery, negated } => {
let subquery = self
.try_optimize(&subquery.subquery, optimizer_config)?
.try_optimize(&subquery.subquery, config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
Expand All @@ -77,17 +77,17 @@ impl OptimizerRule for DecorrelateWhereExists {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.try_optimize(plan, config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
Expand All @@ -96,11 +96,11 @@ impl OptimizerRule for DecorrelateWhereExists {

// Apply optimizer rule to current input
let optimized_input = self
.try_optimize(filter_input, optimizer_config)?
.try_optimize(filter_input, config)?
.unwrap_or_else(|| filter_input.clone());

let (subqueries, other_exprs) =
self.extract_subquery_exprs(predicate, optimizer_config)?;
self.extract_subquery_exprs(predicate, config)?;
let optimized_plan = LogicalPlan::Filter(Filter::try_new(
predicate.clone(),
Arc::new(optimized_input),
Expand All @@ -124,11 +124,7 @@ impl OptimizerRule for DecorrelateWhereExists {
}
_ => {
// Apply the optimization to all inputs of the plan
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
Expand Down
Loading

0 comments on commit 1f929f0

Please sign in to comment.