Skip to content

Commit

Permalink
feat(optimizer): Common sub-plan detection. (#7865)
Browse files · Browse the repository at this point in the history
Attempt to resolve #7360.

Approved-By: chenzl25
Approved-By: st1page

Co-Authored-By: Shuxian Wang <wsx@berkeley.edu>
Co-Authored-By: Dylan <chenzl25@mail2.sysu.edu.cn>
Co-Authored-By: Dylan Chen <zilin@singularity-data.com>
  • Loading branch information
3 people committed Feb 23, 2023
1 parent b39ae68 commit 13b88a5
Show file tree
Hide file tree
Showing 128 changed files with 1,073 additions and 654 deletions.
4 changes: 2 additions & 2 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl std::fmt::Display for ColumnId {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ColumnDesc {
pub data_type: DataType,
pub column_id: ColumnId,
Expand Down Expand Up @@ -234,7 +234,7 @@ impl From<&ColumnDesc> for ProstColumnDesc {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnCatalog {
pub column_desc: ColumnDesc,
pub is_hidden: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::util::sort_util::OrderPair;
/// Includes necessary information for compute node to access data of the table.
///
/// It's a subset of `TableCatalog` in frontend. Refer to `TableCatalog` for more details.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TableDesc {
/// Id of the table, to find in storage.
pub table_id: TableId,
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::array::ArrayBuilderImpl;
use crate::types::DataType;

/// The field in the schema of the executor's return data
#[derive(Clone, PartialEq)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Field {
pub data_type: DataType,
pub name: String,
Expand Down Expand Up @@ -112,7 +112,7 @@ macro_rules! schema_unnamed {
}

/// the schema of the executor's return data
#[derive(Clone, Debug, Default, PartialEq)]
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Schema {
pub fields: Vec<Field>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/util/column_index_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
/// `ColIndexMapping` is a partial mapping from usize to usize.
///
/// It is used in optimizer for transformation of column index.
#[derive(Clone)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ColIndexMapping {
/// The size of the target space, i.e. target index is in the range `(0..target_size)`.
target_size: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::util::hash_util::Crc32FastBuilder;
use crate::util::value_encoding::serialize_datum_into;

/// See also [`ScanRangeProst`]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ScanRange {
pub eq_conds: Vec<Datum>,
pub range: (Bound<ScalarImpl>, Bound<ScalarImpl>),
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/util/sort_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::array::{Array, ArrayImpl, DataChunk};
use crate::error::ErrorCode::InternalError;
use crate::error::Result;

#[derive(PartialEq, Eq, Copy, Clone, Debug)]
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
pub enum OrderType {
Ascending,
Descending,
Expand All @@ -47,7 +47,7 @@ impl OrderType {
/// Column index with an order type (ASC or DESC). Used to represent a sort key (`Vec<OrderPair>`).
///
/// Corresponds to protobuf [`ColumnOrder`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderPair {
pub column_idx: usize,
pub order_type: OrderType,
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, SchemaId, TableId, UserId};
Expand All @@ -22,7 +22,7 @@ use risingwave_pb::stream_plan::SinkDesc as ProstSinkDesc;

use super::{SinkCatalog, SinkId, SinkType};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkDesc {
/// Id of the sink. For debug now.
pub id: SinkId,
Expand All @@ -48,7 +48,7 @@ pub struct SinkDesc {
pub distribution_key: Vec<usize>,

/// The properties of the sink.
pub properties: HashMap<String, String>,
pub properties: BTreeMap<String, String>,

// The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
// based on both its own derivation on the append-only attribute and other user-specified
Expand Down Expand Up @@ -76,7 +76,7 @@ impl SinkDesc {
distribution_key: self.distribution_key,
owner,
dependent_relations,
properties: self.properties,
properties: self.properties.into_iter().collect(),
sink_type: self.sink_type,
}
}
Expand All @@ -94,7 +94,7 @@ impl SinkDesc {
pk: self.pk.iter().map(|k| k.to_protobuf()).collect_vec(),
stream_key: self.stream_key.iter().map(|idx| *idx as _).collect_vec(),
distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
properties: self.properties.clone(),
properties: self.properties.clone().into_iter().collect(),
sink_type: self.sink_type.to_proto() as i32,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl From<SinkId> for u32 {
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
/// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
/// allowed.
Expand Down
14 changes: 1 addition & 13 deletions src/frontend/planner_test/tests/testdata/explain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Share Source:
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Predicate Push Down:
LogicalProject { exprs: [1:Int32] }
Expand Down Expand Up @@ -63,7 +58,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 34,
"plan_node_id": 33,
"plan_node_type": "BatchValues",
"schema": [
{
Expand Down Expand Up @@ -105,13 +100,6 @@
explain_output: |+
Begin:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1, _row_id] }
└─LogicalScan { table: t2, columns: [v2, _row_id] }
Share Source:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1, _row_id] }
Expand Down
26 changes: 15 additions & 11 deletions src/frontend/planner_test/tests/testdata/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,20 @@
└─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.x) }
└─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.x] }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
├─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamExchange { dist: HashShard(i.x) }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
├─StreamProject { exprs: [i.x, i.t._row_id, i.t._row_id, i.x] }
| └─StreamShare { id = 513 }
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamProject { exprs: [i.x, i.t._row_id, i.t._row_id, i.x] }
└─StreamShare { id = 513 }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamExchange { dist: HashShard(i.x) }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
- name: Use lookup join
sql: |
create table t1 (v1 int, v2 int);
Expand Down Expand Up @@ -499,7 +503,7 @@
└─BatchExchange { order: [], dist: HashShard(b.x) }
└─BatchScan { table: b, columns: [b.x], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [y, z, $expr156(hidden), a._row_id(hidden), b._row_id(hidden), a.x(hidden), b.x(hidden)], pk_columns: [a._row_id, b._row_id, a.x, b.x], order_descs: [$expr156, a._row_id, b._row_id, a.x, b.x], pk_conflict: "no check" }
StreamMaterialize { columns: [y, z, $expr159(hidden), a._row_id(hidden), b._row_id(hidden), a.x(hidden), b.x(hidden)], pk_columns: [a._row_id, b._row_id, a.x, b.x], order_descs: [$expr159, a._row_id, b._row_id, a.x, b.x], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) }
└─StreamProject { exprs: [(2:Int32 * Coalesce(a.x, b.x)) as $expr1, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr2, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr3, a._row_id, b._row_id, a.x, b.x] }
└─StreamFilter { predicate: ((2:Int32 * Coalesce(a.x, b.x)) < 10:Int32) }
Expand Down

0 comments on commit 13b88a5

Please sign in to comment.