
feat(optimizer): index accelerating TopN #7726

Merged · 26 commits · Feb 9, 2023
Conversation

@y-wei (Contributor) commented Feb 6, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Optimize applicable LogicalTopN-LogicalScan plans into LogicalLimit-LogicalScan via a new rule named top_n_on_index that matches this structure.

Please explain IN DETAIL what the changes are in this PR and why they are needed:

  • Summarize your change (mandatory)
    LogicalTopN will be replaced with LogicalLimit if there is an applicable index or primary key.

On index:

dev=> explain select v1 from t order by v1 limit 1;
                       QUERY PLAN                       
--------------------------------------------------------
 BatchLimit { limit: 1, offset: 0 }
 └─BatchExchange { order: [idx1.v1 ASC], dist: Single }
   └─BatchLimit { limit: 1, offset: 0 }
     └─BatchScan { table: idx1, columns: [v1] }
(4 rows)

On primary key:

dev=> explain select * from t order by k limit 1;
                     QUERY PLAN                     
----------------------------------------------------
 BatchLimit { limit: 1, offset: 0 }
 └─BatchExchange { order: [t.k ASC], dist: Single }
   └─BatchLimit { limit: 1, offset: 0 }
     └─BatchScan { table: t, columns: [k, cnt] }
(4 rows)
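At a high level, the rewrite only fires when an index (or the primary key) already provides the requested ordering, so the sort can be dropped and only the limit remains. Below is a minimal, hedged sketch of that shape in Rust, pieced together from the snippets quoted later in the review; LogicalLimit::create, to_index_scan, scan_order, and topn_order are assumed/illustrative names rather than the PR's exact API.

// Sketch only: matches LogicalTopN over LogicalScan and, if an index order
// already satisfies the TopN order, replaces the TopN with a plain Limit.
impl Rule for TopNOnIndexRule {
    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
        let topn: &LogicalTopN = plan.as_logical_top_n()?;
        let scan: LogicalScan = topn.input().as_logical_scan()?.to_owned();

        // Find an index whose key order satisfies the required TopN order
        // (the primary-key case is analogous).
        let index = scan
            .indexes()
            .iter()
            .find(|idx| scan_order(idx).satisfies(topn.topn_order()))?;

        // Scan the ordered index instead of the primary table; rows then
        // arrive pre-sorted, so a Limit is enough.
        let index_scan = scan.to_index_scan(index);
        Some(LogicalLimit::create(
            index_scan.into(),
            topn.limit(),
            topn.offset(),
        ))
    }
}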

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
    - [ ] I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features).
  • I have demonstrated that backward compatibility is not broken by breaking changes and created issues to track deprecated features to be removed in the future. (Please refer to the issue)
  • All checks passed in ./risedev check (or alias, ./risedev c)

Refer to a related PR or issue link (optional)

#7656

@y-wei changed the title from "feat (optimizer): index accelerating TopN" to "feat(optimizer): index accelerating TopN" on Feb 6, 2023
@y-wei (Contributor, Author) commented Feb 6, 2023

Actually, the query result does not make sense currently 🥲. Is it caused by a missing MergeSort, as mentioned in #7656? Edit: seems solved.

@y-wei (Contributor, Author) commented Feb 6, 2023

Some planner tests that seem quite unrelated are failing 🤨

@y-wei marked this pull request as ready for review on February 6, 2023, 23:13
@chenzl25 (Contributor) left a comment

Could you please add a planner test and an e2e test? Rest LGTM!

Comment on lines 125 to 129
// For Optimizing TopN
pub fn get_child(&self) -> PlanRef {
self.core.input.clone()
}

@chenzl25 (Contributor) commented Feb 7, 2023

We can just use the existing method input().

Comment on lines 9 to 12
impl Rule for AggOnIndexRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let logical_topn: &LogicalTopN = plan.as_logical_top_n()?;
let logical_scan: LogicalScan = logical_topn.get_child().as_logical_scan()?.to_owned();
Contributor:

This rule name is inconsistent with its match pattern. Maybe TopNOnIndexRule is more appropriate.

Comment on lines 39 to 42
.map(|idx_item| FieldOrder {
index: idx_item.index,
direct: Asc,
})
Member:

I'm not sure whether we support it yet, but in Postgres you can specify DESC order when creating an index (often used along with NULLS FIRST/LAST). So maybe we should not hard-code Asc here, but use index_table.pk instead.

cc @chenzl25

Contributor:

Agree. Use index_table.pk instead of a hard-coded Asc, even if we do not support creating indexes with descending ordering currently.

Contributor (author):

Since index_items and index_table.pk.items() do not quite match, I'm using index_table.pk.items() instead, and it doesn't seem to affect the result. Actually this part of the code is derived from

let index = self.indexes().iter().find(|idx| {

Shall I also change the logic there?

@BugenZhao (Member) commented:

> Done by a new rule named agg_on_index that arbitrarily match the above structure.

Please also update the PR description, as it will be used as the commit message.

> Are there any handlers/executors that I need to take care of?

The batch scan will output at least one chunk to downstream, so we still have to scan 1024 (the default chunk size) × parallelism records. To achieve a better effect, we may also need to tell the scan to reduce the chunk size. 🤔

@BugenZhao (Member) commented:

> Primary key is not supported.

Are we going to support it in the next PRs?

@y-wei (Contributor, Author) commented Feb 7, 2023

> The batch scan will output at least one chunk to downstream, so we still have to scan 1024 (the default chunk size) × parallelism records. To achieve a better effect, we may also need to tell the scan to reduce the chunk size. 🤔

Are you talking about `let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges`? Actually I'm interested in how point_get works. Can we leverage this?

@BugenZhao (Member) commented:

> Are you talking about `let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges`? Actually I'm interested in how point_get works. Can we leverage this?

Exactly. I guess this is not related to point_get, which uses StateStore::get instead of scan under the hood. Also, we can make this more general: for queries with ORDER BY .. LIMIT 10, we can also set the chunk size to 10, since it's much smaller than 1024.
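As a rough illustration of that point (the constant and helper below are assumptions, not the PR's API): with the default chunk size of 1024 and, say, a parallelism of 4, a LIMIT 1 query still materializes up to 4096 rows, so clamping the scan's chunk size to limit + offset avoids most of that work.

// Sketch: derive a smaller chunk size from the limit, falling back to the
// default when the limit is large. DEFAULT_CHUNK_SIZE and this helper are
// illustrative names only.
const DEFAULT_CHUNK_SIZE: u64 = 1024;

fn chunk_size_for_limit(limit: u64, offset: u64) -> u64 {
    // Scanning more than limit + offset rows per chunk is wasted work for
    // an `ORDER BY .. LIMIT n` query served by an ordered scan.
    (limit + offset).min(DEFAULT_CHUNK_SIZE)
}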

@codecov bot commented Feb 8, 2023

Codecov Report

Merging #7726 (8265643) into main (dffc2f1) will increase coverage by 0.00%.
The diff coverage is 89.91%.

❗ Current head 8265643 differs from the pull request's most recent head cb87843. Consider uploading reports for commit cb87843 to get more accurate results.

@@           Coverage Diff            @@
##             main    #7726    +/-   ##
========================================
  Coverage   71.70%   71.71%            
========================================
  Files        1096     1097     +1     
  Lines      174608   174724   +116     
========================================
+ Hits       125208   125303    +95     
- Misses      49400    49421    +21     
Flag Coverage Δ
rust 71.71% <89.91%> (+<0.01%) ⬆️


Impacted Files Coverage Δ
src/batch/src/executor/join/local_lookup_join.rs 54.51% <0.00%> (-0.10%) ⬇️
src/batch/src/executor/row_seq_scan.rs 21.90% <0.00%> (-0.54%) ⬇️
...c/frontend/src/optimizer/plan_node/generic/scan.rs 88.00% <ø> (ø)
src/frontend/src/optimizer/rule/mod.rs 100.00% <ø> (ø)
...rc/frontend/src/optimizer/plan_node/batch_limit.rs 80.00% <85.71%> (ø)
...frontend/src/optimizer/rule/top_n_on_index_rule.rs 95.40% <95.40%> (ø)
src/frontend/src/optimizer/mod.rs 95.24% <100.00%> (+0.05%) ⬆️
...frontend/src/optimizer/plan_node/batch_seq_scan.rs 87.89% <100.00%> (+0.06%) ⬆️
...c/frontend/src/optimizer/plan_node/logical_scan.rs 94.44% <100.00%> (+0.10%) ⬆️
src/source/src/row_id.rs 90.90% <0.00%> (-1.14%) ⬇️
... and 5 more


@@ -86,6 +86,7 @@ impl LogicalScan {
table_desc,
indexes,
predicate,
chunk_size: 1024,
Member:

What about using an Option here? If it's None, then the batch executor will follow the value from the config, so that we don't need to hard-code 1024 here.

Tip: wrapping a primitive in a new message makes it optional in proto3, like this:

message Parallelism {
uint64 parallelism = 1;
}

src/frontend/src/optimizer/plan_node/logical_scan.rs (outdated, resolved)
Comment on lines +111 to +115
direct: if op.order_type == OrderType::Ascending {
Direction::Asc
} else {
Direction::Desc
},
Member:

😄 I guess @richardchien is working on unifying these types.

Contributor:

:doge:

BTW, there's no conflict for now. I'll unify these types after finishing the distinct aggregator, which may take some days.

Comment on lines 56 to 68
let index = logical_scan.indexes().iter().find(|idx| {
Order {
field_order: idx
.index_table
.pk()
.iter()
.map(|idx_item| FieldOrder {
index: idx_item.index,
direct: idx_item.direct,
})
.collect(),
}
.satisfies(order)
Contributor:

We can't compare the index order directly with the TopN order, because they refer to different columns. We need a mapping between them. The following case reproduces the bug.

create table t (a int primary key, b int); 
create index idx on t(b);
explain  select * from t order by b limit 1;
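For context, the mismatch is that the TopN order is expressed in terms of primary-table column indices, while index_table.pk() is expressed in index-table column indices, so comparing them directly compares unrelated positions. A hedged sketch of the remapping being suggested, where the primary-to-index column mapping is a hypothetical input (the real rule derives it from the index definition):

use std::collections::HashMap;

// Sketch: translate the TopN's required order from primary-table column
// indices into index-table column indices before calling `satisfies`.
fn topn_order_on_index(
    topn_order: &Order,
    primary_to_index_col: &HashMap<usize, usize>,
) -> Option<Order> {
    let field_order = topn_order
        .field_order
        .iter()
        .map(|fo| {
            // Every ordered column must be covered by the index; otherwise
            // the index cannot provide this ordering at all.
            primary_to_index_col.get(&fo.index).map(|&index| FieldOrder {
                index,
                direct: fo.direct,
            })
        })
        .collect::<Option<Vec<_>>>()?;
    Some(Order { field_order })
}

// The check then compares orders at the same level, e.g.
// index_order.satisfies(&topn_order_on_index(order, &mapping)?).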

@y-wei (Contributor, Author) commented Feb 8, 2023

Then should we test if the index order is a superset of the topn order?

Contributor:

We need that, and the satisfies method actually already handles the superset problem. But considering the above case, my point is that we need to compare the index order with the TopN order at the same level, that is, with both referring to primary-table columns or both to index columns.

Contributor (author):

Got it, I'll try to solve it in the next commit.

Contributor (author):

seems solved 😁

dev=> create table t (v1 int, v2 int, v3 int);
CREATE_TABLE
dev=> create index idx on t (v2) include (v2, v3);
CREATE_INDEX
dev=> explain select v2, v3 from t order by v2, v3 limit 1;
                              QUERY PLAN                              
----------------------------------------------------------------------
 BatchTopN { order: "[t.v2 ASC, t.v3 ASC]", limit: 1, offset: 0 }
 └─BatchExchange { order: [], dist: Single }
   └─BatchTopN { order: "[t.v2 ASC, t.v3 ASC]", limit: 1, offset: 0 }
     └─BatchScan { table: t, columns: [v2, v3] }
(4 rows)

dev=> explain select v2, v3 from t order by v2 limit 1;
                      QUERY PLAN                       
-------------------------------------------------------
 BatchLimit { limit: 1, offset: 0 }
 └─BatchExchange { order: [idx.v2 ASC], dist: Single }
   └─BatchLimit { limit: 1, offset: 0 }
     └─BatchScan { table: idx, columns: [v2, v3] }
(4 rows)

@y-wei (Contributor, Author) commented Feb 8, 2023

I have no idea about the misc-check failure:


Creating buildkite018631e5b10349f0b1e7fa248ec0e85f_rw-build-env_run ... done
--
  | Check protobuf code format && Lint protobuf | 15s
  | batch_plan.proto.orig 2023-02-08 16:40:37.650298442 +0000 | 0s
  | batch_plan.proto 2023-02-08 16:40:37.650298442 +0000 | 1s
  | @@ -25,9 +25,9 @@
  | // Whether the order on output columns should be preserved.
  | bool ordered = 5;
  |  
  | -  message ChunkSize {
  | -    uint32 chunk_size = 1;
  | -  }
  | +  message ChunkSize {
  | +    uint32 chunk_size = 1;
  | +  }
  | // If along with `batch_limit`, `chunk_size` will be set.
  | ChunkSize chunk_size = 6;
  | }
  | ERROR: 100


@chenzl25 (Contributor) commented Feb 9, 2023

One more case needs to be handled: when a table scan has a filter that could be served by a primary-key lookup and is followed by a TopN, it seems we should ensure we still choose the primary table scan instead of the index.

create table t (a int primary key, b int);
create index idx on t(b);
explain select * from t where a = 1 order by b limit 1;

@y-wei (Contributor, Author) commented Feb 9, 2023

> One more case needs to be handled: when a table scan has a filter that could be served by a primary-key lookup and is followed by a TopN, it seems we should ensure we still choose the primary table scan instead of the index.

Could you explain a bit about the reason for doing a primary lookup?

@@ -24,6 +24,12 @@ message RowSeqScanNode {
common.Buffer vnode_bitmap = 4;
// Whether the order on output columns should be preserved.
bool ordered = 5;

message ChunkSize {
Member:

Trailing whitespace here?

Contributor (author):

Nice catch!

@chenzl25 (Contributor) commented Feb 9, 2023

> One more case needs to be handled: when a table scan has a filter that could be served by a primary-key lookup and is followed by a TopN, it seems we should ensure we still choose the primary table scan instead of the index.
>
> create table t (a int primary key, b int);
> create index idx on t(b);
> explain select * from t where a = 1 order by b limit 1;

Because a primary-key point lookup is more efficient than an index range scan in this case. Maybe we can only apply this rule when there are no predicates in the table scan. Actually this decision should be made by a cost-based optimizer, but we don't have one yet, so let's only optimize the cases that are definitely improved.
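A minimal sketch of that guard, assuming the scan exposes its pushed-down filter as a Condition with an always_true() check (treat these accessor names as assumptions):

// Sketch: skip the TopN-on-index rewrite when the scan carries a predicate,
// so that a possible primary-key point lookup is not turned into an index
// range scan. Without a cost-based optimizer this is a simple heuristic.
fn should_skip_rewrite(scan: &LogicalScan) -> bool {
    // A non-trivial predicate may be served by a point get on the primary
    // table, which is cheaper than scanning the secondary index.
    !scan.predicate().always_true()
}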

@chenzl25 (Contributor) left a comment

LGTM! Great work, thank you.

uint32 chunk_size = 1;
}
// If along with `batch_limit`, `chunk_size` will be set.
ChunkSize chunk_size = 6;
Contributor:

Why not optional uint32 chunk_size?

Member:

I remember that long ago we forbade optional in proto3 for some reason, but it seems to work now. 😂 Both LGTM.

Contributor (author):

I propose we leave it for a future task that refactors all those optional workarounds 😂

@y-wei added the mergify/can-merge label (indicates that the PR can be added to the merge queue) on Feb 9, 2023
y-wei and others added 2 commits February 9, 2023 10:23
Labels: component/optimizer, mergify/can-merge, type/perf