Skip to content

Commit

Permalink
Update plan traversal API.
Browse files Browse the repository at this point in the history
  • Loading branch information
wsx-ucb committed Feb 21, 2023
1 parent e72117d commit 0f2a168
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 55 deletions.
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,14 @@
└─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.x] }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
├─StreamProject { exprs: [i.x, i.t._row_id, i.t._row_id, i.x] }
| └─StreamShare { id = 400 }
| └─StreamShare { id = 398 }
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamProject { exprs: [i.x, i.t._row_id, i.t._row_id, i.x] }
└─StreamShare { id = 400 }
└─StreamShare { id = 398 }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@
| └─StreamAppendOnlyHashAgg { group_key: [window_start, bid.auction], aggs: [count, count] }
| └─StreamExchange { dist: HashShard(bid.auction, window_start) }
| └─StreamProject { exprs: [bid.auction, window_start, bid._row_id] }
| └─StreamShare { id = 764 }
| └─StreamShare { id = 762 }
| └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
| └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamProject { exprs: [max(count), window_start] }
Expand All @@ -307,7 +307,7 @@
└─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count, count] }
└─StreamExchange { dist: HashShard(bid.auction, window_start) }
└─StreamProject { exprs: [bid.auction, window_start, bid._row_id] }
└─StreamShare { id = 764 }
└─StreamShare { id = 762 }
└─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/nexmark_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@
| └─StreamAppendOnlyHashAgg { group_key: [window_start, auction], aggs: [count, count] }
| └─StreamExchange { dist: HashShard(auction, window_start) }
| └─StreamProject { exprs: [auction, window_start, _row_id] }
| └─StreamShare { id = 906 }
| └─StreamShare { id = 904 }
| └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamRowIdGen { row_id_index: 7 }
Expand All @@ -361,7 +361,7 @@
└─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] }
└─StreamExchange { dist: HashShard(auction, window_start) }
└─StreamProject { exprs: [auction, window_start, _row_id] }
└─StreamShare { id = 906 }
└─StreamShare { id = 904 }
└─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamRowIdGen { row_id_index: 7 }
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/project_set.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@
└─StreamHashJoin { type: Inner, predicate: Unnest($0) = Unnest($0), output: [Unnest($0), t._row_id, projected_row_id, t._row_id, projected_row_id, Unnest($0)] }
├─StreamExchange { dist: HashShard(Unnest($0)) }
| └─StreamProject { exprs: [Unnest($0), t._row_id, projected_row_id] }
| └─StreamShare { id = 357 }
| └─StreamShare { id = 355 }
| └─StreamProject { exprs: [Unnest($0), t._row_id, projected_row_id] }
| └─StreamProjectSet { select_list: [Unnest($0), $1] }
| └─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(Unnest($0)) }
└─StreamProject { exprs: [Unnest($0), t._row_id, projected_row_id] }
└─StreamShare { id = 357 }
└─StreamShare { id = 355 }
└─StreamProject { exprs: [Unnest($0), t._row_id, projected_row_id] }
└─StreamProjectSet { select_list: [Unnest($0), $1] }
└─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/share.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
| └─StreamAppendOnlyHashAgg { group_key: [window_start, auction], aggs: [count, count] }
| └─StreamExchange { dist: HashShard(auction, window_start) }
| └─StreamProject { exprs: [auction, window_start, _row_id] }
| └─StreamShare { id = 906 }
| └─StreamShare { id = 904 }
| └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamRowIdGen { row_id_index: 7 }
Expand All @@ -132,7 +132,7 @@
└─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] }
└─StreamExchange { dist: HashShard(auction, window_start) }
└─StreamProject { exprs: [auction, window_start, _row_id] }
└─StreamShare { id = 906 }
└─StreamShare { id = 904 }
└─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamRowIdGen { row_id_index: 7 }
Expand Down Expand Up @@ -165,15 +165,15 @@
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(0:Int32) }
| └─StreamProject { exprs: [sum0(count), 0:Int32] }
| └─StreamShare { id = 255 }
| └─StreamShare { id = 254 }
| └─StreamProject { exprs: [sum0(count)] }
| └─StreamGlobalSimpleAgg { aggs: [count, sum0(count)] }
| └─StreamExchange { dist: Single }
| └─StreamStatelessLocalSimpleAgg { aggs: [count, count] }
| └─StreamTableScan { table: t, columns: [t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(1:Int32) }
└─StreamProject { exprs: [sum0(count), 1:Int32] }
└─StreamShare { id = 255 }
└─StreamShare { id = 254 }
└─StreamProject { exprs: [sum0(count)] }
└─StreamGlobalSimpleAgg { aggs: [count, sum0(count)] }
└─StreamExchange { dist: Single }
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/planner_test/tests/testdata/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,16 @@
└─LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(a.a1, a.a1), output: all }
├─LogicalAgg { group_key: [a.a1], aggs: [] }
| └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a1, a.a1), output: [a.a1] }
| ├─LogicalShare { id = 385 }
| ├─LogicalShare { id = 383 }
| | └─LogicalAgg { group_key: [a.a1], aggs: [] }
| | └─LogicalScan { table: a, columns: [a.a1] }
| └─LogicalJoin { type: Inner, on: true, output: all }
| ├─LogicalShare { id = 385 }
| ├─LogicalShare { id = 383 }
| | └─LogicalAgg { group_key: [a.a1], aggs: [] }
| | └─LogicalScan { table: a, columns: [a.a1] }
| └─LogicalScan { table: b, columns: [] }
└─LogicalJoin { type: Inner, on: true, output: all }
├─LogicalShare { id = 385 }
├─LogicalShare { id = 383 }
| └─LogicalAgg { group_key: [a.a1], aggs: [] }
| └─LogicalScan { table: a, columns: [a.a1] }
└─LogicalScan { table: c, columns: [] }
Expand Down Expand Up @@ -326,14 +326,14 @@
├─LogicalScan { table: a, columns: [a.a1, a.a2] }
└─LogicalAgg { group_key: [a.a1], aggs: [min(b.b1)] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a1, a.a1), output: [a.a1, b.b1] }
├─LogicalShare { id = 497 }
├─LogicalShare { id = 496 }
| └─LogicalAgg { group_key: [a.a1], aggs: [] }
| └─LogicalScan { table: a, columns: [a.a1] }
└─LogicalJoin { type: Inner, on: (b.b2 = min(b.b1)), output: [a.a1, b.b1] }
├─LogicalScan { table: b, columns: [b.b1, b.b2] }
└─LogicalAgg { group_key: [a.a1], aggs: [min(b.b1)] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a1, b.b1), output: [a.a1, b.b1] }
├─LogicalShare { id = 497 }
├─LogicalShare { id = 496 }
| └─LogicalAgg { group_key: [a.a1], aggs: [] }
| └─LogicalScan { table: a, columns: [a.a1] }
└─LogicalProject { exprs: [b.b1, b.b1] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,16 +379,16 @@
└─LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(a.a1, a.a1), output: all }
├─LogicalAgg { group_key: [a.a1], aggs: [] }
| └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a1, a.a1), output: [a.a1] }
| ├─LogicalShare { id = 378 }
| ├─LogicalShare { id = 376 }
| | └─LogicalAgg { group_key: [a.a1], aggs: [] }
| | └─LogicalScan { table: a, columns: [a.a1] }
| └─LogicalJoin { type: Inner, on: true, output: all }
| ├─LogicalShare { id = 378 }
| ├─LogicalShare { id = 376 }
| | └─LogicalAgg { group_key: [a.a1], aggs: [] }
| | └─LogicalScan { table: a, columns: [a.a1] }
| └─LogicalScan { table: b, columns: [] }
└─LogicalJoin { type: Inner, on: true, output: all }
├─LogicalShare { id = 378 }
├─LogicalShare { id = 376 }
| └─LogicalAgg { group_key: [a.a1], aggs: [] }
| └─LogicalScan { table: a, columns: [a.a1] }
└─LogicalScan { table: c, columns: [] }
Expand Down Expand Up @@ -469,7 +469,7 @@
optimized_logical_plan: |
LogicalAgg { aggs: [count] }
└─LogicalJoin { type: Inner, on: IsNotDistinctFrom(a.a3, a.a3) AND IsNotDistinctFrom(b.b2, b.b2), output: [] }
├─LogicalShare { id = 397 }
├─LogicalShare { id = 396 }
| └─LogicalJoin { type: Inner, on: (a.a3 = b.b2), output: all }
| ├─LogicalScan { table: a, columns: [a.a3] }
| └─LogicalScan { table: b, columns: [b.b2] }
Expand All @@ -478,7 +478,7 @@
└─LogicalAgg { group_key: [a.a3, b.b2], aggs: [count(1:Int32)] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a3, c.c3) AND IsNotDistinctFrom(b.b2, c.c2), output: [a.a3, b.b2, 1:Int32] }
├─LogicalAgg { group_key: [a.a3, b.b2], aggs: [] }
| └─LogicalShare { id = 397 }
| └─LogicalShare { id = 396 }
| └─LogicalJoin { type: Inner, on: (a.a3 = b.b2), output: all }
| ├─LogicalScan { table: a, columns: [a.a3] }
| └─LogicalScan { table: b, columns: [b.b2] }
Expand All @@ -495,12 +495,12 @@
| └─LogicalScan { table: a, columns: [a.x, a.y, a.z], predicate: (a.x = 3:Int32) }
└─LogicalAgg { group_key: [a.x, a.z], aggs: [count(1:Int32)] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.x, a.x) AND IsNotDistinctFrom(a.z, a.z), output: [a.x, a.z, 1:Int32] }
├─LogicalShare { id = 361 }
├─LogicalShare { id = 360 }
| └─LogicalAgg { group_key: [a.x, a.z], aggs: [] }
| └─LogicalScan { table: a, columns: [a.x, a.z], predicate: (a.x = 3:Int32) }
└─LogicalProject { exprs: [a.x, a.z, 1:Int32] }
└─LogicalJoin { type: Inner, on: (b.z = a.z), output: [a.x, a.z] }
├─LogicalShare { id = 361 }
├─LogicalShare { id = 360 }
| └─LogicalAgg { group_key: [a.x, a.z], aggs: [] }
| └─LogicalScan { table: a, columns: [a.x, a.z], predicate: (a.x = 3:Int32) }
└─LogicalScan { table: b, columns: [b.z] }
Expand Down Expand Up @@ -650,12 +650,12 @@
├─LogicalScan { table: t1, columns: [t1.x, t1.y] }
└─LogicalJoin { type: FullOuter, on: (t2.x = t3.x) AND (t1.y = t3.y) AND IsNotDistinctFrom(t1.y, t1.y) AND (t1.y = t2.y), output: [t1.y] }
├─LogicalJoin { type: Inner, on: true, output: all }
| ├─LogicalShare { id = 226 }
| ├─LogicalShare { id = 225 }
| | └─LogicalAgg { group_key: [t1.y], aggs: [] }
| | └─LogicalScan { table: t1, columns: [t1.y] }
| └─LogicalScan { table: t2, columns: [t2.x, t2.y] }
└─LogicalJoin { type: Inner, on: true, output: all }
├─LogicalShare { id = 226 }
├─LogicalShare { id = 225 }
| └─LogicalAgg { group_key: [t1.y], aggs: [] }
| └─LogicalScan { table: t1, columns: [t1.y] }
└─LogicalScan { table: t3, columns: [t3.x, t3.y] }
Expand Down Expand Up @@ -739,12 +739,12 @@
├─LogicalScan { table: strings, columns: [strings.v1] }
└─LogicalAgg { group_key: [strings.v1], aggs: [string_agg(strings.v1, ',':Varchar order_by(strings.v1 ASC NULLS LAST))] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(strings.v1, strings.v1), output: [strings.v1, strings.v1, ',':Varchar, strings.v1] }
├─LogicalShare { id = 284 }
├─LogicalShare { id = 283 }
| └─LogicalAgg { group_key: [strings.v1], aggs: [] }
| └─LogicalScan { table: strings, columns: [strings.v1] }
└─LogicalProject { exprs: [strings.v1, strings.v1, ',':Varchar, strings.v1] }
└─LogicalJoin { type: Inner, on: true, output: all }
├─LogicalShare { id = 284 }
├─LogicalShare { id = 283 }
| └─LogicalAgg { group_key: [strings.v1], aggs: [] }
| └─LogicalScan { table: strings, columns: [strings.v1] }
└─LogicalScan { table: strings, columns: [strings.v1] }
Expand Down
31 changes: 19 additions & 12 deletions src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use std::collections::HashMap;
use std::hash::Hash;

use super::{EndoPlan, LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary, VisitPlan};
use crate::utils::{Endo, Layer, Visit};
use crate::Explain;
use crate::utils::{Endo, Visit};

pub trait Semantics<V: Hash + Eq> {
fn semantics(&self) -> V;
Expand Down Expand Up @@ -57,7 +56,7 @@ where
fn apply(&mut self, t: PlanRef) -> PlanRef {
let semantics = t.semantics();
let share = self.cache.get(&semantics).cloned().unwrap_or_else(|| {
let share = LogicalShare::new(t.map(|i| self.apply(i)));
let share = LogicalShare::new(self.tree_apply(t));
self.cache.entry(semantics).or_insert(share).clone()
});
share.into()
Expand Down Expand Up @@ -87,8 +86,13 @@ impl Counter {
}

impl VisitPlan for Counter {
fn visited(&self, t: &PlanRef) -> bool {
self.counts.get(&t.id()).is_some_and(|c| *c > 1)
fn visited<F>(&mut self, plan: &PlanRef, mut f: F)
where
F: FnMut(&mut Self),
{
if !self.counts.get(&plan.id()).is_some_and(|c| *c > 1) {
f(self);
}
}
}

Expand All @@ -106,12 +110,18 @@ impl Visit<PlanRef> for Counter {

struct Pruner<'a> {
counts: &'a HashMap<PlanNodeId, u64>,
cache: HashMap<PlanRef, PlanRef>,
cache: HashMap<PlanNodeId, PlanRef>,
}

impl EndoPlan for Pruner<'_> {
fn cache(&mut self) -> &mut HashMap<PlanRef, PlanRef> {
&mut self.cache
fn cached<F>(&mut self, plan: PlanRef, mut f: F) -> PlanRef
where
F: FnMut(&mut Self) -> PlanRef,
{
self.cache.get(&plan.id()).cloned().unwrap_or_else(|| {
let res = f(self);
self.cache.entry(plan.id()).or_insert(res).clone()
})
}
}

Expand All @@ -129,9 +139,6 @@ impl Endo<PlanRef> for Pruner<'_> {
}

fn apply(&mut self, t: PlanRef) -> PlanRef {
// println!("Before:\n{}", t.explain_to_string().unwrap());
let result = self.dag_apply(t);
// println!("After:\n{}", result.explain_to_string().unwrap());
result
self.dag_apply(t)
}
}

0 comments on commit 0f2a168

Please sign in to comment.