Skip to content

Commit

Permalink
fix(optimizer): Constraining TopN Optimization in Batch (#7993)
Browse files Browse the repository at this point in the history
Fix #7992 by apply `TopN` on index optimization into `gen_batch_plan` part.

Please explain **IN DETAIL** what the changes are in this PR and why they are needed:

- Summarize your change (**mandatory**)
Fix #7992 by apply `TopN` on index optimization into `gen_batch_plan` part. One planner test is added.

Approved-By: BugenZhao
Approved-By: chenzl25

Co-Authored-By: Clearlove <yifei.c.wei@gmail.com>
Co-Authored-By: Clearlove <52417396+Eurekaaw@users.noreply.github.com>
  • Loading branch information
y-wei and y-wei committed Feb 17, 2023
1 parent e62cf6b commit 0ce9d72
Show file tree
Hide file tree
Showing 19 changed files with 701 additions and 689 deletions.
80 changes: 40 additions & 40 deletions src/frontend/planner_test/tests/testdata/agg.yaml
Expand Up @@ -31,7 +31,7 @@
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [v1, agg], pk_columns: [v1] }
└─StreamProject { exprs: [t.v1, (min(t.v2) + (max(t.v3) * count(t.v1))) as $expr70] }
└─StreamProject { exprs: [t.v1, (min(t.v2) + (max(t.v3) * count(t.v1))) as $expr69] }
└─StreamHashAgg { group_key: [t.v1], aggs: [count, min(t.v2), max(t.v3), count(t.v1)] }
└─StreamExchange { dist: HashShard(t.v1) }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
Expand All @@ -51,11 +51,11 @@
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [agg], pk_columns: [] }
└─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v2)) * sum0(count(t.v3)))) as $expr71] }
└─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v2)) * sum0(count(t.v3)))) as $expr70] }
└─StreamGlobalSimpleAgg { aggs: [count, min(min(t.v1)), max(max(t.v2)), sum0(count(t.v3))] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [$expr69], aggs: [count, min(t.v1), max(t.v2), count(t.v3)] }
└─StreamProject { exprs: [t.v1, t.v2, t.v3, t._row_id, Vnode(t._row_id) as $expr69] }
└─StreamHashAgg { group_key: [$expr68], aggs: [count, min(t.v1), max(t.v2), count(t.v3)] }
└─StreamProject { exprs: [t.v1, t.v2, t.v3, t._row_id, Vnode(t._row_id) as $expr68] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(v1 int, v2 int);
Expand Down Expand Up @@ -83,10 +83,10 @@
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [v3, agg], pk_columns: [v3] }
└─StreamProject { exprs: [t.v3, (min(t.v1) * (sum($expr137)::Decimal / count($expr137))) as $expr139] }
└─StreamHashAgg { group_key: [t.v3], aggs: [count, min(t.v1), sum($expr137), count($expr137)] }
└─StreamProject { exprs: [t.v3, (min(t.v1) * (sum($expr135)::Decimal / count($expr135))) as $expr137] }
└─StreamHashAgg { group_key: [t.v3], aggs: [count, min(t.v1), sum($expr135), count($expr135)] }
└─StreamExchange { dist: HashShard(t.v3) }
└─StreamProject { exprs: [t.v3, t.v1, (t.v1 + t.v2) as $expr137, t._row_id] }
└─StreamProject { exprs: [t.v3, t.v1, (t.v1 + t.v2) as $expr135, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- name: test logical_agg with complex group expression
sql: |
Expand Down Expand Up @@ -151,11 +151,11 @@
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [cnt, sum], pk_columns: [] }
└─StreamProject { exprs: [sum0(count($expr69)), sum(sum($expr69))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum0(count($expr69)), sum(sum($expr69))] }
└─StreamProject { exprs: [sum0(count($expr68)), sum(sum($expr68))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum0(count($expr68)), sum(sum($expr68))] }
└─StreamExchange { dist: Single }
└─StreamStatelessLocalSimpleAgg { aggs: [count, count($expr69), sum($expr69)] }
└─StreamProject { exprs: [(t.v1 + t.v2) as $expr69, t._row_id] }
└─StreamStatelessLocalSimpleAgg { aggs: [count, count($expr68), sum($expr68)] }
└─StreamProject { exprs: [(t.v1 + t.v2) as $expr68, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(v1 int, v2 int, v3 int);
Expand All @@ -169,10 +169,10 @@
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [v1, agg], pk_columns: [v1] }
└─StreamProject { exprs: [t.v1, ((sum($expr93) / count($expr93)) + max(t.v1)) as $expr95] }
└─StreamHashAgg { group_key: [t.v1], aggs: [count, sum($expr93), count($expr93), max(t.v1)] }
└─StreamProject { exprs: [t.v1, ((sum($expr91) / count($expr91)) + max(t.v1)) as $expr93] }
└─StreamHashAgg { group_key: [t.v1], aggs: [count, sum($expr91), count($expr91), max(t.v1)] }
└─StreamExchange { dist: HashShard(t.v1) }
└─StreamProject { exprs: [t.v1, (t.v2 + t.v3) as $expr93, t._row_id] }
└─StreamProject { exprs: [t.v1, (t.v2 + t.v3) as $expr91, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 real);
Expand Down Expand Up @@ -410,22 +410,22 @@
└─LogicalProject { exprs: [t.v1, t.v3, t.v2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id] }
optimized_logical_plan: |
LogicalProject { exprs: [(min(t.v1) + (max(t.v3) * count(t.v2))) as $expr20] }
LogicalProject { exprs: [(min(t.v1) + (max(t.v3) * count(t.v2))) as $expr19] }
└─LogicalAgg { aggs: [min(t.v1), max(t.v3), count(t.v2)] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3] }
batch_plan: |
BatchProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr42] }
BatchProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr41] }
└─BatchSimpleAgg { aggs: [min(min(t.v1)), max(max(t.v3)), sum0(count(t.v2))] }
└─BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [min(t.v1), max(t.v3), count(t.v2)] }
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [agg], pk_columns: [] }
└─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr68] }
└─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr66] }
└─StreamGlobalSimpleAgg { aggs: [count, min(min(t.v1)), max(max(t.v3)), sum0(count(t.v2))] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [$expr66], aggs: [count, min(t.v1), max(t.v3), count(t.v2)] }
└─StreamProject { exprs: [t.v1, t.v2, t.v3, t._row_id, Vnode(t._row_id) as $expr66] }
└─StreamHashAgg { group_key: [$expr64], aggs: [count, min(t.v1), max(t.v3), count(t.v2)] }
└─StreamProject { exprs: [t.v1, t.v2, t.v3, t._row_id, Vnode(t._row_id) as $expr64] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- name: dup group key
sql: |
Expand Down Expand Up @@ -593,8 +593,8 @@
└─LogicalProject { exprs: [t.a, t.b, (t.a * t.b) as $expr1] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
optimized_logical_plan: |
LogicalAgg { aggs: [sum($expr20) filter(((t.a * t.b) > 0:Int32))] }
└─LogicalProject { exprs: [t.a, t.b, (t.a * t.b) as $expr20] }
LogicalAgg { aggs: [sum($expr19) filter(((t.a * t.b) > 0:Int32))] }
└─LogicalProject { exprs: [t.a, t.b, (t.a * t.b) as $expr19] }
└─LogicalScan { table: t, columns: [t.a, t.b] }
- name: complex filter clause
sql: |
Expand All @@ -606,17 +606,17 @@
└─LogicalProject { exprs: [t.a, t.b, (t.a * t.b) as $expr1] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
optimized_logical_plan: |
LogicalAgg { aggs: [max($expr20) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32)))] }
└─LogicalProject { exprs: [t.a, t.b, (t.a * t.b) as $expr20] }
LogicalAgg { aggs: [max($expr19) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32)))] }
└─LogicalProject { exprs: [t.a, t.b, (t.a * t.b) as $expr19] }
└─LogicalScan { table: t, columns: [t.a, t.b] }
stream_plan: |
StreamMaterialize { columns: [sab], pk_columns: [] }
└─StreamProject { exprs: [max(max($expr44) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))] }
└─StreamGlobalSimpleAgg { aggs: [count, max(max($expr44) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))] }
└─StreamProject { exprs: [max(max($expr42) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))] }
└─StreamGlobalSimpleAgg { aggs: [count, max(max($expr42) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [$expr45], aggs: [count, max($expr44) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32)))] }
└─StreamProject { exprs: [t.a, t.b, $expr44, t._row_id, Vnode(t._row_id) as $expr45] }
└─StreamProject { exprs: [t.a, t.b, (t.a * t.b) as $expr44, t._row_id] }
└─StreamHashAgg { group_key: [$expr43], aggs: [count, max($expr42) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32)))] }
└─StreamProject { exprs: [t.a, t.b, $expr42, t._row_id, Vnode(t._row_id) as $expr43] }
└─StreamProject { exprs: [t.a, t.b, (t.a * t.b) as $expr42, t._row_id] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- name: avg filter clause + group by
sql: |
Expand All @@ -628,12 +628,12 @@
└─LogicalProject { exprs: [t.b, t.a] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
optimized_logical_plan: |
LogicalProject { exprs: [(sum(t.a) filter((t.a > t.b))::Decimal / count(t.a) filter((t.a > t.b))) as $expr21] }
LogicalProject { exprs: [(sum(t.a) filter((t.a > t.b))::Decimal / count(t.a) filter((t.a > t.b))) as $expr20] }
└─LogicalAgg { group_key: [t.b], aggs: [sum(t.a) filter((t.a > t.b)), count(t.a) filter((t.a > t.b))] }
└─LogicalScan { table: t, columns: [t.a, t.b] }
stream_plan: |
StreamMaterialize { columns: [avga, t.b(hidden)], pk_columns: [t.b] }
└─StreamProject { exprs: [(sum(t.a) filter((t.a > t.b))::Decimal / count(t.a) filter((t.a > t.b))) as $expr47, t.b] }
└─StreamProject { exprs: [(sum(t.a) filter((t.a > t.b))::Decimal / count(t.a) filter((t.a > t.b))) as $expr45, t.b] }
└─StreamHashAgg { group_key: [t.b], aggs: [count, sum(t.a) filter((t.a > t.b)), count(t.a) filter((t.a > t.b))] }
└─StreamExchange { dist: HashShard(t.b) }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
Expand Down Expand Up @@ -855,16 +855,16 @@
└─LogicalProject { exprs: [t.b, (Length(t.a) * t.b) as $expr1] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
optimized_logical_plan: |
LogicalAgg { aggs: [sum($expr20) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32))] }
└─LogicalProject { exprs: [t.b, (Length(t.a) * t.b) as $expr20] }
LogicalAgg { aggs: [sum($expr19) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32))] }
└─LogicalProject { exprs: [t.b, (Length(t.a) * t.b) as $expr19] }
└─LogicalScan { table: t, columns: [t.a, t.b] }
stream_plan: |
StreamMaterialize { columns: [s1], pk_columns: [] }
└─StreamProject { exprs: [sum(sum($expr44) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum($expr44) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))] }
└─StreamProject { exprs: [sum(sum($expr42) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum($expr42) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))] }
└─StreamExchange { dist: Single }
└─StreamStatelessLocalSimpleAgg { aggs: [count, sum($expr44) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32))] }
└─StreamProject { exprs: [t.b, (Length(t.a) * t.b) as $expr44, t._row_id] }
└─StreamStatelessLocalSimpleAgg { aggs: [count, sum($expr42) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32))] }
└─StreamProject { exprs: [t.b, (Length(t.a) * t.b) as $expr42, t._row_id] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int, y varchar);
Expand Down Expand Up @@ -1097,9 +1097,9 @@
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [stddev_samp, stddev_pop], pk_columns: [] }
└─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Decimal::Float64, Pow(((sum(sum($expr205))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / (sum0(count(t.v1)) - 1:Int64))::Float64, 0.5:Float64)) as $expr208, Pow(((sum(sum($expr205))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / sum0(count(t.v1)))::Float64, 0.5:Float64) as $expr209] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum($expr205)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum($expr205)), sum(sum(t.v1)), sum0(count(t.v1))] }
└─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Decimal::Float64, Pow(((sum(sum($expr202))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / (sum0(count(t.v1)) - 1:Int64))::Float64, 0.5:Float64)) as $expr205, Pow(((sum(sum($expr202))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / sum0(count(t.v1)))::Float64, 0.5:Float64) as $expr206] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum($expr202)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum($expr202)), sum(sum(t.v1)), sum0(count(t.v1))] }
└─StreamExchange { dist: Single }
└─StreamStatelessLocalSimpleAgg { aggs: [count, sum($expr205), sum(t.v1), count(t.v1), sum($expr205), sum(t.v1), count(t.v1)] }
└─StreamProject { exprs: [t.v1, (t.v1 * t.v1) as $expr205, t._row_id] }
└─StreamStatelessLocalSimpleAgg { aggs: [count, sum($expr202), sum(t.v1), count(t.v1), sum($expr202), sum(t.v1), count(t.v1)] }
└─StreamProject { exprs: [t.v1, (t.v1 * t.v1) as $expr202, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }

0 comments on commit 0ce9d72

Please sign in to comment.