
feat(optimizer): Common sub-plan detection. #7865

Merged: 28 commits, Feb 23, 2023. Changes shown from 18 commits.

Commits (28)
bcb1d43  [WIP]: Refactor for PlanNode comparison and hashing.  (wsx-ucb, Feb 13, 2023)
575d1cd  [WIP]: Implement merging and pruning of `LogicalShare`.  (wsx-ucb, Feb 20, 2023)
bab492c  Update helper traits for AST manipulation.  (wsx-ucb, Feb 20, 2023)
3737d6e  Update test cases.  (wsx-ucb, Feb 20, 2023)
858e2df  Merge with main.  (wsx-ucb, Feb 20, 2023)
698862c  Add license header.  (wsx-ucb, Feb 20, 2023)
e3a119b  Fix typo. XD  (wsx-ucb, Feb 20, 2023)
de09a81  Clippy fix.  (wsx-ucb, Feb 20, 2023)
b300aca  Update src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs  (chenzl25, Feb 20, 2023)
d9cf468  Update src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs  (chenzl25, Feb 20, 2023)
3f6f4fc  add Copyright  (chenzl25, Feb 20, 2023)
35eae0b  Merge with main.  (wsx-ucb, Feb 20, 2023)
d6319b1  Avoid sharing values operator.  (wsx-ucb, Feb 21, 2023)
e72117d  Update pruning logic, and delay equivalent node merging.  (wsx-ucb, Feb 21, 2023)
578fcc9  Update plan traversal API.  (wsx-ucb, Feb 21, 2023)
78076eb  Merge with main.  (wsx-ucb, Feb 21, 2023)
96ebfdf  Prune if share node just shares a scan or doesn't share any scan or s…  (chenzl25, Feb 22, 2023)
4f61a65  Merge branch 'main' into wsx/shared-view  (chenzl25, Feb 22, 2023)
91b7074  refine  (chenzl25, Feb 23, 2023)
483443d  resolve conflict  (chenzl25, Feb 23, 2023)
d4656c1  rerun  (chenzl25, Feb 23, 2023)
a3f413f  rerun  (chenzl25, Feb 23, 2023)
2f9df2d  rerun  (chenzl25, Feb 23, 2023)
98e3db1  Merge branch 'main' into wsx/shared-view  (chenzl25, Feb 23, 2023)
9ea7715  resolve conflict  (chenzl25, Feb 23, 2023)
f84d877  resolve conflict  (chenzl25, Feb 23, 2023)
c023fe6  rerun  (chenzl25, Feb 23, 2023)
ef76ff7  resolve conflict  (chenzl25, Feb 23, 2023)
src/common/src/catalog/column.rs (4 changes: 2 additions, 2 deletions)

@@ -83,7 +83,7 @@ impl std::fmt::Display for ColumnId {
     }
 }

-#[derive(Clone, Debug, PartialEq)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub struct ColumnDesc {
     pub data_type: DataType,
     pub column_id: ColumnId,
@@ -234,7 +234,7 @@ impl From<&ColumnDesc> for ProstColumnDesc {
     }
 }

-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct ColumnCatalog {
     pub column_desc: ColumnDesc,
     pub is_hidden: bool,
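
A note on why so many leaf types gain derives in this PR: `#[derive(Eq, Hash)]` only compiles when every field's type is itself `Eq + Hash`, so hashing a plan node forces the derives transitively down through `ColumnDesc`, `Field`, `OrderPair`, and the rest of the catalog types changed below. A contrived, self-contained illustration (the `PlanKey` type is hypothetical, not from the PR):

use std::collections::HashMap;

#[derive(PartialEq, Eq, Hash)]
struct ColumnId(i32);

// Without Eq + Hash here, PlanKey below would fail to compile with
// "the trait bound `ColumnDesc: Hash` is not satisfied".
#[derive(PartialEq, Eq, Hash)]
struct ColumnDesc {
    column_id: ColumnId,
    name: String,
}

// A hypothetical key summarizing a plan node for duplicate detection.
#[derive(PartialEq, Eq, Hash)]
struct PlanKey {
    columns: Vec<ColumnDesc>, // Vec<T> is Hash only when T: Hash
}

fn main() {
    let mut seen: HashMap<PlanKey, usize> = HashMap::new();
    let key = PlanKey {
        columns: vec![ColumnDesc { column_id: ColumnId(0), name: "v1".into() }],
    };
    *seen.entry(key).or_insert(0) += 1; // usable as a map key thanks to the derives
}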
src/common/src/catalog/physical_table.rs (2 changes: 1 addition, 1 deletion)

@@ -23,7 +23,7 @@ use crate::util::sort_util::OrderPair;
 /// Includes necessary information for compute node to access data of the table.
 ///
 /// It's a subset of `TableCatalog` in frontend. Refer to `TableCatalog` for more details.
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
 pub struct TableDesc {
     /// Id of the table, to find in storage.
     pub table_id: TableId,
src/common/src/catalog/schema.rs (4 changes: 2 additions, 2 deletions)

@@ -22,7 +22,7 @@ use crate::array::ArrayBuilderImpl;
 use crate::types::DataType;

 /// The field in the schema of the executor's return data
-#[derive(Clone, PartialEq)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Field {
     pub data_type: DataType,
     pub name: String,
@@ -112,7 +112,7 @@ macro_rules! schema_unnamed {
 }

 /// the schema of the executor's return data
-#[derive(Clone, Debug, Default, PartialEq)]
+#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
 pub struct Schema {
     pub fields: Vec<Field>,
 }
src/common/src/util/scan_range.rs (2 changes: 1 addition, 1 deletion)

@@ -27,7 +27,7 @@ use crate::util::hash_util::Crc32FastBuilder;
 use crate::util::value_encoding::serialize_datum_into;

 /// See also [`ScanRangeProst`]
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct ScanRange {
     pub eq_conds: Vec<Datum>,
     pub range: (Bound<ScalarImpl>, Bound<ScalarImpl>),
src/common/src/util/sort_util.rs (4 changes: 2 additions, 2 deletions)

@@ -21,7 +21,7 @@ use crate::array::{Array, ArrayImpl, DataChunk};
 use crate::error::ErrorCode::InternalError;
 use crate::error::Result;

-#[derive(PartialEq, Eq, Copy, Clone, Debug)]
+#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
 pub enum OrderType {
     Ascending,
     Descending,
@@ -47,7 +47,7 @@ impl OrderType {
 /// Column index with an order type (ASC or DESC). Used to represent a sort key (`Vec<OrderPair>`).
 ///
 /// Corresponds to protobuf [`ColumnOrder`].
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct OrderPair {
     pub column_idx: usize,
     pub order_type: OrderType,
src/connector/src/sink/catalog/desc.rs (10 changes: 5 additions, 5 deletions)

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::HashMap;
+use std::collections::BTreeMap;

 use itertools::Itertools;
 use risingwave_common::catalog::{ColumnCatalog, DatabaseId, SchemaId, TableId, UserId};
@@ -22,7 +22,7 @@ use risingwave_pb::stream_plan::SinkDesc as ProstSinkDesc;

 use super::{SinkCatalog, SinkId, SinkType};

-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct SinkDesc {
     /// Id of the sink. For debug now.
     pub id: SinkId,
@@ -48,7 +48,7 @@ pub struct SinkDesc {
     pub distribution_key: Vec<usize>,

     /// The properties of the sink.
-    pub properties: HashMap<String, String>,
+    pub properties: BTreeMap<String, String>,

     // The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
     // based on both its own derivation on the append-only attribute and other user-specified
@@ -76,7 +76,7 @@ impl SinkDesc {
             distribution_key: self.distribution_key,
             owner,
             dependent_relations,
-            properties: self.properties,
+            properties: self.properties.into_iter().collect(),
             sink_type: self.sink_type,
         }
     }
@@ -94,7 +94,7 @@ impl SinkDesc {
             pk: self.pk.iter().map(|k| k.to_protobuf()).collect_vec(),
             stream_key: self.stream_key.iter().map(|idx| *idx as _).collect_vec(),
             distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
-            properties: self.properties.clone(),
+            properties: self.properties.clone().into_iter().collect(),
             sink_type: self.sink_type.to_proto() as i32,
         }
     }
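
The `HashMap` to `BTreeMap` switch above is what lets `SinkDesc` derive `Hash` at all: std's `HashMap` does not implement `Hash` (its iteration order is unspecified), while `BTreeMap` implements `Hash` and `Eq` whenever its keys and values do. The `into_iter().collect()` calls then convert back to the map type the catalog and protobuf structs expect. A quick standalone check, not from the PR:

use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

fn main() {
    let props: BTreeMap<String, String> =
        [("connector".to_string(), "kafka".to_string())].into_iter().collect();

    // BTreeMap<K, V> is Hash when K: Hash and V: Hash, and its iteration
    // order is deterministic, so structurally equal maps hash equally.
    println!("{}", hash_of(&props));

    // The same call with std::collections::HashMap would not compile:
    // `HashMap<String, String>` does not implement `Hash`.
}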
src/connector/src/sink/catalog/mod.rs (2 changes: 1 addition, 1 deletion)

@@ -54,7 +54,7 @@ impl From<SinkId> for u32 {
     }
 }

-#[derive(Clone, Copy, Debug, PartialEq)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 pub enum SinkType {
     /// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
     /// allowed.
src/frontend/planner_test/tests/testdata/explain.yaml (14 changes: 1 addition, 13 deletions)

@@ -6,11 +6,6 @@
 LogicalProject { exprs: [1:Int32] }
 └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }

-Share Source:
-
-LogicalProject { exprs: [1:Int32] }
-└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
-
 Predicate Push Down:

 LogicalProject { exprs: [1:Int32] }
@@ -55,7 +50,7 @@
 "stages": {
 "0": {
 "root": {
-"plan_node_id": 30,
+"plan_node_id": 29,
 "plan_node_type": "BatchValues",
 "schema": [
 {
@@ -97,13 +92,6 @@
 explain_output: |+
 Begin:

-LogicalProject { exprs: [t1.v1, t2.v2] }
-└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
-├─LogicalScan { table: t1, columns: [v1, _row_id] }
-└─LogicalScan { table: t2, columns: [v2, _row_id] }
-
-Share Source:
-
 LogicalProject { exprs: [t1.v1, t2.v2] }
 └─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
 ├─LogicalScan { table: t1, columns: [v1, _row_id] }
src/frontend/planner_test/tests/testdata/join.yaml (26 changes: 15 additions, 11 deletions)

@@ -199,16 +199,20 @@
 └─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.x) }
 └─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.x] }
 └─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
-├─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
-| ├─StreamExchange { dist: HashShard(i.x) }
-| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
-| └─StreamExchange { dist: HashShard(i.x) }
-| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
-└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
-├─StreamExchange { dist: HashShard(i.x) }
-| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
-└─StreamExchange { dist: HashShard(i.x) }
-└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
+├─StreamProject { exprs: [i.x, i.t._row_id, i.t._row_id, i.x] }
+| └─StreamShare { id = 398 }
+| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
+| ├─StreamExchange { dist: HashShard(i.x) }
+| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
+| └─StreamExchange { dist: HashShard(i.x) }
+| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
+└─StreamProject { exprs: [i.x, i.t._row_id, i.t._row_id, i.x] }
+└─StreamShare { id = 398 }
+└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
+├─StreamExchange { dist: HashShard(i.x) }
+| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
+└─StreamExchange { dist: HashShard(i.x) }
+└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
- name: Use lookup join
 sql: |
 create table t1 (v1 int, v2 int);
@@ -499,7 +503,7 @@
 └─BatchExchange { order: [], dist: HashShard(b.x) }
 └─BatchScan { table: b, columns: [b.x], distribution: SomeShard }
 stream_plan: |
-StreamMaterialize { columns: [y, z, $expr144(hidden), a._row_id(hidden), b._row_id(hidden), a.x(hidden), b.x(hidden)], pk_columns: [a._row_id, b._row_id, a.x, b.x], order_descs: [$expr144, a._row_id, b._row_id, a.x, b.x] }
+StreamMaterialize { columns: [y, z, $expr150(hidden), a._row_id(hidden), b._row_id(hidden), a.x(hidden), b.x(hidden)], pk_columns: [a._row_id, b._row_id, a.x, b.x], order_descs: [$expr150, a._row_id, b._row_id, a.x, b.x] }
 └─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) }
 └─StreamProject { exprs: [(2:Int32 * Coalesce(a.x, b.x)) as $expr1, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr2, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr3, a._row_id, b._row_id, a.x, b.x] }
 └─StreamFilter { predicate: ((2:Int32 * Coalesce(a.x, b.x)) < 10:Int32) }