Skip to content

Commit

Permalink
Update helper traits for AST manipulation.
Browse files Browse the repository at this point in the history
  • Loading branch information
wsx-ucb committed Feb 20, 2023
1 parent 575d1cd commit aad841a
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 26 deletions.
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,9 @@ impl PlanRoot {
ApplyOrder::TopDown,
);

plan = plan.merge_eq_nodes();
plan = plan.prune_share();

#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());

Expand Down
57 changes: 46 additions & 11 deletions src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::hash::Hash;

use super::{LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary};
use super::{EndoPlan, LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary, VisitPlan};
use crate::utils::{Endo, Layer, Visit};

pub trait Semantics<V: Hash + Eq> {
Expand Down Expand Up @@ -53,7 +53,7 @@ impl PlanRef {
pub fn prune_share(&self) -> PlanRef {
let mut counter = Counter::default();
counter.visit(self);
counter.apply(self.clone())
counter.to_pruner().apply(self.clone())
}
}

Expand All @@ -62,22 +62,57 @@ struct Counter {
counts: HashMap<PlanNodeId, u64>,
}

impl Counter {
fn to_pruner<'a>(&'a self) -> Pruner<'a> {
Pruner {
counts: &self.counts,
cache: HashMap::new(),
}
}
}

impl VisitPlan for Counter {
fn visited(&self, t: &PlanRef) -> bool {
self.counts.get(&t.id()).is_some_and(|c| *c > 1)
}
}

impl Visit<PlanRef> for Counter {
fn pre(&mut self, t: &PlanRef) {
t.as_logical_share().map(|s| {
fn visit(&mut self, t: &PlanRef) {
if let Some(s) = t.as_logical_share() {
self.counts
.entry(s.id())
.and_modify(|c| *c += 1)
.or_insert(1)
});
.or_insert(1);
}
self.dag_visit(t);
}
}

impl Endo<PlanRef> for Counter {
struct Pruner<'a> {
counts: &'a HashMap<PlanNodeId, u64>,
cache: HashMap<PlanRef, PlanRef>,
}

impl EndoPlan for Pruner<'_> {
fn cache(&mut self) -> &mut HashMap<PlanRef, PlanRef> {
&mut self.cache
}
}

impl Endo<PlanRef> for Pruner<'_> {
fn pre(&mut self, t: PlanRef) -> PlanRef {
match t.as_logical_share() {
Some(s) if *self.counts.get(&s.id()).unwrap() == 1 => s.input(),
_ => t,
}
let prunable = |s: &LogicalShare| {
*self.counts.get(&s.id()).expect("Unprocessed shared node.") == 1
|| s.input().as_logical_scan().is_some()
};
t.as_logical_share()
.cloned()
.filter(prunable)
.map_or(t, |s| s.input())
}

fn apply(&mut self, t: PlanRef) -> PlanRef {
self.dag_apply(t)
}
}
45 changes: 44 additions & 1 deletion src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
//! - all field should be valued in construction, so the properties' derivation should be finished
//! in the `new()` function.

use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::ops::Deref;
Expand Down Expand Up @@ -150,6 +151,48 @@ impl Layer for PlanRef {
#[derive(Clone, Debug, Copy, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub struct PlanNodeId(pub i32);

/// A more sophisticated `Endo` taking into account of the DAG structure of `PlanRef`.
/// In addition to `Endo`, one have to specify the `cache` hash map
/// to store transformed `LogicalShare` and their results,
/// and the `dag_apply` function will take care to only transform every `LogicalShare` nodes once.
///
/// Note: Due to the way super trait is designed in rust,
/// one need to have seperate implementation blocks of `Endo<PlanRef>` and `EndoPlan`.
/// And conventionally the real transformation `apply` is under `Endo<PlanRef>`,
/// although one can refer to `dag_apply` in the implementation of `apply`.
pub trait EndoPlan: Endo<PlanRef> {
fn cache(&mut self) -> &mut HashMap<PlanRef, PlanRef>;

fn dag_apply(&mut self, plan: PlanRef) -> PlanRef {
match plan.as_logical_share() {
Some(_) => self.cache().get(&plan).cloned().unwrap_or_else(|| {
let res = self.tree_apply(plan.clone());
self.cache().entry(plan).or_insert(res).clone()
}),
None => self.tree_apply(plan),
}
}
}

/// A more sophisticated `Visit` taking into account of the DAG structure of `PlanRef`.
/// In addition to `Visit`, one have to specify `visited` to mark visited `LogicalShare` nodes,
/// and the `dag_visit` function will take care to only visit every `LogicalShare` nodes once.
/// See also `EndoPlan`.
pub trait VisitPlan: Visit<PlanRef> {
fn visited(&self, plan: &PlanRef) -> bool;

fn dag_visit(&mut self, plan: &PlanRef) {
match plan.as_logical_share() {
Some(_) if self.visited(plan) => (),
_ => {
self.pre(plan);
plan.descent(|i| self.visit(i));
self.post(plan);
}
}
}
}

#[derive(Debug, PartialEq)]
pub enum Convention {
Logical,
Expand Down Expand Up @@ -668,7 +711,7 @@ pub use stream_watermark_filter::StreamWatermarkFilter;
use crate::expr::{ExprImpl, ExprRewriter, InputRef, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Layer};
use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};

/// `for_all_plan_nodes` includes all plan nodes. If you added a new plan node
/// inside the project, be sure to add here and in its conventions like `for_logical_plan_nodes`
Expand Down
4 changes: 0 additions & 4 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,9 @@ pub struct PlanBase {
pub logical_pk: Vec<usize>,
/// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
/// correctness, but insert unnecessary sort in plan
#[derivative(PartialEq = "ignore")]
#[derivative(Hash = "ignore")]
pub order: Order,
/// The distribution property of the PlanNode's output, store an `Distribution::any()` here
/// will not affect correctness, but insert unnecessary exchange in plan
#[derivative(PartialEq = "ignore")]
#[derivative(Hash = "ignore")]
pub dist: Distribution,
/// The append-only property of the PlanNode's output is a stream-only property. Append-only
/// means the stream contains only insert operation.
Expand Down
48 changes: 38 additions & 10 deletions src/frontend/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,44 @@ impl ExprRewriter for Substitute {
}
}

// Traits for easy manipulation of recursive structures

/// A `Layer` is a container with subcomponents of type `Sub`.
/// We usually use `Layer` to represents one layer of a tree-like structure,
/// where the subcomponents are the recursive subtrees.
/// But in general, the subcomponent can be of different type than the `Layer`.
/// Such structual relation between `Sub` and `Layer`
/// allows us to lift transformation on `Sub` to that on `Layer.`
/// A related and even more general notion is `Functor`,
/// which might also be helpful to define in the future.
pub trait Layer: Sized {
type Sub;

/// Given a transformation `f : Sub -> Sub`,
/// we can derive a transformation on the entire `Layer` by acting `f` on all subcomponents.
fn map<F>(self, f: F) -> Self
where
F: FnMut(Self::Sub) -> Self::Sub;

/// Given a traversal `f : Sub -> ()`,
/// we can derive a traversal on the entire `Layer`
/// by sequentially visiting the subcomponents with `f`.
fn descent<F>(&self, f: F)
where
F: FnMut(&Self::Sub);
}

/// A tree-like structure is a `Layer` where the subcomponents are recursively trees.
pub trait Tree = Layer<Sub = Self>;

/// Given a tree-like structure `T`,
/// we usually can specify a transformation `T -> T`
/// by providing a pre-order transformation `pre : T -> T`
/// and a post-order transformation `post : T -> T`.
/// Specifically, the derived transformation `apply : T -> T` first applies `pre`,
/// then maps itself over the subtrees, and finally applies `post`.
/// This allows us to obtain a global transformation acting recursively on all levels
/// by specifying simplier transformations at acts locally.
pub trait Endo<T: Tree> {
fn pre(&mut self, t: T) -> T {
t
Expand All @@ -71,33 +95,37 @@ pub trait Endo<T: Tree> {
t
}

fn apply(&mut self, t: T) -> T {
/// The real application function is left undefined.
/// If we want the derived transformation
/// we can simply call `tree_apply` in the implementation.
/// But for more complicated requirements,
/// e.g. skipping over certain subtrees, custom logic can be added.
fn apply(&mut self, t: T) -> T;

/// The derived transformation based on `pre` and `post`.
fn tree_apply(&mut self, t: T) -> T {
let t = self.pre(t).map(|s| self.apply(s));
self.post(t)
}
}

/// A similar trait to generate traversal over tree-like structure.
/// See `Endo` for more details.
#[allow(unused_variables)]
pub trait Visit<T: Tree> {
fn pre(&mut self, t: &T) {}

fn post(&mut self, t: &T) {}

fn visit(&mut self, t: &T) {
fn visit(&mut self, t: &T);

fn tree_visit(&mut self, t: &T) {
self.pre(t);
t.descent(|i| self.visit(i));
self.post(t);
}
}

// Traits for easily transform and visit recursive structures

pub trait Fold<R> {
fn fold<F>(&self, f: F) -> R
where
F: FnMut(Vec<R>) -> R;
}

// Workaround object safety rules for Eq and Hash, adopted from
// https://github.com/bevyengine/bevy/blob/f7fbfaf9c72035e98c6b6cec0c7d26ff9f5b1c82/crates/bevy_utils/src/label.rs

Expand Down

0 comments on commit aad841a

Please sign in to comment.