Skip to content

Commit

Permalink
[WIP]: Refactor for PlanNode comparison and hashing.
Browse files Browse the repository at this point in the history
  • Loading branch information
wsx-ucb committed Feb 17, 2023
1 parent 5378784 commit f901cc2
Show file tree
Hide file tree
Showing 108 changed files with 313 additions and 170 deletions.
4 changes: 2 additions & 2 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl std::fmt::Display for ColumnId {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ColumnDesc {
pub data_type: DataType,
pub column_id: ColumnId,
Expand Down Expand Up @@ -228,7 +228,7 @@ impl From<&ColumnDesc> for ProstColumnDesc {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnCatalog {
pub column_desc: ColumnDesc,
pub is_hidden: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::util::sort_util::OrderPair;
/// Includes necessary information for compute node to access data of the table.
///
/// It's a subset of `TableCatalog` in frontend. Refer to `TableCatalog` for more details.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TableDesc {
/// Id of the table, to find in storage.
pub table_id: TableId,
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::array::ArrayBuilderImpl;
use crate::types::DataType;

/// The field in the schema of the executor's return data
#[derive(Clone, PartialEq)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Field {
pub data_type: DataType,
pub name: String,
Expand Down Expand Up @@ -112,7 +112,7 @@ macro_rules! schema_unnamed {
}

/// the schema of the executor's return data
#[derive(Clone, Debug, Default, PartialEq)]
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Schema {
pub fields: Vec<Field>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::util::hash_util::Crc32FastBuilder;
use crate::util::value_encoding::serialize_datum_into;

/// See also [`ScanRangeProst`]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ScanRange {
pub eq_conds: Vec<Datum>,
pub range: (Bound<ScalarImpl>, Bound<ScalarImpl>),
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/util/sort_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::array::{Array, ArrayImpl, DataChunk};
use crate::error::ErrorCode::InternalError;
use crate::error::Result;

#[derive(PartialEq, Eq, Copy, Clone, Debug)]
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
pub enum OrderType {
Ascending,
Descending,
Expand All @@ -47,7 +47,7 @@ impl OrderType {
/// Column index with an order type (ASC or DESC). Used to represent a sort key (`Vec<OrderPair>`).
///
/// Corresponds to protobuf [`ColumnOrder`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderPair {
pub column_idx: usize,
pub order_type: OrderType,
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, SchemaId, TableId, UserId};
Expand All @@ -22,7 +22,7 @@ use risingwave_pb::stream_plan::SinkDesc as ProstSinkDesc;

use super::{SinkCatalog, SinkId, SinkType};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkDesc {
/// Id of the sink. For debug now.
pub id: SinkId,
Expand All @@ -48,7 +48,7 @@ pub struct SinkDesc {
pub distribution_key: Vec<usize>,

/// The properties of the sink.
pub properties: HashMap<String, String>,
pub properties: BTreeMap<String, String>,

// The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
// based on both its own derivation on the append-only attribute and other user-specified
Expand Down Expand Up @@ -76,7 +76,7 @@ impl SinkDesc {
distribution_key: self.distribution_key,
owner,
dependent_relations,
properties: self.properties,
properties: self.properties.into_iter().collect(),
sink_type: self.sink_type,
}
}
Expand All @@ -94,7 +94,7 @@ impl SinkDesc {
pk: self.pk.iter().map(|k| k.to_protobuf()).collect_vec(),
stream_key: self.stream_key.iter().map(|idx| *idx as _).collect_vec(),
distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
properties: self.properties.clone(),
properties: self.properties.clone().into_iter().collect(),
sink_type: self.sink_type.to_proto() as i32,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl From<SinkId> for u32 {
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
/// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
/// allowed.
Expand Down
20 changes: 14 additions & 6 deletions src/frontend/src/catalog/index_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use itertools::Itertools;
Expand All @@ -26,7 +26,7 @@ use crate::catalog::{DatabaseId, SchemaId, TableCatalog};
use crate::expr::{Expr, InputRef};
use crate::optimizer::property::FieldOrder;

#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct IndexCatalog {
pub id: IndexId,

Expand All @@ -41,9 +41,9 @@ pub struct IndexCatalog {

pub primary_table: Arc<TableCatalog>,

pub primary_to_secondary_mapping: HashMap<usize, usize>,
pub primary_to_secondary_mapping: BTreeMap<usize, usize>,

pub secondary_to_primary_mapping: HashMap<usize, usize>,
pub secondary_to_primary_mapping: BTreeMap<usize, usize>,

pub original_columns: Vec<ColumnId>,
}
Expand Down Expand Up @@ -127,12 +127,20 @@ impl IndexCatalog {

/// a mapping maps column index of secondary index to column index of primary table
pub fn secondary_to_primary_mapping(&self) -> &HashMap<usize, usize> {
&self.secondary_to_primary_mapping
&self
.secondary_to_primary_mapping
.clone()
.into_iter()
.collect()
}

/// a mapping maps column index of primary table to column index of secondary index
pub fn primary_to_secondary_mapping(&self) -> &HashMap<usize, usize> {
&self.primary_to_secondary_mapping
&self
.primary_to_secondary_mapping
.clone()
.into_iter()
.collect()
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> ProstIndex {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;

use risingwave_common::catalog::ColumnCatalog;
use risingwave_pb::catalog::{Source as ProstSource, StreamSourceInfo, WatermarkDesc};
Expand All @@ -23,7 +23,7 @@ use crate::WithOptions;

/// This struct `SourceCatalog` is used in frontend.
/// Compared with `ProstSource`, it only maintains information used during optimization.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SourceCatalog {
pub id: SourceId,
pub name: String,
Expand All @@ -33,7 +33,7 @@ pub struct SourceCatalog {
pub owner: UserId,
pub info: StreamSourceInfo,
pub row_id_index: Option<usize>,
pub properties: HashMap<String, String>,
pub properties: BTreeMap<String, String>,
pub watermark_descs: Vec<WatermarkDesc>,
}

Expand Down
11 changes: 6 additions & 5 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ use crate::WithOptions;
///
/// - **Distribution Key**: the columns used to partition the data. It must be a subset of the order
/// key.
#[derive(Clone, Debug)]
#[cfg_attr(test, derive(Default, PartialEq))]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
pub id: TableId,

Expand Down Expand Up @@ -129,7 +129,7 @@ pub struct TableCatalog {
pub watermark_columns: FixedBitSet,
}

#[derive(Copy, Clone, Debug, PartialEq)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
/// Tables created by `CREATE TABLE`.
Table,
Expand Down Expand Up @@ -170,7 +170,7 @@ impl TableType {
}

/// The version of a table, used by schema change. See [`ProstTableVersion`].
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
pub version_id: TableVersionId,
pub next_column_id: ColumnId,
Expand Down Expand Up @@ -285,7 +285,8 @@ impl TableCatalog {
pub fn table_desc(&self) -> TableDesc {
use risingwave_common::catalog::TableOption;

let table_options = TableOption::build_table_option(&self.properties);
let table_options =
TableOption::build_table_option(&self.properties.clone().into_iter().collect());

TableDesc {
table_id: self.id,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ pub async fn handle_create_source(
row_id_index,
columns,
pk_column_ids,
properties: with_properties,
properties: with_properties.into_iter().collect(),
info: Some(source_info),
owner: session.user_id(),
watermark_descs,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#![feature(once_cell)]
#![feature(result_option_inspect)]
#![feature(macro_metavar_expr)]
#![feature(min_specialization)]
#![feature(is_some_and)]
#![recursion_limit = "256"]

#[macro_use]
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchDelete` implements [`LogicalDelete`]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchDelete {
pub base: PlanBase,
logical: LogicalDelete,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order, Order

/// `BatchExchange` imposes a particular distribution on its input
/// without changing its content.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchExchange {
pub base: PlanBase,
input: PlanRef,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::optimizer::plan_node::{
use crate::optimizer::property::{Distribution, Order};
use crate::optimizer::PlanRef;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchExpand {
pub base: PlanBase,
logical: LogicalExpand,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::optimizer::plan_node::{PlanBase, ToLocalBatch};
use crate::utils::Condition;

/// `BatchFilter` implements [`super::LogicalFilter`]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchFilter {
pub base: PlanBase,
logical: LogicalFilter,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Order, RequiredDist};

/// `BatchGroupTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchGroupTopN {
pub base: PlanBase,
logical: LogicalTopN,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHashAgg {
pub base: PlanBase,
logical: LogicalAgg,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
/// `BatchHashJoin` implements [`super::LogicalJoin`] with hash table. It builds a hash table
/// from inner (right-side) relation and then probes with data from outer (left-side) relation to
/// get output rows.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHashJoin {
pub base: PlanBase,
logical: LogicalJoin,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::optimizer::property::{Order, RequiredDist};

/// `BatchHopWindow` implements [`super::LogicalHopWindow`] to evaluate specified expressions on
/// input rows
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHopWindow {
pub base: PlanBase,
logical: LogicalHopWindow,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::optimizer::plan_node::{PlanBase, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchInsert` implements [`LogicalInsert`]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchInsert {
pub base: PlanBase,
logical: LogicalInsert,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Order, RequiredDist};

/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLimit {
pub base: PlanBase,
logical: LogicalLimit,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::optimizer::plan_node::{
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::optimizer::PlanRef;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLookupJoin {
pub base: PlanBase,
logical: LogicalJoin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::utils::ConditionDisplay;

/// `BatchNestedLoopJoin` implements [`super::LogicalJoin`] by checking the join condition
/// against all pairs of rows from inner & outer side within 2 layers of loops.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchNestedLoopJoin {
pub base: PlanBase,
logical: LogicalJoin,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::optimizer::plan_node::ToLocalBatch;

/// `BatchProject` implements [`super::LogicalProject`] to evaluate specified expressions on input
/// rows
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchProject {
pub base: PlanBase,
logical: LogicalProject,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::optimizer::plan_node::{
};
use crate::optimizer::PlanRef;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchProjectSet {
pub base: PlanBase,
logical: LogicalProjectSet,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::optimizer::plan_node::{LogicalScan, ToLocalBatch};
use crate::optimizer::property::{Distribution, DistributionDisplay, Order};

/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSeqScan {
pub base: PlanBase,
logical: LogicalScan,
Expand Down

0 comments on commit f901cc2

Please sign in to comment.