feat(optimizer): index accelerating TopN #7726

Merged Feb 9, 2023 · 26 commits

Commits
398d938  mimic logical_scan (y-wei, Feb 6, 2023)
4bf7616  let topn using index, first trial (y-wei, Feb 6, 2023)
ff9d519  remove useless code (y-wei, Feb 6, 2023)
4a9efc6  remove useless code (y-wei, Feb 6, 2023)
93ef95c  solve the order problem (y-wei, Feb 6, 2023)
3d6798d  have default any order (y-wei, Feb 6, 2023)
f9fcc0a  rename and replace with input() (y-wei, Feb 7, 2023)
baf8e6c  add license (y-wei, Feb 7, 2023)
495fa8e  using index_table.pk rather than index_items (y-wei, Feb 7, 2023)
978a480  Merge branch 'topn_acc' of github.com:ClearloveWei/risingwave_e into … (y-wei, Feb 7, 2023)
616dc8a  update planner test (y-wei, Feb 7, 2023)
fbd1b3d  add e2e tests (y-wei, Feb 7, 2023)
3e7679e  seq_scan_node specifies chunk size at executor (y-wei, Feb 7, 2023)
ff4eaa8  check and dashboard prost (y-wei, Feb 7, 2023)
5e51615  support primary key (y-wei, Feb 8, 2023)
8265643  format (y-wei, Feb 8, 2023)
e277ee8  make chunk_size optional, replace satisfies with supersets (y-wei, Feb 8, 2023)
37ce43c  apply s2p mapping before comparing index and topn order (y-wei, Feb 8, 2023)
fc94781  translate index column_idx to topn_col_idx (y-wei, Feb 8, 2023)
cb87843  workaround for optional in proto (y-wei, Feb 8, 2023)
8f352b3  check (y-wei, Feb 8, 2023)
aeb41e4  replace required to output and include it in the primary key part (y-wei, Feb 8, 2023)
e7cb28d  apply optimization iff predicate is always true (y-wei, Feb 9, 2023)
f537a4f  Merge branch 'main' into topn_acc (y-wei, Feb 9, 2023)
cd261db  update planner test (y-wei, Feb 9, 2023)
260834c  Merge branch 'main' into topn_acc (mergify[bot], Feb 9, 2023)
43 changes: 42 additions & 1 deletion dashboard/proto/gen/batch_plan.ts

(Generated file; diff not rendered by default.)

47 changes: 47 additions & 0 deletions e2e_test/batch/top_n/top_n_on_index.slt
@@ -0,0 +1,47 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t(x int, y int);

statement ok
create index idx on t(x);

statement ok
insert into t values (100, 3), (1, 0), (2, 3), (3, 4), (5, 4);

query II
select * from t order by x limit 1;
----
1 0

query II
select * from t order by x limit 3;
----
1 0
2 3
3 4

statement ok
create table t1(x int primary key, y int);

statement ok
insert into t1 values (100, 3), (1, 0), (2, 3), (3, 4), (5, 4);

query II
select * from t1 order by x limit 1;
----
1 0

query II
select * from t1 order by x limit 3;
----
1 0
2 3
3 4

statement ok
drop table t;

statement ok
drop table t1;
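
Outside of these slt files, the rewrite is easy to check interactively. A hypothetical session (exact plan text varies by build, but the shape should match the planner tests further down):

    explain select * from t order by x limit 3;
    -- Expected shape: the TopN is served by the index, with no full sort:
    --   BatchLimit { limit: 3, offset: 0 }
    --   └─BatchExchange { order: [idx.x ASC], dist: Single }
    --     └─BatchLimit { limit: 3, offset: 0 }
    --       └─BatchScan { table: idx, ... }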
6 changes: 6 additions & 0 deletions proto/batch_plan.proto
@@ -24,6 +24,12 @@ message RowSeqScanNode {
common.Buffer vnode_bitmap = 4;
// Whether the order on output columns should be preserved.
bool ordered = 5;

message ChunkSize {
uint32 chunk_size = 1;
}
// If `batch_limit` is set, `chunk_size` will be set along with it.
ChunkSize chunk_size = 6;
Contributor:
Why not optional uint32 chunk_size?

Member:
I remember long ago we forbid optional in proto3 for some reason, but it seems to work now. 😂 Both LGTM.

Contributor (Author):
I propose we leave it for a future task that refactors all those optional workarounds 😂

}

message SysRowSeqScanNode {
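A note on the thread above: wrapping a scalar in a one-field message is the long-standing proto3 workaround for field presence. Since protoc 3.15, the `optional` keyword expresses the same thing directly; a hypothetical equivalent of the field added here:

    // Hypothetical alternative using proto3 explicit presence (protoc >= 3.15):
    optional uint32 chunk_size = 6;

Either encoding lets the executor tell "no hint" apart from an explicit value, which is the point of the wrapper message.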
1 change: 1 addition & 0 deletions src/batch/src/executor/join/local_lookup_join.rs
@@ -115,6 +115,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
scan_ranges,
ordered: false,
vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
chunk_size: None,
});

Ok(row_seq_scan_node)
10 changes: 8 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
@@ -241,7 +241,13 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let ordered = seq_scan_node.ordered;

let epoch = source.epoch.clone();
let chunk_size = source.context.get_config().developer.batch_chunk_size;
let chunk_size = if let Some(chunk_size_) = &seq_scan_node.chunk_size {
chunk_size_
.get_chunk_size()
.min(source.context.get_config().developer.batch_chunk_size as u32)
} else {
source.context.get_config().developer.batch_chunk_size as u32
};
let metrics = source.context().task_metrics();

dispatch_state_store!(source.context().state_store(), state_store, {
@@ -262,7 +268,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
scan_ranges,
ordered,
epoch,
chunk_size,
chunk_size as usize,
source.plan_node().get_identity().clone(),
metrics,
)))
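The resolution above boils down to: take the per-scan hint when present, but never exceed the configured batch chunk size. A minimal standalone sketch of that rule (hypothetical helper, not code from the PR):

    /// Clamp an optional per-scan chunk-size hint by the global config value;
    /// with no hint, fall back to the configured default.
    fn effective_chunk_size(hint: Option<u32>, config_default: u32) -> usize {
        hint.map_or(config_default, |h| h.min(config_default)) as usize
    }

    // A `LIMIT 3` scan can hint 3, so the executor builds a 3-row chunk
    // instead of a default-sized one (e.g. 1024):
    assert_eq!(effective_chunk_size(Some(3), 1024), 3);
    assert_eq!(effective_chunk_size(None, 1024), 1024);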
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/explain.yaml
@@ -51,7 +51,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 28,
"plan_node_id": 29,
"plan_node_type": "BatchProject",
"schema": [
{
@@ -64,7 +64,7 @@
],
"children": [
{
"plan_node_id": 26,
"plan_node_id": 27,
"plan_node_type": "BatchValues",
"schema": [],
"children": [],
20 changes: 20 additions & 0 deletions src/frontend/planner_test/tests/testdata/index_selection.yaml
@@ -578,3 +578,23 @@
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1)], distribution: UpstreamHashShard(idx1.a) }
- name: topn on index
sql: |
create table t1 (a int, b int);
create index idx1 on t1(a);
select * from t1 order by a limit 1
batch_plan: |
BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [idx1.a ASC], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: idx1, columns: [idx1.a, idx1.b], distribution: UpstreamHashShard(idx1.a) }
- name: topn on primary key
sql: |
create table t1 (a int primary key, b int);
create index idx1 on t1(a);
select * from t1 order by a limit 1
batch_plan: |
BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [t1.a ASC], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t1, columns: [t1.a, t1.b], distribution: UpstreamHashShard(t1.a) }
12 changes: 6 additions & 6 deletions src/frontend/planner_test/tests/testdata/nexmark_source.yaml
@@ -350,7 +350,7 @@
| └─StreamExchange { dist: HashShard(auction, window_start) }
| └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamShare { id = 744 }
| └─StreamShare { id = 764 }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamRowIdGen { row_id_index: 7 }
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -362,7 +362,7 @@
└─StreamExchange { dist: HashShard(auction, window_start) }
└─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamShare { id = 744 }
└─StreamShare { id = 764 }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -480,7 +480,7 @@
└─StreamHashJoin { type: Inner, predicate: price = max(price), output: all }
├─StreamExchange { dist: HashShard(price) }
| └─StreamProject { exprs: [auction, bidder, price, date_time, _row_id] }
| └─StreamShare { id = 434 }
| └─StreamShare { id = 446 }
| └─StreamProject { exprs: [auction, bidder, price, date_time, _row_id] }
| └─StreamRowIdGen { row_id_index: 7 }
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -489,7 +489,7 @@
└─StreamAppendOnlyHashAgg { group_key: [(TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval)], aggs: [count, max(price)] }
└─StreamExchange { dist: HashShard((TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval)) }
└─StreamProject { exprs: [(TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval), price, _row_id] }
└─StreamShare { id = 434 }
└─StreamShare { id = 446 }
└─StreamProject { exprs: [auction, bidder, price, date_time, _row_id] }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -1331,7 +1331,7 @@
| | └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"] }
| └─StreamExchange { dist: HashShard(auction) }
| └─StreamProject { exprs: [auction, _row_id] }
| └─StreamShare { id = 553 }
| └─StreamShare { id = 571 }
| └─StreamProject { exprs: [auction, _row_id] }
| └─StreamRowIdGen { row_id_index: 7 }
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -1343,7 +1343,7 @@
└─StreamAppendOnlyHashAgg { group_key: [auction], aggs: [count, count] }
└─StreamExchange { dist: HashShard(auction) }
└─StreamProject { exprs: [auction, _row_id] }
└─StreamShare { id = 553 }
└─StreamShare { id = 571 }
└─StreamProject { exprs: [auction, _row_id] }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/share.yaml
@@ -48,7 +48,7 @@
| └─StreamProject { exprs: [id, _row_id] }
| └─StreamFilter { predicate: (initial_bid = 1:Int32) }
| └─StreamProject { exprs: [id, initial_bid, _row_id] }
| └─StreamShare { id = 519 }
| └─StreamShare { id = 539 }
| └─StreamProject { exprs: [id, initial_bid, _row_id] }
| └─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
| └─StreamRowIdGen { row_id_index: 9 }
@@ -57,7 +57,7 @@
└─StreamProject { exprs: [id, _row_id] }
└─StreamFilter { predicate: (initial_bid = 2:Int32) }
└─StreamProject { exprs: [id, initial_bid, _row_id] }
└─StreamShare { id = 519 }
└─StreamShare { id = 539 }
└─StreamProject { exprs: [id, initial_bid, _row_id] }
└─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
└─StreamRowIdGen { row_id_index: 9 }
@@ -125,7 +125,7 @@
| └─StreamExchange { dist: HashShard(auction, window_start) }
| └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamShare { id = 744 }
| └─StreamShare { id = 764 }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamRowIdGen { row_id_index: 4 }
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
@@ -137,7 +137,7 @@
└─StreamExchange { dist: HashShard(auction, window_start) }
└─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamShare { id = 744 }
└─StreamShare { id = 764 }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamRowIdGen { row_id_index: 4 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/shared_views.yaml
@@ -26,7 +26,7 @@
└─StreamHashJoin { type: Inner, predicate: (t1.x + t1.y) = (t1.x * (t1.x + t1.y)), output: [(t1.x + t1.y), (t1.x * (t1.x + t1.y)), (t1.y * (t1.x + t1.y)), t1._row_id, t1._row_id, t1._row_id, t1.x, (t1.x + t1.y)] }
├─StreamExchange { dist: HashShard((t1.x + t1.y)) }
| └─StreamProject { exprs: [(t1.x + t1.y), t1._row_id] }
| └─StreamShare { id = 207 }
| └─StreamShare { id = 212 }
| └─StreamProject { exprs: [(t1.x + t1.y), t1._row_id] }
| └─StreamFilter { predicate: (t1.y > 0:Int32) }
| └─StreamTableScan { table: t1, columns: [t1.x, t1.y, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
@@ -37,7 +37,7 @@
| └─StreamTableScan { table: t1, columns: [t1.x, t1.y, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard((t1.x + t1.y)) }
└─StreamProject { exprs: [(t1.x + t1.y), t1._row_id] }
└─StreamShare { id = 207 }
└─StreamShare { id = 212 }
└─StreamProject { exprs: [(t1.x + t1.y), t1._row_id] }
└─StreamFilter { predicate: (t1.y > 0:Int32) }
└─StreamTableScan { table: t1, columns: [t1.x, t1.y, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/tpch.yaml
@@ -2573,7 +2573,7 @@
| ├─StreamExchange { dist: HashShard(supplier.s_suppkey) }
| | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) }
| └─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
| └─StreamShare { id = 900 }
| └─StreamShare { id = 921 }
| └─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
| └─StreamHashAgg { group_key: [lineitem.l_suppkey], aggs: [count, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
| └─StreamExchange { dist: HashShard(lineitem.l_suppkey) }
@@ -2587,7 +2587,7 @@
└─StreamHashAgg { group_key: [Vnode(lineitem.l_suppkey)], aggs: [count, max(sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount))))] }
└─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount))), Vnode(lineitem.l_suppkey)] }
└─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
└─StreamShare { id = 900 }
└─StreamShare { id = 921 }
└─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
└─StreamHashAgg { group_key: [lineitem.l_suppkey], aggs: [count, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
└─StreamExchange { dist: HashShard(lineitem.l_suppkey) }
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/mod.rs
@@ -429,6 +429,13 @@ impl PlanRoot {
ApplyOrder::TopDown,
);

plan = self.optimize_by_rules(
plan,
"Agg on Index".to_string(),
vec![TopNOnIndexRule::create()],
ApplyOrder::TopDown,
);

#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());

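The rule body is not part of this diff, but judging from the commit messages ("replace satisfies with supersets", "apply s2p mapping before comparing index and topn order"), its core check is order compatibility: a TopN directly over a scan can be served by an index whose sort order supersets the requested order, i.e. the requested order is a prefix of the index order after index columns are mapped back to table columns. A simplified sketch with hypothetical types, not the actual rule:

    #[derive(Clone, Copy, PartialEq)]
    struct ColumnOrder {
        column_idx: usize, // position in the table schema
        ascending: bool,
    }

    /// True if the index ordering can serve the TopN ordering, i.e. the
    /// requested order is a prefix of the index order.
    fn index_supersets_topn(index_order: &[ColumnOrder], topn_order: &[ColumnOrder]) -> bool {
        topn_order.len() <= index_order.len()
            && index_order.iter().zip(topn_order).all(|(i, t)| i == t)
    }

When the check passes, the plan becomes the limited, ordered index scan seen in the planner tests above, and the sort disappears.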
11 changes: 9 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_limit.rs
@@ -49,8 +49,15 @@ impl BatchLimit {
let new_offset = 0;
let logical_partial_limit = LogicalLimit::new(input, new_limit, new_offset);
let batch_partial_limit = Self::new(logical_partial_limit);
let ensure_single_dist = RequiredDist::single()
.enforce_if_not_satisfies(batch_partial_limit.into(), &Order::any())?;
let any_order = Order::any();
let ensure_single_dist = RequiredDist::single().enforce_if_not_satisfies(
batch_partial_limit.into(),
if self.order().field_order.is_empty() {
&any_order
} else {
self.order()
},
)?;
let batch_global_limit = self.clone_with_input(ensure_single_dist);
Ok(batch_global_limit.into())
}
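Context for the change above: BatchLimit is planned in two phases, a partial limit per partition followed by a global limit behind a single-point exchange. Previously the exchange always used an arbitrary order; now it must merge-sort whenever the limit carries an order (the TopN-on-index case). A runnable toy model of the two phases, with ints standing in for rows:

    /// Two-phase limit over partitioned data: each partition keeps at most
    /// `limit + offset` rows, a single exchange gathers them (merge-sorting
    /// when order matters), then the global phase applies offset and limit.
    fn two_phase_limit(partitions: Vec<Vec<i32>>, limit: usize, offset: usize, ordered: bool) -> Vec<i32> {
        let mut gathered: Vec<i32> = partitions
            .into_iter()
            .flat_map(|mut p| {
                if ordered {
                    p.sort(); // each partition contributes its own smallest rows
                }
                p.into_iter().take(limit + offset) // phase 1: partial limit
            })
            .collect();
        if ordered {
            gathered.sort(); // stands in for the order-preserving exchange
        }
        gathered.into_iter().skip(offset).take(limit).collect() // phase 2: global limit
    }

Without the order-preserving merge, `LIMIT 1` over a distributed index scan could return an arbitrary partition's first row instead of the global minimum, which is presumably what the "solve the order problem" commit addressed.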
5 changes: 5 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -20,6 +20,7 @@ use risingwave_common::error::Result;
use risingwave_common::types::ScalarImpl;
use risingwave_common::util::scan_range::{is_full_range, ScanRange};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize;
use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode};
use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;

@@ -240,6 +241,10 @@ impl ToBatchProst for BatchSeqScan {
// To be filled by the scheduler.
vnode_bitmap: None,
ordered: !self.order().is_any(),
chunk_size: self
.logical
.chunk_size()
.map(|chunk_size| ChunkSize { chunk_size }),
})
}
}
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/scan.rs
@@ -37,6 +37,8 @@ pub struct Scan {
pub indexes: Vec<Rc<IndexCatalog>>,
/// The pushed down predicates. It refers to column indexes of the table.
pub predicate: Condition,
/// Help RowSeqScan executor use a better chunk size
pub chunk_size: Option<u32>,
}

impl Scan {
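This new field is the channel between the optimizer and the RowSeqScan executor. The assignment itself happens in the rewrite rule, which this diff does not show; presumably the hint is derived from the TopN bound, along the lines of this hypothetical sketch:

    /// Hypothetical: derive a chunk-size hint from a TopN's fetch bound.
    fn chunk_size_hint(limit: u64, offset: u64) -> Option<u32> {
        u32::try_from(limit + offset).ok() // no hint if the bound overflows u32
    }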