From a0882cd996cba3a0bff74da5550c4566ed778a42 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Sat, 15 Jul 2023 18:10:55 +0800 Subject: [PATCH] feat(optimizer, storage): pushdown range-filter to storage (#786) * extract `ScanOptions` Signed-off-by: Runji Wang * storage: remove general filter and only keep range filter Signed-off-by: Runji Wang * planner_v2: add filter to scan node Signed-off-by: Runji Wang * only keep single key in KeyRange Signed-off-by: Runji Wang * add range analysis and filter scan rule Signed-off-by: Runji Wang * fix primary key constraint Signed-off-by: Runji Wang * make range-filter scan work Signed-off-by: Runji Wang * fix clippy Signed-off-by: Runji Wang * fix panic on range-filter scan Signed-off-by: Runji Wang * fix column prune rule for range-filter Signed-off-by: Runji Wang * disable range filter scan for memory storage Signed-off-by: Runji Wang * fix sqllogictest Signed-off-by: Runji Wang * update docs Signed-off-by: Runji Wang * fix format Signed-off-by: Runji Wang * change default range filter to null Signed-off-by: Runji Wang * update planner test Signed-off-by: Runji Wang * add planner test for tpch q9 Signed-off-by: Runji Wang --------- Signed-off-by: Runji Wang --- docs/04-storage-overview.md | 2 +- src/binder_v2/copy.rs | 3 +- src/binder_v2/create_table.rs | 36 +- src/binder_v2/delete.rs | 3 +- src/binder_v2/table.rs | 19 +- src/db.rs | 10 +- src/executor_v2/explain.rs | 4 +- src/executor_v2/mod.rs | 12 +- src/executor_v2/table_scan.rs | 11 +- src/lib.rs | 1 + src/planner/explain.rs | 11 +- src/planner/mod.rs | 123 ++++-- src/planner/rules/mod.rs | 33 +- src/planner/rules/plan.rs | 50 +-- src/planner/rules/range.rs | 114 +++++ src/planner/rules/schema.rs | 2 +- src/planner/rules/type_.rs | 2 +- src/storage/memory/transaction.rs | 27 +- src/storage/mod.rs | 75 +++- src/storage/secondary/compactor.rs | 9 +- src/storage/secondary/options.rs | 6 +- src/storage/secondary/rowset/disk_rowset.rs | 37 +- .../secondary/rowset/rowset_iterator.rs | 399 ++++++------------ src/storage/secondary/transaction.rs | 46 +- src/v1/binder/expression/mod.rs | 17 - src/v1/executor/evaluator.rs | 49 --- src/v1/executor/mod.rs | 2 - src/v1/executor/table_scan.rs | 12 +- src/v1/optimizer/mod.rs | 9 +- .../plan_nodes/logical_table_scan.rs | 53 +-- .../plan_nodes/physical_table_scan.rs | 5 +- src/v1/optimizer/rules/filter_scan_rule.rs | 3 +- tests/planner_test/count.planner.sql | 4 +- .../extract-common-predicate.planner.sql | 2 +- tests/planner_test/tpch.planner.sql | 160 ++++++- tests/planner_test/tpch.yml | 38 ++ tests/sql/tpch/create.sql | 14 +- 37 files changed, 766 insertions(+), 637 deletions(-) create mode 100644 src/planner/rules/range.rs diff --git a/docs/04-storage-overview.md b/docs/04-storage-overview.md index 61d717eb..019388f2 100644 --- a/docs/04-storage-overview.md +++ b/docs/04-storage-overview.md @@ -282,7 +282,7 @@ Also there are a lot of things going on internally in *secondary*. Upon starting A table will typically contain multiple RowSets. *Secondary* will create an iterator over multiple RowSets either by using `MergeIterator` (to do a merge sort), or by using `ConcatIterator` (by yielding data one RowSet by RowSet). Under those iterators, there is one of the most fundamental and important iterator, `RowSetIterator`. `RowSetIterator` scans the underlying columns by using `ColumnIterator`, which uses `BlockIterator` internally. It will also take delete vectors into consideration, so as to filter out deleted rows. -`RowSetIterator` also supports filter scan. 
Users can provide a filter expression to `RowSetIterator`, and the iterator will skip reading blocks to reduce I/O.
+`RowSetIterator` also supports range filter scan. Users can provide a range of the primary key to `RowSetIterator`, and the iterator will skip reading blocks to reduce I/O.

 There is a special column called `RowHandler` in *secondary*. This column is of int64 type, which contains RowSet id on upper 32 bits, and row offset in lower 32 bits. For example, if there is a table, which contains a RowSet with the following data:

diff --git a/src/binder_v2/copy.rs b/src/binder_v2/copy.rs
index 7f9f9cb4..27509175 100644
--- a/src/binder_v2/copy.rs
+++ b/src/binder_v2/copy.rs
@@ -71,7 +71,8 @@ impl Binder {
             let scan = if is_internal {
                 self.egraph.add(Node::Internal([table, cols]))
             } else {
-                self.egraph.add(Node::Scan([table, cols]))
+                let true_ = self.egraph.add(Node::true_());
+                self.egraph.add(Node::Scan([table, cols, true_]))
             };
             self.egraph.add(Node::CopyTo([ext_source, scan]))
         } else {
diff --git a/src/binder_v2/create_table.rs b/src/binder_v2/create_table.rs
index 94f06575..497fdb0c 100644
--- a/src/binder_v2/create_table.rs
+++ b/src/binder_v2/create_table.rs
@@ -90,8 +90,18 @@ impl Binder {
                     return Err(BindError::InvalidColumn(name.clone()));
                 }
             }
-            ordered_pk_ids =
-                Binder::ordered_pks_from_constraint(&pks_name_from_constraints, columns);
+            // `pks_name_from_constraints` gives the primary keys' names sorted by declaration
+            // order in the "primary key(c1, c2..)" syntax. Now map each name back to its column
+            // id to get the sorted ids.
+            ordered_pk_ids = pks_name_from_constraints
+                .iter()
+                .map(|name| {
+                    columns
+                        .iter()
+                        .position(|c| c.name.value.eq_ignore_ascii_case(name))
+                        .unwrap() as ColumnId
+                })
+                .collect();
         }

         let mut columns: Vec<ColumnCatalog> = columns
@@ -138,26 +148,6 @@ impl Binder {
         ordered_pks
     }

-    /// We have used `pks_name_from_constraints` to get the primary keys' name sorted by declaration
-    /// order in "primary key(c1, c2..)" syntax. Now we transfer the name to id to get the sorted
-    /// ID
-    fn ordered_pks_from_constraint(pks_name: &[String], columns: &[ColumnDef]) -> Vec<ColumnId> {
-        let mut ordered_pks = vec![0; pks_name.len()];
-        let mut pos_in_ordered_pk = HashMap::new(); // used to get pos from column name
-        pks_name.iter().enumerate().for_each(|(pos, name)| {
-            pos_in_ordered_pk.insert(name, pos);
-        });
-
-        columns.iter().enumerate().for_each(|(index, colum_desc)| {
-            let column_name = &colum_desc.name.value;
-            if pos_in_ordered_pk.contains_key(column_name) {
-                let id = index as ColumnId;
-                let pos = *(pos_in_ordered_pk.get(column_name).unwrap());
-                ordered_pks[pos] = id;
-            }
-        });
-        ordered_pks
-    }
     /// get the primary keys' name sorted by declaration order in "primary key(c1, c2..)" syntax.
     fn pks_name_from_constraints(constraints: &[TableConstraint]) -> Vec<String> {
         let mut pks_name_from_constraints = vec![];
         for constraint in constraints {
             match constraint {
                 TableConstraint::Unique {
                     is_primary,
                     columns,
                     ..
} if *is_primary => columns.iter().for_each(|ident| {
-                    pks_name_from_constraints.push(ident.value.clone());
+                    pks_name_from_constraints.push(ident.value.to_lowercase());
                 }),
                 _ => continue,
             }
diff --git a/src/binder_v2/delete.rs b/src/binder_v2/delete.rs
index a079e34a..34937d23 100644
--- a/src/binder_v2/delete.rs
+++ b/src/binder_v2/delete.rs
@@ -14,7 +14,8 @@ impl Binder {
             return Err(BindError::NotSupportedOnInternalTable);
         }
         let cols = self.bind_table_name(name, None, true)?;
-        let scan = self.egraph.add(Node::Scan([table_id, cols]));
+        let true_ = self.egraph.add(Node::true_());
+        let scan = self.egraph.add(Node::Scan([table_id, cols, true_]));
         let cond = self.bind_where(selection)?;
         let filter = self.egraph.add(Node::Filter([cond, scan]));
         Ok(self.egraph.add(Node::Delete([table_id, filter])))
diff --git a/src/binder_v2/table.rs b/src/binder_v2/table.rs
index 3652c2c6..10ed425c 100644
--- a/src/binder_v2/table.rs
+++ b/src/binder_v2/table.rs
@@ -36,10 +36,10 @@ impl Binder {
     /// ```ignore
     /// (join inner true
     ///     (join inner (= $1.1 $2.1)
-    ///         (scan $1 (list $1.1 $1.2))
-    ///         (scan $2 (list $2.1))
+    ///         (scan $1 (list $1.1 $1.2) null)
+    ///         (scan $2 (list $2.1) null)
     ///     )
-    ///     (scan $3 (list $3.1 $3.2))
+    ///     (scan $3 (list $3.1 $3.2) null)
     /// )
     /// ```
     fn bind_table_with_joins(&mut self, tables: TableWithJoins) -> Result {
@@ -55,7 +55,7 @@ impl Binder {
     /// Returns a `Scan` plan of table or a plan of subquery.
     ///
     /// # Example
-    /// - `bind_table_factor(t)` => `(scan $1 (list $1.1 $1.2 $1.3))`
+    /// - `bind_table_factor(t)` => `(scan $1 (list $1.1 $1.2 $1.3) null)`
     /// - `bind_table_factor(select 1)` => `(values (1))`
     fn bind_table_factor(&mut self, table: TableFactor) -> Result {
         match table {
@@ -65,7 +65,8 @@
             let id = if is_internal {
                 self.egraph.add(Node::Internal([table_id, cols]))
             } else {
-                self.egraph.add(Node::Scan([table_id, cols]))
+                let null = self.egraph.add(Node::null());
+                self.egraph.add(Node::Scan([table_id, cols, null]))
             };
             Ok(id)
         }
@@ -244,6 +245,7 @@ mod tests {
     use super::*;
     use crate::catalog::{ColumnCatalog, RootCatalog};
     use crate::parser::parse;
+    use crate::planner::Optimizer;

     #[test]
     fn bind_test_subquery() {
@@ -256,11 +258,12 @@
         let stmts = parse("select x.b from (select a as b from t) as x").unwrap();
         let mut binder = Binder::new(catalog.clone());
+        let optimizer = Optimizer::new(catalog.clone());
         for stmt in stmts {
-            let result = binder.bind(stmt);
-            println!("{}", result.as_ref().unwrap().pretty(10));
+            let plan = binder.bind(stmt).unwrap();
+            println!("{}", plan.pretty(10));

-            let optimized = crate::planner::optimize(&result.unwrap());
+            let optimized = optimizer.optimize(&plan);

             let mut egraph = egg::EGraph::new(TypeSchemaAnalysis {
                 catalog: catalog.clone(),
diff --git a/src/db.rs b/src/db.rs
index f2b44395..8d7d8600 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -214,7 +214,11 @@ impl Database {
         for stmt in stmts {
             let mut binder = crate::binder_v2::Binder::new(self.catalog.clone());
             let bound = binder.bind(stmt)?;
-            let optimized = crate::planner::optimize(&bound);
+            let mut optimizer = crate::planner::Optimizer::new(self.catalog.clone());
+            if !self.storage.support_range_filter_scan() {
+                optimizer.disable_rules("filter-scan");
+            }
+            let optimized = optimizer.optimize(&bound);
             let executor = match self.storage.clone() {
                 StorageImpl::InMemoryStorage(s) => {
                     crate::executor_v2::build(self.catalog.clone(), s, &optimized)
                 }
@@ -237,7 +241,7 @@
         let mut binder = Binder::new(self.catalog.clone());
         let
logical_planner = LogicalPlaner::default(); let mut optimizer = Optimizer { - enable_filter_scan: self.storage.enable_filter_scan(), + enable_filter_scan: self.storage.support_range_filter_scan(), }; // TODO: parallelize let mut outputs: Vec = vec![]; @@ -276,7 +280,7 @@ impl Database { let mut binder = Binder::new(self.catalog.clone()); let logical_planner = LogicalPlaner::default(); let mut optimizer = Optimizer { - enable_filter_scan: self.storage.enable_filter_scan(), + enable_filter_scan: self.storage.support_range_filter_scan(), }; let mut plans = vec![]; for stmt in stmts { diff --git a/src/executor_v2/explain.rs b/src/executor_v2/explain.rs index 6c19338e..f3c58066 100644 --- a/src/executor_v2/explain.rs +++ b/src/executor_v2/explain.rs @@ -5,7 +5,7 @@ use pretty_xmlish::PrettyConfig; use super::*; use crate::array::{ArrayImpl, Utf8Array}; -use crate::planner::{costs, Explain}; +use crate::planner::{Explain, Optimizer}; /// The executor of `explain` statement. pub struct ExplainExecutor { @@ -15,7 +15,7 @@ pub struct ExplainExecutor { impl ExplainExecutor { pub fn execute(self) -> BoxedExecutor { - let costs = costs(&self.plan); + let costs = Optimizer::new(self.catalog.clone()).costs(&self.plan); let explain_obj = Explain::of(&self.plan) .with_costs(&costs) .with_catalog(&self.catalog); diff --git a/src/executor_v2/mod.rs b/src/executor_v2/mod.rs index d7fa61f5..99d970af 100644 --- a/src/executor_v2/mod.rs +++ b/src/executor_v2/mod.rs @@ -49,7 +49,7 @@ use self::values::*; use self::window::*; use crate::array::DataChunk; use crate::catalog::RootCatalogRef; -use crate::planner::{Expr, RecExpr, TypeSchemaAnalysis}; +use crate::planner::{Expr, ExprAnalysis, RecExpr, TypeSchemaAnalysis}; use crate::storage::{Storage, TracedStorageError}; use crate::types::{ColumnIndex, ConvertError, DataType}; @@ -207,11 +207,19 @@ impl Builder { fn build_id(&self, id: Id) -> BoxedExecutor { use Expr::*; let stream = match self.node(id).clone() { - Scan([table, list]) => TableScanExecutor { + Scan([table, list, filter]) => TableScanExecutor { table_id: self.node(table).as_table(), columns: (self.node(list).as_list().iter()) .map(|id| self.node(*id).as_column()) .collect(), + filter: { + // analyze range for the filter + let mut egraph = egg::EGraph::new(ExprAnalysis { + catalog: self.catalog.clone(), + }); + let root = egraph.add_expr(&self.recexpr(filter)); + egraph[root].data.range.clone().map(|(_, r)| r) + }, storage: self.storage.clone(), } .execute(), diff --git a/src/executor_v2/table_scan.rs b/src/executor_v2/table_scan.rs index 05b6c9c7..7b8f74e9 100644 --- a/src/executor_v2/table_scan.rs +++ b/src/executor_v2/table_scan.rs @@ -5,12 +5,15 @@ use std::sync::Arc; use super::*; use crate::array::DataChunk; use crate::catalog::{ColumnRefId, TableRefId}; -use crate::storage::{Storage, StorageColumnRef, Table, Transaction, TxnIterator}; +use crate::storage::{ + KeyRange, ScanOptions, Storage, StorageColumnRef, Table, Transaction, TxnIterator, +}; /// The executor of table scan operation. 
pub struct TableScanExecutor<S: Storage> {
     pub table_id: TableRefId,
     pub columns: Vec<ColumnRefId>,
+    pub filter: Option<KeyRange>,
     pub storage: Arc<S>,
 }
@@ -37,12 +40,8 @@ impl<S: Storage> TableScanExecutor<S> {
         let mut it = txn
             .scan(
-                &[],
-                &[],
                 &col_idx,
-                false, // TODO: is_sorted
-                false,
-                None, // TODO: support filter scan
+                ScanOptions::default().with_filter_opt(self.filter),
             )
             .await?;
diff --git a/src/lib.rs b/src/lib.rs
index b14f0675..33ecc762 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,6 +24,7 @@
 #![feature(iterator_try_collect)]
 #![feature(async_fn_in_trait)]
 #![feature(return_position_impl_trait_in_trait)]
+#![feature(let_chains)]
 #![allow(incomplete_features)]

 /// Top-level structure of the database.
diff --git a/src/planner/explain.rs b/src/planner/explain.rs
index 04c1acfe..fbdc4085 100644
--- a/src/planner/explain.rs
+++ b/src/planner/explain.rs
@@ -202,8 +202,17 @@ impl<'a> Explain<'a> {
                 vec![self.expr(a).pretty()],
             ),

-            Scan([table, list]) | Internal([table, list]) => Pretty::childless_record(
+            Scan([table, list, filter]) => Pretty::childless_record(
                 "Scan",
+                vec![
+                    ("table", self.expr(table).pretty()),
+                    ("list", self.expr(list).pretty()),
+                    ("filter", self.expr(filter).pretty()),
+                ]
+                .with_cost(cost),
+            ),
+            Internal([table, list]) => Pretty::childless_record(
+                "Internal",
                 vec![
                     ("table", self.expr(table).pretty()),
                     ("list", self.expr(list).pretty()),
diff --git a/src/planner/mod.rs b/src/planner/mod.rs
index be15f5fe..c4833740 100644
--- a/src/planner/mod.rs
+++ b/src/planner/mod.rs
@@ -1,10 +1,12 @@
 // Copyright 2023 RisingLight Project Authors. Licensed under Apache-2.0.

+use std::collections::HashSet;
+
 use egg::{define_language, CostFunction, Id, Symbol};

 use crate::binder_v2::copy::ExtSource;
 use crate::binder_v2::{BoundDrop, CreateTable};
-use crate::catalog::{ColumnRefId, TableRefId};
+use crate::catalog::{ColumnRefId, RootCatalogRef, TableRefId};
 use crate::parser::{BinaryOperator, UnaryOperator};
 use crate::types::{ColumnIndex, DataTypeKind, DataValue, DateTimeField};

@@ -89,7 +91,7 @@ define_language! {
         "cast" = Cast([Id; 2]),  // (cast type expr)

         // plans
-        "scan" = Scan([Id; 2]),          // (scan table [column..])
+        "scan" = Scan([Id; 3]),          // (scan table [column..] filter)
         "internal" = Internal([Id; 2]),  // (internal table [column..])
         "values" = Values(Box<[Id]>),    // (values [expr..]..)
         "proj" = Proj([Id; 2]),          // (proj [expr..] child)
@@ -232,57 +234,88 @@ impl ExprExt for egg::EClass {
     }
 }

-/// Optimize the given expression.
-pub fn optimize(expr: &RecExpr) -> RecExpr {
-    let mut expr = expr.clone();
+/// Plan optimizer.
+pub struct Optimizer {
+    catalog: RootCatalogRef,
+    disabled_rules: HashSet<String>,
+}
+
+impl Optimizer {
+    /// Creates a new optimizer.
+    pub fn new(catalog: RootCatalogRef) -> Self {
+        Self {
+            catalog,
+            disabled_rules: HashSet::default(),
+        }
+    }

-    // 1. pushdown
-    let mut best_cost = f32::MAX;
-    // to prune costy nodes, we iterate multiple times and only keep the best one for each run.
-    for _ in 0..3 {
-        let runner = egg::Runner::default()
+    /// Disable rules that match the given pattern.
+    pub fn disable_rules(&mut self, pattern: &str) {
+        tracing::info!("disable rules: {pattern}");
+        self.disabled_rules.insert(pattern.to_owned());
+    }
+
+    /// Optimize the given expression.
+    pub fn optimize(&self, expr: &RecExpr) -> RecExpr {
+        let mut expr = expr.clone();
+
+        // 1. pushdown
+        let mut best_cost = f32::MAX;
+        // to prune costly nodes, we iterate multiple times and only keep the best one for each run.
+ for _ in 0..3 { + let runner = egg::Runner::<_, _, ()>::new(ExprAnalysis { + catalog: self.catalog.clone(), + }) .with_expr(&expr) .with_iter_limit(6) - .run(&*rules::STAGE1_RULES); - let cost_fn = cost::CostFn { - egraph: &runner.egraph, - }; - let extractor = egg::Extractor::new(&runner.egraph, cost_fn); - let cost; - (cost, expr) = extractor.find_best(runner.roots[0]); - if cost >= best_cost { - break; + .run(rules::STAGE1_RULES.iter().filter(|rule| { + !self + .disabled_rules + .iter() + .any(|name| rule.name.as_str().contains(name)) + })); + let cost_fn = cost::CostFn { + egraph: &runner.egraph, + }; + let extractor = egg::Extractor::new(&runner.egraph, cost_fn); + let cost; + (cost, expr) = extractor.find_best(runner.roots[0]); + if cost >= best_cost { + break; + } + best_cost = cost; + // println!( + // "{}", + // crate::planner::Explain::of(&expr).with_costs(&costs(&expr)) + // ); } - best_cost = cost; - // println!( - // "{}", - // crate::planner::Explain::of(&expr).with_costs(&costs(&expr)) - // ); - } - // 2. join reorder and hashjoin - let runner = egg::Runner::default() + // 2. join reorder and hashjoin + let runner = egg::Runner::<_, _, ()>::new(ExprAnalysis { + catalog: self.catalog.clone(), + }) .with_expr(&expr) .run(&*rules::STAGE2_RULES); - let cost_fn = cost::CostFn { - egraph: &runner.egraph, - }; - let extractor = egg::Extractor::new(&runner.egraph, cost_fn); - (_, expr) = extractor.find_best(runner.roots[0]); + let cost_fn = cost::CostFn { + egraph: &runner.egraph, + }; + let extractor = egg::Extractor::new(&runner.egraph, cost_fn); + (_, expr) = extractor.find_best(runner.roots[0]); - expr -} + expr + } -/// Returns the cost for each node in the expression. -pub fn costs(expr: &RecExpr) -> Vec { - let mut egraph = EGraph::default(); - // NOTE: we assume Expr node has the same Id in both EGraph and RecExpr. - egraph.add_expr(expr); - let mut cost_fn = cost::CostFn { egraph: &egraph }; - let mut costs = vec![0.0; expr.as_ref().len()]; - for (i, node) in expr.as_ref().iter().enumerate() { - let cost = cost_fn.cost(node, |i| costs[usize::from(i)]); - costs[i] = cost; + /// Returns the cost for each node in the expression. + pub fn costs(&self, expr: &RecExpr) -> Vec { + let mut egraph = EGraph::default(); + // NOTE: we assume Expr node has the same Id in both EGraph and RecExpr. + egraph.add_expr(expr); + let mut cost_fn = cost::CostFn { egraph: &egraph }; + let mut costs = vec![0.0; expr.as_ref().len()]; + for (i, node) in expr.as_ref().iter().enumerate() { + let cost = cost_fn.cost(node, |i| costs[usize::from(i)]); + costs[i] = cost; + } + costs } - costs } diff --git a/src/planner/rules/mod.rs b/src/planner/rules/mod.rs index b3a90b06..135ade02 100644 --- a/src/planner/rules/mod.rs +++ b/src/planner/rules/mod.rs @@ -5,19 +5,21 @@ //! Currently we have 6 kinds of analyses. //! Each of them is defined in a sub-module: //! -//! | module | rules | analysis | analysis data | -//! |------------|-----------------------|-------------------------------|----------------| -//! | [`expr`] | expr simplification | constant value | [`ConstValue`] | -//! | [`plan`] | plan optimization | use and defination of columns | [`ColumnSet`] | -//! | [`agg`] | agg extraction | aggregations in an expr | [`AggSet`] | -//! | [`schema`] | column id to index | output schema of a plan | [`Schema`] | -//! | [`type_`] | | data type | [`Type`] | -//! | [`rows`] | | estimated rows | [`Rows`] | +//! | module | rules | analysis | analysis data | +//! 
|------------|-----------------------|-------------------------------|--------------------|
+//! | [`expr`]   | expr simplification   | constant value                | [`ConstValue`]     |
+//! | [`range`]  | filter scan rule      | range condition               | [`RangeCondition`] |
+//! | [`plan`]   | plan optimization     | use and definition of columns | [`ColumnSet`]      |
+//! | [`agg`]    | agg extraction        | aggregations in an expr       | [`AggSet`]         |
+//! | [`schema`] | column id to index    | output schema of a plan       | [`Schema`]         |
+//! | [`type_`]  |                       | data type                     | [`Type`]           |
+//! | [`rows`]   |                       | estimated rows                | [`Rows`]           |
 //!
 //! It would be best if you have a background in program analysis.
 //! Here is a recommended course: .
 //!
 //! [`ConstValue`]: expr::ConstValue
+//! [`RangeCondition`]: range::RangeCondition
 //! [`ColumnSet`]: plan::ColumnSet
 //! [`AggSet`]: agg::AggSet
 //! [`Schema`]: schema::Schema
@@ -37,6 +39,7 @@ use crate::types::F32;
 mod agg;
 mod expr;
 mod plan;
+mod range;
 mod rows;
 mod schema;
 mod type_;
@@ -48,6 +51,7 @@ pub static STAGE1_RULES: LazyLock<Vec<Rewrite>> = LazyLock::new(|| {
     let mut rules = vec![];
     rules.append(&mut expr::rules());
     rules.append(&mut plan::always_better_rules());
+    rules.append(&mut range::filter_scan_rule());
     rules
 });

@@ -61,7 +65,9 @@ pub static STAGE2_RULES: LazyLock<Vec<Rewrite>> = LazyLock::new(|| {

 /// The unified analysis for all rules.
 #[derive(Default)]
-pub struct ExprAnalysis;
+pub struct ExprAnalysis {
+    pub catalog: RootCatalogRef,
+}

 /// The analysis data associated with each eclass.
 ///
@@ -71,6 +77,9 @@ pub struct Data {
     /// Some if the expression is a constant.
     pub constant: expr::ConstValue,

+    /// Some if the expression is a range condition.
+    pub range: range::RangeCondition,
+
     /// For expression node, it is the set of columns used in the expression.
     /// For plan node, it is the set of columns produced by the plan.
     pub columns: plan::ColumnSet,
@@ -89,6 +98,7 @@ impl Analysis for ExprAnalysis {
     fn make(egraph: &EGraph, enode: &Expr) -> Self::Data {
         Data {
             constant: expr::eval_constant(egraph, enode),
+            range: range::analyze_range(egraph, enode),
             columns: plan::analyze_columns(egraph, enode),
             schema: schema::analyze_schema(enode, |i| egraph[*i].data.schema.clone()),
             rows: rows::analyze_rows(egraph, enode),
@@ -105,13 +115,16 @@
     /// new result `Some(1)` with the previous `None` and keep `Some(1)` as the final result.
     fn merge(&mut self, to: &mut Self::Data, from: Self::Data) -> DidMerge {
         let merge_const = egg::merge_max(&mut to.constant, from.constant);
+        // if both are Some, choose an arbitrary one. (not sure whether this is safe)
+        let merge_range =
+            egg::merge_option(&mut to.range, from.range, |_, _| DidMerge(false, true));
         let merge_columns = merge_small_set(&mut to.columns, from.columns);
         let merge_schema = egg::merge_max(&mut to.schema, from.schema);
         let merge_rows = egg::merge_min(
             unsafe { std::mem::transmute(&mut to.rows) },
             F32::from(from.rows),
         );
-        merge_const | merge_columns | merge_schema | merge_rows
+        merge_const | merge_range | merge_columns | merge_schema | merge_rows
     }

     /// Modify the graph after analyzing a node.
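The new `range` analysis feeds the `filter-scan` rules: `analyze_range` (in `src/planner/rules/range.rs` below) turns a comparison between a column and a constant into a pair of `Bound`s, and folds an `and` of two one-sided ranges on the same column into a single range. A minimal self-contained sketch of that folding behavior — toy `i32` keys stand in for `DataValue`, and the `Range`/`merge_and` names are illustrative, not from this patch:

```rust
use std::ops::Bound;

/// Simplified stand-in for the patch's `KeyRange`, over `i32` keys.
#[derive(Clone, Debug, PartialEq)]
struct Range {
    start: Bound<i32>,
    end: Bound<i32>,
}

/// Mirrors the `And` case of `analyze_range`: two ranges on the same column
/// combine only if at most one of them bounds the start and at most one
/// bounds the end; otherwise the analysis returns `None`.
fn merge_and(a: &Range, b: &Range) -> Option<Range> {
    let start = match (&a.start, &b.start) {
        (Bound::Unbounded, s) | (s, Bound::Unbounded) => s.clone(),
        _ => return None,
    };
    let end = match (&a.end, &b.end) {
        (Bound::Unbounded, e) | (e, Bound::Unbounded) => e.clone(),
        _ => return None,
    };
    Some(Range { start, end })
}

fn main() {
    // `k > 1` and `k <= 10` fold into the single range (1, 10].
    let gt1 = Range { start: Bound::Excluded(1), end: Bound::Unbounded };
    let le10 = Range { start: Bound::Unbounded, end: Bound::Included(10) };
    assert_eq!(
        merge_and(&gt1, &le10),
        Some(Range { start: Bound::Excluded(1), end: Bound::Included(10) })
    );

    // `k > 1 AND k > 5` bounds the start twice, so the analysis bails out.
    let gt5 = Range { start: Bound::Excluded(5), end: Bound::Unbounded };
    assert_eq!(merge_and(&gt1, &gt5), None);
}
```

Note that when both operands bound the same side, the analysis gives up rather than intersecting the ranges, which matches the behavior of the `And` arm in `analyze_range` below.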
diff --git a/src/planner/rules/plan.rs b/src/planner/rules/plan.rs index b2808ac4..54842026 100644 --- a/src/planner/rules/plan.rs +++ b/src/planner/rules/plan.rs @@ -165,10 +165,10 @@ pub fn projection_pushdown_rules() -> Vec { vec![ ), // column pruning rw!("pushdown-proj-scan"; - "(proj ?exprs (scan ?table ?columns))" => + "(proj ?exprs (scan ?table ?columns ?filter))" => { ColumnPrune { - pattern: pattern("(proj ?exprs (scan ?table ?columns))"), - used: var("?exprs"), + pattern: pattern("(proj ?exprs (scan ?table ?columns ?filter))"), + used: [var("?exprs"), var("?filter")], columns: var("?columns"), }} ), @@ -287,7 +287,7 @@ impl Applier for ProjectionPushdown { /// Remove element from `columns` whose column set is not a subset of `used` struct ColumnPrune { pattern: Pattern, - used: Var, + used: [Var; 2], columns: Var, } @@ -300,10 +300,12 @@ impl Applier for ColumnPrune { searcher_ast: Option<&PatternAst>, rule_name: Symbol, ) -> Vec { - let used = &egraph[subst[self.used]].data.columns; + let used1 = &egraph[subst[self.used[0]]].data.columns; + let used2 = &egraph[subst[self.used[1]]].data.columns; + let used = used1.union(used2).cloned().collect(); let columns = egraph[subst[self.columns]].as_list(); let filtered = (columns.iter().cloned()) - .filter(|id| egraph[*id].data.columns.is_subset(used)) + .filter(|id| egraph[*id].data.columns.is_subset(&used)) .collect(); let id = egraph.add(Expr::List(filtered)); @@ -335,14 +337,14 @@ mod tests { (proj (list $1.2 $2.2) (filter (and (= $1.1 $2.1) (= $2.3 'A')) (join inner true - (scan $1 (list $1.1 $1.2)) - (scan $2 (list $2.1 $2.2 $2.3)) + (scan $1 (list $1.1 $1.2) null) + (scan $2 (list $2.1 $2.2 $2.3) null) )))" => " (proj (list $1.2 $2.2) (join inner (= $1.1 $2.1) - (scan $1 (list $1.1 $1.2)) + (scan $1 (list $1.1 $1.2) null) (filter (= $2.3 'A') - (scan $2 (list $2.1 $2.2 $2.3)) + (scan $2 (list $2.1 $2.2 $2.3) null) ) ))" } @@ -356,16 +358,16 @@ mod tests { (filter (and (= $1.1 $2.1) (= $3.1 $2.1)) (join inner true (join inner true - (scan $1 (list $1.1 $1.2)) - (scan $2 (list $2.1 $2.2)) + (scan $1 (list $1.1 $1.2) null) + (scan $2 (list $2.1 $2.2) null) ) - (scan $3 (list $3.1 $3.2)) + (scan $3 (list $3.1 $3.2) null) ))" => " (join inner (= $1.1 $2.1) - (scan $1 (list $1.1 $1.2)) + (scan $1 (list $1.1 $1.2) null) (join inner (= $2.1 $3.1) - (scan $2 (list $2.1 $2.2)) - (scan $3 (list $3.1 $3.2)) + (scan $2 (list $2.1 $2.2) null) + (scan $3 (list $3.1 $3.2) null) ) )" } @@ -378,14 +380,14 @@ mod tests { " (filter (and (= $1.1 $2.1) (> $1.2 2)) (join inner true - (scan $1 (list $1.1 $1.2)) - (scan $2 (list $2.1 $2.2)) + (scan $1 (list $1.1 $1.2) null) + (scan $2 (list $2.1 $2.2) null) ))" => " (hashjoin inner (list $1.1) (list $2.1) (filter (> $1.2 2) - (scan $1 (list $1.1 $1.2)) + (scan $1 (list $1.1 $1.2) null) ) - (scan $2 (list $2.1 $2.2)) + (scan $2 (list $2.1 $2.2) null) )" } @@ -397,15 +399,15 @@ mod tests { (proj (list $1.2) (filter (> (+ $1.2 $2.2) 1) (join inner (= $1.1 $2.1) - (scan $1 (list $1.1 $1.2 $1.3)) - (scan $2 (list $2.1 $2.2 $2.3)) + (scan $1 (list $1.1 $1.2 $1.3) null) + (scan $2 (list $2.1 $2.2 $2.3) null) )))" => " (proj (list $1.2) (filter (> (+ $1.2 $2.2) 1) (proj (list $1.2 $2.2) (join inner (= $1.1 $2.1) - (scan $1 (list $1.1 $1.2)) - (scan $2 (list $2.1 $2.2)) + (scan $1 (list $1.1 $1.2) null) + (scan $2 (list $2.1 $2.2) null) ))))" } } diff --git a/src/planner/rules/range.rs b/src/planner/rules/range.rs new file mode 100644 index 00000000..ffd83e50 --- /dev/null +++ b/src/planner/rules/range.rs @@ -0,0 
+1,114 @@
+// Copyright 2023 RisingLight Project Authors. Licensed under Apache-2.0.
+
+//! Range filter.
+
+use std::ops::Bound;
+
+use super::*;
+use crate::catalog::ColumnRefId;
+use crate::storage::KeyRange;
+
+/// The data type of range analysis.
+///
+/// If Some, the expression is a range condition.
+///
+/// ```text
+/// a = 1
+/// a > 1
+/// a <= 1
+/// -1 < a < 1
+/// ```
+pub type RangeCondition = Option<(ColumnRefId, KeyRange)>;
+
+/// Analyzes whether the node is a range condition on a single column.
+pub fn analyze_range(egraph: &EGraph, enode: &Expr) -> RangeCondition {
+    use Expr::*;
+    let column = |i: &Id| {
+        egraph[*i].nodes.iter().find_map(|e| match e {
+            Expr::Column(c) => Some(*c),
+            _ => None,
+        })
+    };
+    let range = |i: &Id| egraph[*i].data.range.as_ref();
+    let constant = |i: &Id| egraph[*i].data.constant.as_ref();
+    match enode {
+        Eq([a, b]) | Gt([a, b]) | GtEq([a, b]) | Lt([a, b]) | LtEq([a, b]) => {
+            // normalize `v op k` to `k op v`
+            let (k, v, enode) = if let (Some(v), Some(k)) = (constant(a), column(b)) {
+                let revnode = match enode {
+                    Eq(_) => Eq([*b, *a]),
+                    Gt(_) => Lt([*b, *a]),
+                    GtEq(_) => LtEq([*b, *a]),
+                    Lt(_) => Gt([*b, *a]),
+                    LtEq(_) => GtEq([*b, *a]),
+                    _ => unreachable!(),
+                };
+                (k, v, revnode)
+            } else if let (Some(k), Some(v)) = (column(a), constant(b)) {
+                (k, v, enode.clone())
+            } else {
+                return None;
+            };
+            let start = match enode {
+                Eq(_) | GtEq(_) => Bound::Included(v.clone()),
+                Gt(_) => Bound::Excluded(v.clone()),
+                Lt(_) | LtEq(_) => Bound::Unbounded,
+                _ => unreachable!(),
+            };
+            let end = match enode {
+                Eq(_) | LtEq(_) => Bound::Included(v.clone()),
+                Lt(_) => Bound::Excluded(v.clone()),
+                Gt(_) | GtEq(_) => Bound::Unbounded,
+                _ => unreachable!(),
+            };
+            Some((k, KeyRange { start, end }))
+        }
+        And([a, b]) => {
+            let (ka, ra) = range(a)?;
+            let (kb, rb) = range(b)?;
+            if ka != kb {
+                return None;
+            }
+            // if both a and b are bounded at the start or at the end, give up and return None
+            let start = match (&ra.start, &rb.start) {
+                (Bound::Unbounded, s) | (s, Bound::Unbounded) => s.clone(),
+                _ => return None,
+            };
+            let end = match (&ra.end, &rb.end) {
+                (Bound::Unbounded, s) | (s, Bound::Unbounded) => s.clone(),
+                _ => return None,
+            };
+            Some((*ka, KeyRange { start, end }))
+        }
+        _ => None,
+    }
+}
+
+#[rustfmt::skip]
+pub fn filter_scan_rule() -> Vec<Rewrite> { vec![
+    // pushdown range condition to scan
+    rw!("filter-scan";
+        "(filter ?cond (scan ?table ?columns true))" =>
+        "(scan ?table ?columns ?cond)"
+        if is_primary_key_range("?cond")
+    ),
+    rw!("filter-scan-1";
+        "(filter (and ?cond1 ?cond2) (scan ?table ?columns true))" =>
+        "(filter ?cond2 (scan ?table ?columns ?cond1))"
+        if is_primary_key_range("?cond1")
+    ),
+]}
+
+/// Returns true if the expression is a primary key range.
+fn is_primary_key_range(expr: &str) -> impl Fn(&mut EGraph, Id, &Subst) -> bool { + let var = var(expr); + move |egraph, _, subst| { + let Some((column, _)) = &egraph[subst[var]].data.range else { return false }; + egraph + .analysis + .catalog + .get_column(column) + .unwrap() + .is_primary() + } +} diff --git a/src/planner/rules/schema.rs b/src/planner/rules/schema.rs index 9ff16da8..e5c4cd5d 100644 --- a/src/planner/rules/schema.rs +++ b/src/planner/rules/schema.rs @@ -22,7 +22,7 @@ pub fn analyze_schema(enode: &Expr, x: impl Fn(&Id) -> Schema) -> Schema { List(ids) => ids.to_vec(), // plans that change schema - Scan([_, columns]) | Internal([_, columns]) => x(columns), + Scan([_, columns, _]) | Internal([_, columns]) => x(columns), Values(vs) => x(&vs[0]), Proj([exprs, _]) => x(exprs), Agg([exprs, group_keys, _]) => concat(x(exprs), x(group_keys)), diff --git a/src/planner/rules/type_.rs b/src/planner/rules/type_.rs index 494b3831..6616fccc 100644 --- a/src/planner/rules/type_.rs +++ b/src/planner/rules/type_.rs @@ -123,7 +123,7 @@ pub fn analyze_type(enode: &Expr, x: impl Fn(&Id) -> Type, catalog: &RootCatalog Join([_, _, l, r]) | HashJoin([_, _, _, l, r]) => concat_struct(x(l)?, x(r)?), // plans that change schema - Scan([_, columns]) => x(columns), + Scan([_, columns, _]) => x(columns), Values(rows) => { if rows.is_empty() { return Ok(Kind::Null.not_null()); diff --git a/src/storage/memory/transaction.rs b/src/storage/memory/transaction.rs index fea3a43d..b8cf9877 100644 --- a/src/storage/memory/transaction.rs +++ b/src/storage/memory/transaction.rs @@ -9,9 +9,7 @@ use super::table::InMemoryTableInnerRef; use super::{InMemoryRowHandler, InMemoryTable, InMemoryTxnIterator}; use crate::array::{ArrayBuilderImpl, ArrayImplBuilderPickExt, ArrayImplSortExt, DataChunk}; use crate::catalog::{find_sort_key_id, ColumnCatalog}; -use crate::storage::{StorageColumnRef, StorageResult, Transaction}; -use crate::types::DataValue; -use crate::v1::binder::BoundExpr; +use crate::storage::{ScanOptions, StorageColumnRef, StorageResult, Transaction}; /// A transaction running on `InMemoryStorage`. 
pub struct InMemoryTransaction {
@@ -104,26 +102,13 @@ impl Transaction for InMemoryTransaction {
     // TODO: remove this unused variable
     async fn scan(
         &self,
-        begin_sort_key: &[DataValue],
-        end_sort_key: &[DataValue],
         col_idx: &[StorageColumnRef],
-        is_sorted: bool,
-        reversed: bool,
-        expr: Option<BoundExpr>,
+        opts: ScanOptions,
     ) -> StorageResult<Self::TxnIteratorType> {
-        assert!(expr.is_none(), "MemTxn doesn't support filter scan");
-        assert!(!reversed, "reverse iterator is not supported for now");
-
-        assert!(
-            begin_sort_key.is_empty(),
-            "sort_key is not supported in InMemoryEngine for now"
-        );
-        assert!(
-            end_sort_key.is_empty(),
-            "sort_key is not supported in InMemoryEngine for now"
-        );
-
-        let snapshot = if is_sorted {
+        assert!(opts.filter.is_none(), "MemTxn doesn't support filter scan");
+        assert!(!opts.reversed, "reverse iterator is not supported for now");
+
+        let snapshot = if opts.is_sorted {
             sort_datachunk_by_pk(&self.snapshot, &self.column_infos)
         } else {
             self.snapshot.clone()
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b70100b5..7506e480 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -10,9 +10,11 @@ pub use secondary::{SecondaryStorage, StorageOptions as SecondaryStorageOptions}
 mod error;
 pub use error::{StorageError, StorageResult, TracedStorageError};
+use serde::Serialize;

 mod chunk;

 use std::future::Future;
+use std::ops::{Bound, RangeBounds};
 use std::sync::Arc;

 pub use chunk::*;
@@ -21,7 +23,6 @@ use enum_dispatch::enum_dispatch;
 use crate::array::{ArrayImpl, DataChunk};
 use crate::catalog::{ColumnCatalog, ColumnId, SchemaId, TableRefId};
 use crate::types::DataValue;
-use crate::v1::binder::BoundExpr;

 #[enum_dispatch(StorageDispatch)]
 #[derive(Clone)]
@@ -44,7 +45,8 @@ impl StorageImpl {
 }

 impl StorageImpl {
-    pub fn enable_filter_scan(&self) -> bool {
+    /// Returns true if the storage engine supports range filter scan.
+    pub fn support_range_filter_scan(&self) -> bool {
         match self {
             Self::SecondaryStorage(_) => true,
             Self::InMemoryStorage(_) => false,
@@ -128,12 +130,8 @@ pub trait Transaction: Sync + Send + 'static {
     /// Scan one or multiple columns.
     fn scan<'a>(
         &'a self,
-        begin_sort_key: &'a [DataValue],
-        end_sort_key: &'a [DataValue],
         col_idx: &'a [StorageColumnRef],
-        is_sorted: bool,
-        reversed: bool,
-        expr: Option<BoundExpr>,
+        options: ScanOptions,
     ) -> impl Future<Output = StorageResult<Self::TxnIteratorType>> + Send + 'a;

     /// Append data to the table. Generally, `columns` should be in the same order as
@@ -154,6 +152,69 @@ pub trait Transaction: Sync + Send + 'static {
     fn abort(self) -> impl Future<Output = StorageResult<()>> + Send;
 }

+/// Options for scanning.
+#[derive(Debug, Default)]
+pub struct ScanOptions {
+    is_sorted: bool,
+    reversed: bool,
+    filter: Option<KeyRange>,
+}
+
+impl ScanOptions {
+    /// Scan with filter.
+    pub fn with_filter_opt(mut self, filter: Option<KeyRange>) -> Self {
+        self.filter = filter;
+        self
+    }
+
+    pub fn with_sorted(mut self, sorted: bool) -> Self {
+        self.is_sorted = sorted;
+        self
+    }
+}
+
+/// A range of keys.
+///
+/// # Example
+/// ```text
+/// // key > 1
+/// KeyRange {
+///     start: Bound::Excluded(DataValue::Int64(1)),
+///     end: Bound::Unbounded,
+/// }
+///
+/// // key = 0
+/// KeyRange {
+///     start: Bound::Included(DataValue::Int64(0)),
+///     end: Bound::Included(DataValue::Int64(0)),
+/// }
+/// ```
+#[derive(Debug, Clone, Serialize)]
+pub struct KeyRange {
+    /// Start bound.
+    pub start: Bound<DataValue>,
+    /// End bound.
+    pub end: Bound<DataValue>,
+}
+
+impl RangeBounds<DataValue> for KeyRange {
+    fn start_bound(&self) -> Bound<&DataValue> {
+        match &self.start {
+            Bound::Unbounded => Bound::Unbounded,
+            Bound::Included(v) => Bound::Included(v),
+            Bound::Excluded(v) => Bound::Excluded(v),
+        }
+    }
+
+    fn end_bound(&self) -> Bound<&DataValue> {
+        match &self.end {
+            Bound::Unbounded => Bound::Unbounded,
+            Bound::Included(v) => Bound::Included(v),
+            Bound::Excluded(v) => Bound::Excluded(v),
+        }
+    }
+}
+
 /// An iterator over table in a transaction.
 pub trait TxnIterator: Send {
     /// get next batch of elements
diff --git a/src/storage/secondary/compactor.rs b/src/storage/secondary/compactor.rs
index 28adc0da..18842838 100644
--- a/src/storage/secondary/compactor.rs
+++ b/src/storage/secondary/compactor.rs
@@ -77,14 +77,7 @@ impl Compactor {
             iters.push(
                 rowset
-                    .iter(
-                        column_refs.clone(),
-                        dvs,
-                        ColumnSeekPosition::start(),
-                        None,
-                        &[],
-                        &[],
-                    )
+                    .iter(column_refs.clone(), dvs, ColumnSeekPosition::start(), None)
                     .await?,
             );
         }
diff --git a/src/storage/secondary/options.rs b/src/storage/secondary/options.rs
index 8dd3638d..101e4589 100644
--- a/src/storage/secondary/options.rs
+++ b/src/storage/secondary/options.rs
@@ -83,7 +83,8 @@ impl StorageOptions {
             },
             checksum_type: ChecksumType::Crc32,
             encode_type: EncodeType::Plain,
-            record_first_key: false,
+            // required by range-filter scan rule
+            record_first_key: true,
             disable_all_disk_operation: false,
         }
     }
@@ -97,7 +98,8 @@ impl StorageOptions {
             io_backend: IOBackend::in_memory(),
             checksum_type: ChecksumType::None,
             encode_type: EncodeType::Plain,
-            record_first_key: false,
+            // required by range-filter scan rule
+            record_first_key: true,
             disable_all_disk_operation: true,
         }
     }
diff --git a/src/storage/secondary/rowset/disk_rowset.rs b/src/storage/secondary/rowset/disk_rowset.rs
index 78180c36..e3d11c0e 100644
--- a/src/storage/secondary/rowset/disk_rowset.rs
+++ b/src/storage/secondary/rowset/disk_rowset.rs
@@ -1,6 +1,5 @@
 // Copyright 2023 RisingLight Project Authors. Licensed under Apache-2.0.

-use std::borrow::Borrow;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};

@@ -16,9 +15,8 @@
 use crate::catalog::ColumnCatalog;
 use crate::storage::secondary::column::ColumnReadableFile;
 use crate::storage::secondary::encode::PrimitiveFixedWidthEncode;
 use crate::storage::secondary::DeleteVector;
-use crate::storage::{StorageColumnRef, StorageResult};
+use crate::storage::{KeyRange, StorageColumnRef, StorageResult};
 use crate::types::DataValue;
-use crate::v1::binder::BoundExpr;

 /// Represents a column in Secondary.
 ///
@@ -126,20 +124,9 @@ impl DiskRowset {
         column_refs: Arc<[StorageColumnRef]>,
         dvs: Vec<Arc<DeleteVector>>,
         seek_pos: ColumnSeekPosition,
-        expr: Option<BoundExpr>,
-        begin_keys: &[DataValue],
-        end_keys: &[DataValue],
+        filter: Option<KeyRange>,
     ) -> StorageResult<RowSetIterator> {
-        RowSetIterator::new(
-            self.clone(),
-            column_refs,
-            dvs,
-            seek_pos,
-            expr,
-            begin_keys,
-            end_keys,
-        )
-        .await
+        RowSetIterator::new(self.clone(), column_refs, dvs, seek_pos, filter).await
     }

     pub fn on_disk_size(&self) -> u64 {
@@ -159,14 +146,12 @@ impl DiskRowset {
     /// If `begin_key` is greater than all blocks' `first_key`, we return the `first_key` of the
     /// last block.
/// Todo: support multi sort-keys range filter - pub async fn start_rowid(&self, begin_keys: &[DataValue]) -> ColumnSeekPosition { - if begin_keys.is_empty() { + pub async fn start_rowid(&self, begin_key: Option<&DataValue>) -> ColumnSeekPosition { + let Some(begin_key) = begin_key else { return ColumnSeekPosition::RowId(0); - } + }; - // for now, we only use the first column to get the start row id, which means the length - // of `begin_keys` can only be 0 or 1. - let begin_key = begin_keys[0].borrow(); + // for now, we only use the first column to get the start row id let column = self.column(0); let column_index = column.index(); @@ -193,8 +178,6 @@ impl DiskRowset { #[cfg(test)] pub mod tests { - use std::borrow::Borrow; - use tempfile::TempDir; use super::*; @@ -429,18 +412,16 @@ pub mod tests { async fn test_get_start_id() { let tempdir = tempfile::tempdir().unwrap(); let rowset = helper_build_rowset_with_first_key_recorded(&tempdir).await; - let start_keys = vec![DataValue::Int32(222)]; { - let start_rid = match rowset.start_rowid(start_keys.borrow()).await { + let start_rid = match rowset.start_rowid(Some(&DataValue::Int32(222))).await { ColumnSeekPosition::RowId(x) => x, _ => panic!("Unable to reach the branch"), }; assert_eq!(start_rid, 196_u32); } { - let start_keys = vec![DataValue::Int32(10000)]; - let start_rid = match rowset.start_rowid(start_keys.borrow()).await { + let start_rid = match rowset.start_rowid(Some(&DataValue::Int32(10000))).await { ColumnSeekPosition::RowId(x) => x, _ => panic!("Unable to reach the branch"), }; diff --git a/src/storage/secondary/rowset/rowset_iterator.rs b/src/storage/secondary/rowset/rowset_iterator.rs index 953e5f5b..f1e46f68 100644 --- a/src/storage/secondary/rowset/rowset_iterator.rs +++ b/src/storage/secondary/rowset/rowset_iterator.rs @@ -1,5 +1,6 @@ // Copyright 2023 RisingLight Project Authors. Licensed under Apache-2.0. +use std::ops::Bound; use std::sync::Arc; use bitvec::prelude::BitVec; @@ -7,11 +8,9 @@ use smallvec::smallvec; use super::super::{ColumnIteratorImpl, ColumnSeekPosition, SecondaryIteratorImpl}; use super::DiskRowset; -use crate::array::{Array, ArrayImpl}; +use crate::array::ArrayImpl; use crate::storage::secondary::DeleteVector; -use crate::storage::{PackedVec, StorageChunk, StorageColumnRef, StorageResult}; -use crate::types::DataValue; -use crate::v1::binder::BoundExpr; +use crate::storage::{KeyRange, PackedVec, StorageChunk, StorageColumnRef, StorageResult}; /// When `expected_size` is not specified, we should limit the maximum size of the chunk. const ROWSET_MAX_OUTPUT: usize = 2048; @@ -21,21 +20,19 @@ pub struct RowSetIterator { column_refs: Arc<[StorageColumnRef]>, dvs: Vec>, column_iterators: Vec, - filter_expr: Option<(BoundExpr, BitVec)>, - start_keys: Vec, - end_keys: Vec, - meet_start_key_before: bool, - meet_end_key_before: bool, // Indicate whether we have met `end_keys` in pre batch. + /// An optional filter for the first column. + filter: Option, + /// Indicate whether the iterator has reached the end. 
+ end: bool, } + impl RowSetIterator { pub async fn new( rowset: Arc, column_refs: Arc<[StorageColumnRef]>, dvs: Vec>, seek_pos: ColumnSeekPosition, - expr: Option, - start_keys: &[DataValue], - end_keys: &[DataValue], + filter: Option, ) -> StorageResult { let start_row_id = match seek_pos { ColumnSeekPosition::RowId(row_id) => row_id, @@ -84,68 +81,66 @@ impl RowSetIterator { }; } - let filter_expr = if let Some(expr) = expr { - let filter_column = expr.get_filter_column(column_refs.len()); - // assert filter column is not all false - assert!( - filter_column.any(), - "There should be at least 1 filter column" - ); - Some((expr, filter_column)) - } else { - None - }; - Ok(Self { column_refs, dvs, column_iterators, - filter_expr, - start_keys: start_keys.to_vec(), - end_keys: end_keys.to_vec(), - meet_end_key_before: false, - meet_start_key_before: false, + filter, + end: false, }) } - pub async fn next_batch_inner( + /// Reads the next batch. + pub async fn next_batch( &mut self, expected_size: Option, - ) -> StorageResult<(bool, Option)> { - // We have met end key in pre `StorageChunk` - // so we can finish cur scan. - if self.meet_end_key_before { - return Ok((true, None)); + ) -> StorageResult> { + while !self.end { + if let Some(batch) = self.next_batch_inner(expected_size).await? { + return Ok(Some(batch)); + } + } + Ok(None) + } + + /// Reads the next batch. This function may return `None` if the next batch is empty, but it + /// doesn't mean that the iterator has reached the end. + async fn next_batch_inner( + &mut self, + expected_size: Option, + ) -> StorageResult> { + if self.end { + return Ok(None); } - let filter_context = self.filter_expr.as_ref(); // It's guaranteed that `expected_size` <= the number of items left // in the current block, if provided let mut fetch_size = { // We find the minimum fetch hints from the column iterators first - let mut min = None; + let mut min: Option = None; let mut is_finished = true; for it in &self.column_iterators { let (hint, finished) = it.fetch_hint(); if !finished { - is_finished = false + is_finished = false; } if hint != 0 { - if min.is_none() { - min = Some(hint); + if let Some(v) = min { + min = Some(v.min(hint)); } else { - min = Some(min.unwrap().min(hint)); + min = Some(hint); } } } - if min.is_some() { - min.unwrap().min(ROWSET_MAX_OUTPUT) + if let Some(min) = min { + min.min(ROWSET_MAX_OUTPUT) } else { // Fast return: when all columns size is `0`, only has tow case: // 1. index of current block is no data can fetch (use `ROWSET_MAX_OUTPUT`). // 2. all columns is finished (return directly). 
if is_finished { - return Ok((true, None)); + self.end = true; + return Ok(None); } ROWSET_MAX_OUTPUT } @@ -156,9 +151,6 @@ impl RowSetIterator { fetch_size = if x > fetch_size { fetch_size } else { x } } - let mut arrays: PackedVec> = smallvec![]; - let mut common_chunk_range = None; - // TODO: parallel fetch // TODO: align unmatched rows @@ -189,212 +181,84 @@ impl RowSetIterator { for (id, _) in self.column_refs.iter().enumerate() { self.column_iterators[id].skip(visi.len()); } - return Ok((false, None)); + return Ok(None); } // Switch visibility_map visibility_map = Some(visi); } - // Here, we scan the columns in filter condition if needed, if there are no - // filter conditions, we don't do any modification to the `visibility_map`, - // otherwise we apply the filtered result to it and get a new visibility map - if let Some((expr, filter_columns)) = filter_context { - for id in 0..filter_columns.len() { - if filter_columns[id] { - if let Some((row_id, array)) = self.column_iterators[id] - .next_batch(Some(fetch_size)) - .await? - { - if let Some(x) = common_chunk_range { - if x != (row_id, array.len()) { - panic!("unmatched rowid from column iterator"); - } - } - common_chunk_range = Some((row_id, array.len())); - arrays.push(Some(array)); - } else { - arrays.push(None); - } - } else { - arrays.push(None); - } - } - - // This check is necessary - let common_chunk_range = if let Some(common_chunk_range) = common_chunk_range { - common_chunk_range - } else { - return Ok((true, None)); - }; - - // Need to optimize - let bool_array = match expr - .eval_array_in_storage(&arrays, common_chunk_range.1) - .unwrap() - { - ArrayImpl::Bool(a) => a, - _ => panic!("filters can only accept bool array"), - }; - - let mut filter_bitmap = BitVec::with_capacity(bool_array.len()); - for (idx, e) in bool_array.iter().enumerate() { - if let Some(visi) = visibility_map.as_ref() { - if !visi[idx] { - filter_bitmap.push(false); - continue; - } - } - if let Some(e) = e { - filter_bitmap.push(*e); - } else { - filter_bitmap.push(false); - } - } + let mut arrays: PackedVec = smallvec![]; + // to make sure all columns have the same chunk range + let mut common_chunk_range = None; - // No rows left from the filter scan, skip columns which are not - // in filter conditions - if filter_bitmap.not_any() { - for (id, _) in self.column_refs.iter().enumerate() { - if !filter_columns[id] { - self.column_iterators[id].skip(filter_bitmap.len()); - } - } - return Ok((false, None)); - } - visibility_map = Some(filter_bitmap); - } - // whether we have meet end key in cur scan. - let mut meet_end_key = false; // At this stage, we know that some rows survived from the filter scan if happend, so // just fetch the next batch for every other columns, and we have `visibility_map` to // indicate the visibility of its rows // TODO: Implement the skip interface for column_iterator and call it here. // For those already fetched columns, they also need to delete corrensponding blocks. for (id, _) in self.column_refs.iter().enumerate() { - if filter_context.is_none() { - // If no filter, the `arrays` should be initialized here - // manually by push a `None` - arrays.push(None); - } - if arrays[id].is_none() { - if let Some((row_id, array)) = self.column_iterators[id] - .next_batch(Some(fetch_size)) - .await? 
- { - if let Some(x) = common_chunk_range { - let current_data = (row_id, array.len()); - if x != current_data { - panic!( - "unmatched rowid from column iterator: {:?} of [{:?}], {:?} != {:?}", - self.column_refs[id], self.column_refs, x, current_data - ); - } - } - common_chunk_range = Some((row_id, array.len())); - arrays[id] = Some(array); + let Some((row_id, array)) = self.column_iterators[id] + .next_batch(Some(fetch_size)) + .await? + else { + self.end = true; + return Ok(None); + }; + + // check chunk range + let current_range = row_id..row_id + array.len() as u32; + if let Some(common_range) = &common_chunk_range { + if common_range != ¤t_range { + panic!( + "unmatched row range from column iterator: {:?} of [{:?}], {:?} != {:?}", + self.column_refs[id], self.column_refs, common_range, current_range + ); } + } else { + common_chunk_range = Some(current_range); } + // For now, we only support range-filter scan by first column. - if id == 0 { - if !self.start_keys.is_empty() && !self.meet_start_key_before { - // find the first row in range to begin with - self.meet_start_key_before = true; - let array = arrays[0].as_ref().unwrap(); - let len = array.len(); - let start_key = &self.start_keys[0]; - let start_row_id = - (0..len).position(|idx| start_key - &array.get(idx) <= DataValue::Int32(0)); - if start_row_id.is_none() { - // the `begin_key` is greater than all of the data, so on item survives in - // this scan - return Ok((true, None)); + if let Some(range) = &self.filter && id == 0 { + let len = array.len(); + let start_row_id = match &range.start { + Bound::Included(key) => { + (0..array.len()).position(|idx| &array.get(idx) >= key) + } + Bound::Excluded(key) => { + (0..array.len()).position(|idx| &array.get(idx) > key) } - let start_row_id = start_row_id.unwrap(); - let new_bitmap = - Self::mark_inaccessible(visibility_map.as_ref(), 0, start_row_id, len) - .await; - visibility_map = Some(new_bitmap); + Bound::Unbounded => Some(0), } - - if !self.end_keys.is_empty() && arrays[0].is_some() { - let array = arrays[0].as_ref().unwrap(); - let len = array.len(); - let end_key = &self.end_keys[0]; - if end_key - &array.get(len - 1) < DataValue::Int32(0) { - // this block's last key is greater than the `end_key`, - // so we will finish scan after scan this block - meet_end_key = true; - let end_row_id = (0..len) - .position(|idx| end_key - &array.get(idx) < DataValue::Int32(0)) - .unwrap(); - let new_bitmap = - Self::mark_inaccessible(visibility_map.as_ref(), end_row_id, len, len) - .await; - visibility_map = Some(new_bitmap); + .unwrap_or(len); + let end_row_id = match &range.end { + Bound::Included(key) => { + (0..array.len()).position(|idx| &array.get(idx) > key) + } + Bound::Excluded(key) => { + (0..array.len()).position(|idx| &array.get(idx) >= key) } + Bound::Unbounded => None, } - } - } - - if common_chunk_range.is_none() { - return Ok((true, None)); - }; - - Ok(( - meet_end_key, - StorageChunk::construct( - visibility_map, - arrays.into_iter().map(Option::unwrap).collect(), - ), - )) - } - - pub async fn next_batch( - &mut self, - expected_size: Option, - ) -> StorageResult> { - loop { - let (finished, batch) = self.next_batch_inner(expected_size).await?; - if finished { - if batch.is_some() { - // we have met end key in cur batch, so we just return those data in the range. 
- self.meet_end_key_before = true; - return Ok(batch); + .unwrap_or(len); + if (start_row_id..end_row_id) != (0..len) { + let bitmap = (0..len).map(|i| (start_row_id..end_row_id).contains(&i)).collect(); + if let Some(ref mut vis) = visibility_map { + *vis &= bitmap; + } else { + visibility_map = Some(bitmap); + } } - return Ok(None); - } else if let Some(batch) = batch { - return Ok(Some(batch)); - } - } - } - - /// mark all positions between `start_id`(include) and `end_id`(not include) false in a new - /// `BitVec`, the len of this `BitVec` is `len`, and if a position is marked false in - /// `bitmap`, we just keep in false in the new `Bitvec` - pub async fn mark_inaccessible( - bitmap: Option<&BitVec>, - start_id: usize, - end_id: usize, - len: usize, - ) -> BitVec { - let mut filter_bitmap = BitVec::with_capacity(len); - for idx in 0..len { - if let Some(visi) = bitmap { - if !visi[idx] { - // Cur row was previously marked inaccessible, - // so we'll just keep it. - filter_bitmap.push(false); - continue; + if end_row_id == 0 { + self.end = true; } } - if idx < end_id && idx >= start_id { - filter_bitmap.push(false); - } else { - filter_bitmap.push(true); - } + + arrays.push(array); } - filter_bitmap + + Ok(StorageChunk::construct(visibility_map, arrays)) } } @@ -402,8 +266,9 @@ impl SecondaryIteratorImpl for RowSetIterator {} #[cfg(test)] mod tests { + use std::ops::Bound; + use itertools::Itertools; - use sqlparser::ast::BinaryOperator; use super::*; use crate::array::{Array, ArrayToVecExt}; @@ -411,8 +276,7 @@ mod tests { helper_build_rowset, helper_build_rowset_with_first_key_recorded, }; use crate::storage::secondary::SecondaryRowHandler; - use crate::types::{DataTypeKind, DataValue}; - use crate::v1::binder::{BoundBinaryOp, BoundInputRef}; + use crate::types::DataValue; #[tokio::test] async fn test_rowset_iterator() { @@ -429,8 +293,6 @@ mod tests { vec![], ColumnSeekPosition::RowId(1000), None, - &[], - &[], ) .await .unwrap(); @@ -445,7 +307,6 @@ mod tests { .take(20) .map(Some) .collect_vec(); - assert_eq!(left.len(), right.len()); assert_eq!(left, right); } else { unreachable!() @@ -460,7 +321,6 @@ mod tests { .take(20) .map(Some) .collect_vec(); - assert_eq!(left.len(), right.len()); assert_eq!(left, right); } else { unreachable!() @@ -480,21 +340,6 @@ mod tests { // v3 > 4: it.next_batch will return none, because StorageChunk::construct always return // none. v3 > 2: all blocks will be fetched. 
- let op = BinaryOperator::Gt; - - let left_expr = Box::new(BoundExpr::InputRef(BoundInputRef { - index: 2, - return_type: DataTypeKind::Int32.nullable(), - })); - - let right_expr = Box::new(BoundExpr::Constant(DataValue::Int32(2))); - - let expr = BoundExpr::BinaryOp(BoundBinaryOp { - op, - left_expr, - right_expr, - return_type: DataTypeKind::Bool.nullable(), - }); let mut it = rowset .iter( vec![ @@ -505,9 +350,10 @@ mod tests { .into(), vec![], ColumnSeekPosition::RowId(1000), - Some(expr), - &[], - &[], + Some(KeyRange { + start: Bound::Excluded(DataValue::Int32(2)), + end: Bound::Unbounded, + }), ) .await .unwrap(); @@ -521,7 +367,6 @@ mod tests { .take(20) .map(Some) .collect_vec(); - assert_eq!(left.len(), right.len()); assert_eq!(left, right); } else { unreachable!() @@ -536,7 +381,6 @@ mod tests { .take(20) .map(Some) .collect_vec(); - assert_eq!(left.len(), right.len()); assert_eq!(left, right); } else { unreachable!() @@ -554,8 +398,6 @@ mod tests { { let tempdir = tempfile::tempdir().unwrap(); let rowset = Arc::new(helper_build_rowset_with_first_key_recorded(&tempdir).await); - let start_keys = vec![DataValue::Int32(180)]; - let end_keys = vec![DataValue::Int32(195)]; let mut it = rowset .iter( vec![ @@ -566,9 +408,10 @@ mod tests { .into(), vec![], ColumnSeekPosition::RowId(168), - None, - &start_keys, - &end_keys, + Some(KeyRange { + start: Bound::Included(DataValue::Int32(180)), + end: Bound::Included(DataValue::Int32(195)), + }), ) .await .unwrap(); @@ -611,8 +454,6 @@ mod tests { vec![], ColumnSeekPosition::RowId(0), None, - &[], - &[], ) .await .unwrap(); @@ -644,7 +485,6 @@ mod tests { // test only setting `start_keys` let tempdir = tempfile::tempdir().unwrap(); let rowset = Arc::new(helper_build_rowset_with_first_key_recorded(&tempdir).await); - let start_keys = vec![DataValue::Int32(180)]; let mut it = rowset .iter( vec![ @@ -655,9 +495,10 @@ mod tests { .into(), vec![], ColumnSeekPosition::RowId(168), - None, - &start_keys, - &[], + Some(KeyRange { + start: Bound::Included(DataValue::Int32(180)), + end: Bound::Unbounded, + }), ) .await .unwrap(); @@ -689,7 +530,6 @@ mod tests { // test only set `start_keys` but no data satisfied. let tempdir = tempfile::tempdir().unwrap(); let rowset = Arc::new(helper_build_rowset_with_first_key_recorded(&tempdir).await); - let start_keys = vec![DataValue::Int32(1800)]; let mut it = rowset .iter( vec![ @@ -700,9 +540,10 @@ mod tests { .into(), vec![], ColumnSeekPosition::RowId(252), - None, - &start_keys, - &[], + Some(KeyRange { + start: Bound::Included(DataValue::Int32(1800)), + end: Bound::Unbounded, + }), ) .await .unwrap(); @@ -734,7 +575,6 @@ mod tests { // test only set `end_keys let tempdir = tempfile::tempdir().unwrap(); let rowset = Arc::new(helper_build_rowset_with_first_key_recorded(&tempdir).await); - let end_keys = vec![DataValue::Int32(195)]; let mut it = rowset .iter( vec![ @@ -745,9 +585,10 @@ mod tests { .into(), vec![], ColumnSeekPosition::RowId(0), - None, - &[], - &end_keys, + Some(KeyRange { + start: Bound::Unbounded, + end: Bound::Included(DataValue::Int32(195)), + }), ) .await .unwrap(); @@ -779,7 +620,6 @@ mod tests { // test only set `end_keys` but all data satisfied. 
let tempdir = tempfile::tempdir().unwrap(); let rowset = Arc::new(helper_build_rowset_with_first_key_recorded(&tempdir).await); - let end_keys = vec![DataValue::Int32(19500)]; let mut it = rowset .iter( vec![ @@ -790,9 +630,10 @@ mod tests { .into(), vec![], ColumnSeekPosition::RowId(0), - None, - &[], - &end_keys, + Some(KeyRange { + start: Bound::Unbounded, + end: Bound::Included(DataValue::Int32(19500)), + }), ) .await .unwrap(); diff --git a/src/storage/secondary/transaction.rs b/src/storage/secondary/transaction.rs index 0a70094e..bb74907c 100644 --- a/src/storage/secondary/transaction.rs +++ b/src/storage/secondary/transaction.rs @@ -1,6 +1,7 @@ // Copyright 2023 RisingLight Project Authors. Licensed under Apache-2.0. use std::collections::HashMap; +use std::ops::Bound; use std::sync::Arc; use itertools::Itertools; @@ -18,9 +19,8 @@ use super::{ use crate::array::DataChunk; use crate::catalog::find_sort_key_id; use crate::storage::secondary::statistics::create_statistics_global_aggregator; -use crate::storage::{StorageColumnRef, StorageResult, Transaction}; +use crate::storage::{ScanOptions, StorageColumnRef, StorageResult, Transaction}; use crate::types::DataValue; -use crate::v1::binder::BoundExpr; /// A transaction running on `SecondaryStorage`. pub struct SecondaryTransaction { @@ -218,14 +218,10 @@ impl SecondaryTransaction { async fn scan_inner( &self, - begin_keys: &[DataValue], - end_keys: &[DataValue], col_idx: &[StorageColumnRef], - is_sorted: bool, - reversed: bool, - expr: Option, + opts: ScanOptions, ) -> StorageResult { - assert!(!reversed, "reverse iterator is not supported for now"); + assert!(!opts.reversed, "reverse iterator is not supported for now"); let mut iters: Vec = vec![]; @@ -244,17 +240,17 @@ impl SecondaryTransaction { }) .unwrap_or_default(); + let begin_keys = match &opts.filter { + Some(range) => match &range.start { + Bound::Included(k) | Bound::Excluded(k) => Some(k), + _ => None, + }, + _ => None, + }; let start_rowid = rowset.start_rowid(begin_keys).await; iters.push( rowset - .iter( - col_idx.into(), - dvs, - start_rowid, - expr.clone(), - begin_keys, - end_keys, - ) + .iter(col_idx.into(), dvs, start_rowid, opts.filter.clone()) .await?, ) } @@ -262,7 +258,7 @@ impl SecondaryTransaction { let final_iter = if iters.len() == 1 { iters.pop().unwrap().into() - } else if is_sorted { + } else if opts.is_sorted { let sort_key = find_sort_key_id(&self.table.columns); if let Some(sort_key) = sort_key { let real_col_idx = col_idx.iter().position(|x| match x { @@ -353,22 +349,10 @@ impl Transaction for SecondaryTransaction { async fn scan( &self, - begin_sort_key: &[DataValue], - end_sort_key: &[DataValue], col_idx: &[StorageColumnRef], - is_sorted: bool, - reversed: bool, - expr: Option, + options: ScanOptions, ) -> StorageResult { - self.scan_inner( - begin_sort_key, - end_sort_key, - col_idx, - is_sorted, - reversed, - expr, - ) - .await + self.scan_inner(col_idx, options).await } async fn append(&mut self, columns: DataChunk) -> StorageResult<()> { diff --git a/src/v1/binder/expression/mod.rs b/src/v1/binder/expression/mod.rs index 27649d9c..45cc5cf5 100644 --- a/src/v1/binder/expression/mod.rs +++ b/src/v1/binder/expression/mod.rs @@ -1,6 +1,5 @@ // Copyright 2023 RisingLight Project Authors. Licensed under Apache-2.0. 
-use bitvec::prelude::BitVec; use serde::Serialize; use super::*; @@ -58,22 +57,6 @@ impl BoundExpr { } } - fn get_filter_column_inner(&self, filter_column: &mut BitVec) { - struct Visitor<'a>(&'a mut BitVec); - impl<'a> ExprVisitor for Visitor<'a> { - fn visit_input_ref(&mut self, expr: &BoundInputRef) { - self.0.set(expr.index, true) - } - } - Visitor(filter_column).visit_expr(self); - } - - pub fn get_filter_column(&self, len: usize) -> BitVec { - let mut filter_column = BitVec::repeat(false, len); - self.get_filter_column_inner(&mut filter_column); - filter_column - } - pub fn contains_column_ref(&self) -> bool { struct Visitor(bool); impl ExprVisitor for Visitor { diff --git a/src/v1/executor/evaluator.rs b/src/v1/executor/evaluator.rs index 9a5751ab..6b5a17b4 100644 --- a/src/v1/executor/evaluator.rs +++ b/src/v1/executor/evaluator.rs @@ -3,7 +3,6 @@ //! Apply expressions on data chunks. use crate::array::*; -use crate::storage::PackedVec; use crate::types::{ConvertError, DataValue}; use crate::v1::binder::BoundExpr; @@ -49,52 +48,4 @@ impl BoundExpr { _ => panic!("{:?} should not be evaluated in `eval_array`", self), } } - - /// Evaluate the given expression as an array in storage engine. - pub fn eval_array_in_storage( - &self, - chunk: &PackedVec>, - cardinality: usize, - ) -> Result { - match &self { - BoundExpr::InputRef(input_ref) => Ok(chunk[input_ref.index].clone().unwrap()), - BoundExpr::BinaryOp(binary_op) => { - let left = binary_op - .left_expr - .eval_array_in_storage(chunk, cardinality)?; - let right = binary_op - .right_expr - .eval_array_in_storage(chunk, cardinality)?; - left.binary_op(&binary_op.op, &right) - } - BoundExpr::UnaryOp(op) => { - let array = op.expr.eval_array_in_storage(chunk, cardinality)?; - array.unary_op(&op.op) - } - BoundExpr::Constant(v) => { - let mut builder = ArrayBuilderImpl::with_capacity(cardinality, &self.return_type()); - // TODO: optimize this - for _ in 0..cardinality { - builder.push(v); - } - Ok(builder.finish()) - } - BoundExpr::TypeCast(cast) => { - let array = cast.expr.eval_array_in_storage(chunk, cardinality)?; - if self.return_type() == cast.expr.return_type() { - return Ok(array); - } - array.cast(&cast.ty) - } - BoundExpr::IsNull(expr) => { - let array = expr.expr.eval_array_in_storage(chunk, cardinality)?; - Ok(ArrayImpl::new_bool( - (0..array.len()) - .map(|i| array.get(i) == DataValue::Null) - .collect(), - )) - } - _ => panic!("{:?} should not be evaluated in `eval_array`", self), - } - } } diff --git a/src/v1/executor/mod.rs b/src/v1/executor/mod.rs index 79161088..6a675778 100644 --- a/src/v1/executor/mod.rs +++ b/src/v1/executor/mod.rs @@ -263,13 +263,11 @@ impl PlanVisitor for ExecutorBuilder { match &self.storage { StorageImpl::InMemoryStorage(storage) => TableScanExecutor { plan: plan.clone(), - expr: None, storage: storage.clone(), } .execute(), StorageImpl::SecondaryStorage(storage) => TableScanExecutor { plan: plan.clone(), - expr: plan.logical().expr().cloned(), storage: storage.clone(), } .execute(), diff --git a/src/v1/executor/table_scan.rs b/src/v1/executor/table_scan.rs index c9070a56..e5357e32 100644 --- a/src/v1/executor/table_scan.rs +++ b/src/v1/executor/table_scan.rs @@ -4,14 +4,12 @@ use std::sync::Arc; use super::*; use crate::array::{ArrayBuilder, ArrayBuilderImpl, DataChunk, I64ArrayBuilder}; -use crate::storage::{Storage, StorageColumnRef, Table, Transaction, TxnIterator}; -use crate::v1::binder::BoundExpr; +use crate::storage::{ScanOptions, Storage, StorageColumnRef, Table, Transaction, 
TxnIterator}; use crate::v1::optimizer::plan_nodes::PhysicalTableScan; /// The executor of table scan operation. pub struct TableScanExecutor { pub plan: PhysicalTableScan, - pub expr: Option, pub storage: Arc, } @@ -76,12 +74,10 @@ impl TableScanExecutor { let mut it = txn .scan( - &[], - &[], &col_idx, - self.plan.logical().is_sorted(), - false, - self.expr, + ScanOptions::default() + .with_filter_opt(self.plan.logical().filter().clone()) + .with_sorted(self.plan.logical().is_sorted()), ) .await?; diff --git a/src/v1/optimizer/mod.rs b/src/v1/optimizer/mod.rs index 3a582ee3..3a76fec8 100644 --- a/src/v1/optimizer/mod.rs +++ b/src/v1/optimizer/mod.rs @@ -38,14 +38,15 @@ impl Optimizer { plan = arith_expr_simplification_rule.rewrite(plan); plan = bool_expr_simplification_rule.rewrite(plan); plan = constant_moving_rule.rewrite(plan); - let mut rules: Vec> = vec![ + let rules: Vec> = vec![ Box::new(FilterAggRule {}), Box::new(FilterJoinRule {}), Box::new(LimitOrderRule {}), ]; - if self.enable_filter_scan { - rules.push(Box::new(FilterScanRule {})); - } + // the filter-scan rule is not complete yet + // if self.enable_filter_scan { + // rules.push(Box::new(FilterScanRule {})); + // } let hep_optimizer = HeuristicOptimizer { rules }; plan = hep_optimizer.optimize(plan); let out_types_num = plan.out_types().len(); diff --git a/src/v1/optimizer/plan_nodes/logical_table_scan.rs b/src/v1/optimizer/plan_nodes/logical_table_scan.rs index 17cbc611..424f8e62 100644 --- a/src/v1/optimizer/plan_nodes/logical_table_scan.rs +++ b/src/v1/optimizer/plan_nodes/logical_table_scan.rs @@ -8,9 +8,9 @@ use serde::Serialize; use super::*; use crate::catalog::{ColumnDesc, ColumnId, TableRefId}; +use crate::storage::KeyRange; use crate::types::DataTypeKind; -use crate::v1::binder::ExprVisitor; -use crate::v1::optimizer::logical_plan_rewriter::ExprRewriter; + /// The logical plan of sequential scan operation. #[derive(Debug, Clone, Serialize)] pub struct LogicalTableScan { @@ -19,7 +19,7 @@ pub struct LogicalTableScan { column_descs: Vec, with_row_handler: bool, is_sorted: bool, - expr: Option, + filter: Option, } impl LogicalTableScan { @@ -29,7 +29,7 @@ impl LogicalTableScan { column_descs: Vec, with_row_handler: bool, is_sorted: bool, - expr: Option, + filter: Option, ) -> Self { Self { table_ref_id, @@ -37,7 +37,7 @@ impl LogicalTableScan { column_descs, with_row_handler, is_sorted, - expr, + filter, } } @@ -67,8 +67,8 @@ impl LogicalTableScan { } /// Get a reference to the logical table scan's expr. 
- pub fn expr(&self) -> Option<&BoundExpr> { - self.expr.as_ref() + pub fn filter(&self) -> &Option { + &self.filter } } impl PlanTreeNodeLeaf for LogicalTableScan {} @@ -93,11 +93,15 @@ impl PlanNode for LogicalTableScan { } fn prune_col(&self, required_cols: BitSet) -> PlanRef { - let mut visitor = CollectRequiredCols(BitSet::new()); - if self.expr.is_some() { - visitor.visit_expr(self.expr.as_ref().unwrap()); + let mut filter_cols = BitSet::new(); + if self.filter.is_some() { + // keep the primary key + for (idx, col) in self.column_descs.iter().enumerate() { + if col.is_primary() { + filter_cols.insert(idx); + } + } } - let filter_cols = visitor.0; let mut need_rewrite = false; @@ -110,7 +114,7 @@ impl PlanNode for LogicalTableScan { } let mut idx_table = HashMap::new(); - let (mut column_ids, mut column_descs): (Vec<_>, Vec<_>) = required_cols + let (column_ids, column_descs): (Vec<_>, Vec<_>) = required_cols .iter() .filter(|&id| id < self.column_ids.len()) .map(|id| { @@ -119,32 +123,13 @@ impl PlanNode for LogicalTableScan { }) .unzip(); - let mut offset = column_ids.len(); - let mut expr = self.expr.clone(); - if need_rewrite { - let (f_column_ids, f_column_descs): (Vec<_>, Vec<_>) = filter_cols - .iter() - .filter(|&id| id < self.column_ids.len() && !required_cols.contains(id)) - .map(|id| { - idx_table.insert(id, offset); - offset += 1; - (self.column_ids[id], self.column_descs[id].clone()) - }) - .unzip(); - - column_ids.extend(f_column_ids.into_iter()); - column_descs.extend(f_column_descs.into_iter()); - - Mapper(idx_table).rewrite_expr(expr.as_mut().unwrap()); - } - let new_scan = Self { table_ref_id: self.table_ref_id, column_ids, column_descs: column_descs.clone(), with_row_handler: self.with_row_handler, is_sorted: self.is_sorted, - expr, + filter: self.filter.clone(), } .into_plan_ref(); @@ -167,12 +152,12 @@ impl fmt::Display for LogicalTableScan { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { writeln!( f, - "LogicalTableScan: table #{}, columns [{}], with_row_handler: {}, is_sorted: {}, expr: {}", + "LogicalTableScan: table #{}, columns [{}], with_row_handler: {}, is_sorted: {}, filter: {}", self.table_ref_id.table_id, self.column_ids.iter().map(ToString::to_string).join(", "), self.with_row_handler, self.is_sorted, - self.expr.clone().map_or_else(|| "None".to_string(), |expr| format!("{:?}", expr)) + self.filter.clone().map_or_else(|| "None".to_string(), |expr| format!("{:?}", expr)) ) } } diff --git a/src/v1/optimizer/plan_nodes/physical_table_scan.rs b/src/v1/optimizer/plan_nodes/physical_table_scan.rs index cffbe675..60677fe3 100644 --- a/src/v1/optimizer/plan_nodes/physical_table_scan.rs +++ b/src/v1/optimizer/plan_nodes/physical_table_scan.rs @@ -47,7 +47,7 @@ impl fmt::Display for PhysicalTableScan { columns [{}], with_row_handler: {}, is_sorted: {}, - expr: {}"}, + filter: {}"}, self.logical().table_ref_id().table_id, self.logical() .column_ids() @@ -57,7 +57,8 @@ impl fmt::Display for PhysicalTableScan { self.logical().with_row_handler(), self.logical().is_sorted(), self.logical() - .expr() + .filter() + .as_ref() .map_or_else(|| "None".to_string(), |expr| format!("{:?}", expr)) ) } diff --git a/src/v1/optimizer/rules/filter_scan_rule.rs b/src/v1/optimizer/rules/filter_scan_rule.rs index 1552e8af..0c525283 100644 --- a/src/v1/optimizer/rules/filter_scan_rule.rs +++ b/src/v1/optimizer/rules/filter_scan_rule.rs @@ -12,13 +12,14 @@ impl Rule for FilterScanRule { let filter = plan.as_logical_filter()?; let child = filter.child(); let scan = 
child.as_logical_table_scan()?.clone(); + #[allow(unreachable_code)] Ok(Arc::new(LogicalTableScan::new( scan.table_ref_id(), scan.column_ids().to_vec(), scan.column_descs().to_vec(), scan.with_row_handler(), scan.is_sorted(), - Some(filter.expr().clone()), + todo!("extract range filter"), ))) } } diff --git a/tests/planner_test/count.planner.sql b/tests/planner_test/count.planner.sql index 2323a399..d6a100b3 100644 --- a/tests/planner_test/count.planner.sql +++ b/tests/planner_test/count.planner.sql @@ -4,7 +4,7 @@ explain select count(*) from t /* Projection { exprs: [ rowcount ], cost: 302.3 } └── Aggregate { aggs: [ rowcount ], group_by: [], cost: 301 } - └── Scan { table: t, list: [], cost: 0 } + └── Scan { table: t, list: [], filter: null, cost: 0 } */ -- count(*) with projection @@ -13,6 +13,6 @@ explain select count(*) + 1 from t /* Projection { exprs: [ + { lhs: 1, rhs: rowcount } ], cost: 302.5 } └── Aggregate { aggs: [ rowcount ], group_by: [], cost: 301 } - └── Scan { table: t, list: [], cost: 0 } + └── Scan { table: t, list: [], filter: null, cost: 0 } */ diff --git a/tests/planner_test/extract-common-predicate.planner.sql b/tests/planner_test/extract-common-predicate.planner.sql index 1f06cb91..e9cbe09b 100644 --- a/tests/planner_test/extract-common-predicate.planner.sql +++ b/tests/planner_test/extract-common-predicate.planner.sql @@ -5,6 +5,6 @@ explain select * from t where (a = 1 and b = 2) or (a = 1 and c = 3) Filter ├── cond: and { lhs: = { lhs: a, rhs: 1 }, rhs: or { lhs: = { lhs: c, rhs: 3 }, rhs: = { lhs: b, rhs: 2 } } } ├── cost: 4316 -└── Scan { table: t, list: [ a, b, c ], cost: 3000 } +└── Scan { table: t, list: [ a, b, c ], filter: null, cost: 3000 } */ diff --git a/tests/planner_test/tpch.planner.sql b/tests/planner_test/tpch.planner.sql index cb1616f0..2c23aace 100644 --- a/tests/planner_test/tpch.planner.sql +++ b/tests/planner_test/tpch.planner.sql @@ -181,6 +181,7 @@ Projection └── Scan ├── table: lineitem ├── list: [ l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate ] + ├── filter: null └── cost: 7000 */ @@ -241,17 +242,23 @@ Projection │ └── HashJoin { type: inner, on: = { lhs: [ c_custkey ], rhs: [ o_custkey ] }, cost: 22091.05 } │ ├── Projection { exprs: [ c_custkey ], cost: 2940 } │ │ └── Filter { cond: = { lhs: c_mktsegment, rhs: 'BUILDING' }, cost: 2700 } - │ │ └── Scan { table: customer, list: [ c_custkey, c_mktsegment ], cost: 2000 } + │ │ └── Scan + │ │ ├── table: customer + │ │ ├── list: [ c_custkey, c_mktsegment ] + │ │ ├── filter: null + │ │ └── cost: 2000 │ └── Filter { cond: > { lhs: 1995-03-15, rhs: o_orderdate }, cost: 7500 } │ └── Scan │ ├── table: orders │ ├── list: [ o_orderkey, o_custkey, o_orderdate, o_shippriority ] + │ ├── filter: null │ └── cost: 4000 └── Projection { exprs: [ l_orderkey, l_extendedprice, l_discount ], cost: 10220 } └── Filter { cond: > { lhs: l_shipdate, rhs: 1995-03-15 }, cost: 7500 } └── Scan ├── table: lineitem ├── list: [ l_orderkey, l_extendedprice, l_discount, l_shipdate ] + ├── filter: null └── cost: 4000 */ @@ -322,6 +329,7 @@ Projection │ │ │ │ ├── Scan │ │ │ │ │ ├── table: customer │ │ │ │ │ ├── list: [ c_custkey, c_nationkey ] + │ │ │ │ │ ├── filter: null │ │ │ │ │ └── cost: 2000 │ │ │ │ └── Projection { exprs: [ o_orderkey, o_custkey ], cost: 7092 } │ │ │ │ └── Filter @@ -332,16 +340,18 @@ Projection │ │ │ │ └── Scan │ │ │ │ ├── table: orders │ │ │ │ ├── list: [ o_orderkey, o_custkey, o_orderdate ] + │ │ │ │ ├── filter: null │ │ │ │ └── cost: 3000 │ │ │ └── Scan │ │ │ 
├── table: lineitem │ │ │ ├── list: [ l_orderkey, l_suppkey, l_extendedprice, l_discount ] + │ │ │ ├── filter: null │ │ │ └── cost: 4000 - │ │ └── Scan { table: supplier, list: [ s_suppkey, s_nationkey ], cost: 2000 } - │ └── Scan { table: nation, list: [ n_nationkey, n_name, n_regionkey ], cost: 3000 } + │ │ └── Scan { table: supplier, list: [ s_suppkey, s_nationkey ], filter: null, cost: 2000 } + │ └── Scan { table: nation, list: [ n_nationkey, n_name, n_regionkey ], filter: null, cost: 3000 } └── Projection { exprs: [ r_regionkey ], cost: 2940 } └── Filter { cond: = { lhs: r_name, rhs: 'AFRICA' }, cost: 2700 } - └── Scan { table: region, list: [ r_regionkey, r_name ], cost: 2000 } + └── Scan { table: region, list: [ r_regionkey, r_name ], filter: null, cost: 2000 } */ -- tpch-q6 @@ -377,7 +387,142 @@ Projection │ ├── lhs: > { lhs: 1995-01-01, rhs: l_shipdate } │ └── rhs: >= { lhs: l_shipdate, rhs: 1994-01-01 } ├── cost: 7210.72 - └── Scan { table: lineitem, list: [ l_quantity, l_extendedprice, l_discount, l_shipdate ], cost: 4000 } + └── Scan + ├── table: lineitem + ├── list: [ l_quantity, l_extendedprice, l_discount, l_shipdate ] + ├── filter: null + └── cost: 4000 +*/ + +-- tpch-q9 +explain select + nation, + o_year, + sum(amount) as sum_profit +from + ( + select + n_name as nation, + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + from + part, + supplier, + lineitem, + partsupp, + orders, + nation + where + s_suppkey = l_suppkey + and ps_suppkey = l_suppkey + and ps_partkey = l_partkey + and p_partkey = l_partkey + and o_orderkey = l_orderkey + and s_nationkey = n_nationkey + and p_name like '%green%' + ) as profit +group by + nation, + o_year +order by + nation, + o_year desc; + +/* +Projection +├── exprs: +│ ┌── n_name +│ ├── Extract { from: o_orderdate, field: YEAR } +│ └── sum +│ └── - +│ ├── lhs: * { lhs: l_extendedprice, rhs: - { lhs: 1, rhs: l_discount } } +│ └── rhs: * { lhs: ps_supplycost, rhs: l_quantity } +├── cost: 160442300 +└── Order + ├── by: + │ ┌── asc + │ │ └── n_name + │ └── desc + │ └── Extract { from: o_orderdate, field: YEAR } + ├── cost: 157892300 + └── Aggregate + ├── aggs:sum + │ └── - + │ ├── lhs: * { lhs: l_extendedprice, rhs: - { lhs: 1, rhs: l_discount } } + │ └── rhs: * { lhs: ps_supplycost, rhs: l_quantity } + ├── group_by: [ n_name, Extract { from: o_orderdate, field: YEAR } ] + ├── cost: 146926510 + └── Projection + ├── exprs: + │ ┌── n_name + │ ├── Extract { from: o_orderdate, field: YEAR } + │ └── - + │ ├── lhs: * { lhs: l_extendedprice, rhs: - { lhs: 1, rhs: l_discount } } + │ └── rhs: * { lhs: ps_supplycost, rhs: l_quantity } + ├── cost: 143526510 + └── HashJoin { type: inner, on: = { lhs: [ s_nationkey ], rhs: [ n_nationkey ] }, cost: 139126510 } + ├── Projection + │ ├── exprs: [ s_nationkey, ps_supplycost, o_orderdate, l_quantity, l_extendedprice, l_discount ] + │ ├── cost: 111173010 + │ └── HashJoin { type: inner, on: = { lhs: [ l_orderkey ], rhs: [ o_orderkey ] }, cost: 104473010 } + │ ├── Projection + │ │ ├── exprs: + │ │ │ ┌── s_nationkey + │ │ │ ├── ps_supplycost + │ │ │ ├── l_orderkey + │ │ │ ├── l_quantity + │ │ │ ├── l_extendedprice + │ │ │ └── l_discount + │ │ ├── cost: 76519500 + │ │ └── HashJoin + │ │ ├── type: inner + │ │ ├── on: = { lhs: [ l_suppkey, l_partkey ], rhs: [ ps_suppkey, ps_partkey ] } + │ │ ├── cost: 69819500 + │ │ ├── Projection + │ │ │ ├── exprs: + │ │ │ │ ┌── s_nationkey + │ │ │ │ ├── l_orderkey + │ │ │ │ ├── l_partkey + │ │ │ │ ├── l_suppkey + │ 
│ │ │ ├── l_quantity + │ │ │ │ ├── l_extendedprice + │ │ │ │ └── l_discount + │ │ │ ├── cost: 39865000 + │ │ │ └── HashJoin + │ │ │ ├── type: inner + │ │ │ ├── on: = { lhs: [ s_suppkey, p_partkey ], rhs: [ l_suppkey, l_partkey ] } + │ │ │ ├── cost: 32065002 + │ │ │ ├── Join { type: inner, cost: 3107500 } + │ │ │ │ ├── Projection { exprs: [ p_partkey ], cost: 5500 } + │ │ │ │ │ └── Filter { cond: like { lhs: p_name, rhs: '%green%' }, cost: 4300 } + │ │ │ │ │ └── Scan + │ │ │ │ │ ├── table: part + │ │ │ │ │ ├── list: [ p_partkey, p_name ] + │ │ │ │ │ ├── filter: null + │ │ │ │ │ └── cost: 2000 + │ │ │ │ └── Scan + │ │ │ │ ├── table: supplier + │ │ │ │ ├── list: [ s_suppkey, s_nationkey ] + │ │ │ │ ├── filter: null + │ │ │ │ └── cost: 2000 + │ │ │ └── Scan + │ │ │ ├── table: lineitem + │ │ │ ├── list: + │ │ │ │ ┌── l_orderkey + │ │ │ │ ├── l_partkey + │ │ │ │ ├── l_suppkey + │ │ │ │ ├── l_quantity + │ │ │ │ ├── l_extendedprice + │ │ │ │ └── l_discount + │ │ │ ├── filter: null + │ │ │ └── cost: 6000 + │ │ └── Scan + │ │ ├── table: partsupp + │ │ ├── list: [ ps_partkey, ps_suppkey, ps_supplycost ] + │ │ ├── filter: null + │ │ └── cost: 3000 + │ └── Scan { table: orders, list: [ o_orderkey, o_orderdate ], filter: null, cost: 2000 } + └── Scan { table: nation, list: [ n_nationkey, n_name ], filter: null, cost: 2000 } */ -- tpch-q10: TPC-H Q10 @@ -490,6 +635,7 @@ Projection │ │ │ │ ├── c_phone │ │ │ │ ├── c_acctbal │ │ │ │ └── c_comment + │ │ │ ├── filter: null │ │ │ └── cost: 7000 │ │ └── Projection { exprs: [ o_orderkey, o_custkey ], cost: 7092 } │ │ └── Filter @@ -500,13 +646,15 @@ Projection │ │ └── Scan │ │ ├── table: orders │ │ ├── list: [ o_orderkey, o_custkey, o_orderdate ] + │ │ ├── filter: null │ │ └── cost: 3000 │ └── Projection { exprs: [ l_orderkey, l_extendedprice, l_discount ], cost: 5780 } │ └── Filter { cond: = { lhs: l_returnflag, rhs: 'R' }, cost: 5100 } │ └── Scan │ ├── table: lineitem │ ├── list: [ l_orderkey, l_extendedprice, l_discount, l_returnflag ] + │ ├── filter: null │ └── cost: 4000 - └── Scan { table: nation, list: [ n_nationkey, n_name ], cost: 2000 } + └── Scan { table: nation, list: [ n_nationkey, n_name ], filter: null, cost: 2000 } */ diff --git a/tests/planner_test/tpch.yml b/tests/planner_test/tpch.yml index 064f4099..8a079e7c 100644 --- a/tests/planner_test/tpch.yml +++ b/tests/planner_test/tpch.yml @@ -189,6 +189,44 @@ tasks: - print +- id: tpch-q9 + sql: | + explain select + nation, + o_year, + sum(amount) as sum_profit + from + ( + select + n_name as nation, + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + from + part, + supplier, + lineitem, + partsupp, + orders, + nation + where + s_suppkey = l_suppkey + and ps_suppkey = l_suppkey + and ps_partkey = l_partkey + and p_partkey = l_partkey + and o_orderkey = l_orderkey + and s_nationkey = n_nationkey + and p_name like '%green%' + ) as profit + group by + nation, + o_year + order by + nation, + o_year desc; + before: ["*prepare"] + tasks: + - print + - id: tpch-q10 sql: | explain select diff --git a/tests/sql/tpch/create.sql b/tests/sql/tpch/create.sql index 749772aa..dcfec1ab 100644 --- a/tests/sql/tpch/create.sql +++ b/tests/sql/tpch/create.sql @@ -1,18 +1,18 @@ CREATE TABLE NATION ( - N_NATIONKEY INT NOT NULL, + N_NATIONKEY INT PRIMARY KEY, N_NAME CHAR(25) NOT NULL, N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR(152) ); CREATE TABLE REGION ( - R_REGIONKEY INT NOT NULL, + R_REGIONKEY INT PRIMARY KEY, R_NAME CHAR(25) NOT NULL, R_COMMENT 
VARCHAR(152) ); CREATE TABLE PART ( - P_PARTKEY INT NOT NULL, + P_PARTKEY INT PRIMARY KEY, P_NAME VARCHAR(55) NOT NULL, P_MFGR CHAR(25) NOT NULL, P_BRAND CHAR(10) NOT NULL, @@ -24,7 +24,7 @@ CREATE TABLE PART ( ); CREATE TABLE SUPPLIER ( - S_SUPPKEY INT NOT NULL, + S_SUPPKEY INT PRIMARY KEY, S_NAME CHAR(25) NOT NULL, S_ADDRESS VARCHAR(40) NOT NULL, S_NATIONKEY INT NOT NULL, @@ -39,10 +39,11 @@ CREATE TABLE PARTSUPP ( PS_AVAILQTY INT NOT NULL, PS_SUPPLYCOST DECIMAL(15,2) NOT NULL, PS_COMMENT VARCHAR(199) NOT NULL + -- PRIMARY KEY (PS_PARTKEY, PS_SUPPKEY) ); CREATE TABLE CUSTOMER ( - C_CUSTKEY INT NOT NULL, + C_CUSTKEY INT PRIMARY KEY, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INT NOT NULL, @@ -53,7 +54,7 @@ CREATE TABLE CUSTOMER ( ); CREATE TABLE ORDERS ( - O_ORDERKEY INT NOT NULL, + O_ORDERKEY INT PRIMARY KEY, O_CUSTKEY INT NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, @@ -81,4 +82,5 @@ CREATE TABLE LINEITEM ( L_SHIPINSTRUCT CHAR(25) NOT NULL, L_SHIPMODE CHAR(10) NOT NULL, L_COMMENT VARCHAR(44) NOT NULL + -- PRIMARY KEY (L_ORDERKEY, L_LINENUMBER) );
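
Note on the new scan API (a standalone sketch, not code from this patch): the hunks above replace the old `(begin_keys, end_keys, is_sorted, reversed, expr)` parameter list of `Transaction::scan` with a single `ScanOptions`, whose `filter` is a `KeyRange` of `Bound<DataValue>` over the primary key. The sketch below compiles on its own by cutting `DataValue` down to one variant and re-implementing the two builder methods; the names `KeyRange`, `ScanOptions`, `with_sorted`, `with_filter_opt`, and the field/bound shapes are taken from the diff, while the reduced type definitions are illustrative stand-ins for the real ones in `src/types` and `src/storage/mod.rs`.

    use std::ops::Bound;

    // Stand-in for RisingLight's `DataValue`, reduced to one variant.
    #[derive(Debug, Clone, PartialEq)]
    enum DataValue {
        Int32(i32),
    }

    // The primary-key range pushed down to storage.
    #[derive(Debug, Clone)]
    struct KeyRange {
        start: Bound<DataValue>,
        end: Bound<DataValue>,
    }

    // Options accepted by `Transaction::scan`, replacing the old
    // five-argument parameter list.
    #[derive(Debug, Default)]
    struct ScanOptions {
        is_sorted: bool,
        reversed: bool,
        filter: Option<KeyRange>,
    }

    impl ScanOptions {
        fn with_sorted(mut self, is_sorted: bool) -> Self {
            self.is_sorted = is_sorted;
            self
        }
        fn with_filter_opt(mut self, filter: Option<KeyRange>) -> Self {
            self.filter = filter;
            self
        }
    }

    fn main() {
        // `WHERE pk > 2` becomes a half-open range: start excluded, end
        // unbounded, matching the updated `RowSetIterator` test above.
        let range = KeyRange {
            start: Bound::Excluded(DataValue::Int32(2)),
            end: Bound::Unbounded,
        };
        let opts = ScanOptions::default()
            .with_sorted(true)
            .with_filter_opt(Some(range));
        assert!(!opts.reversed, "reverse iterator is not supported for now");

        // The transaction derives its seek key from the start bound, as in
        // `scan_inner`; an unbounded start scans from the first row.
        let begin_key = match &opts.filter {
            Some(r) => match &r.start {
                Bound::Included(k) | Bound::Excluded(k) => Some(k),
                _ => None,
            },
            None => None,
        };
        assert_eq!(begin_key, Some(&DataValue::Int32(2)));
        println!("scan with {opts:?}");
    }

The design point visible throughout the diff: a single `Bound`-based range subsumes both the old begin/end key-vector parameters and the general `BoundExpr` filter, which is why the expression-evaluation path in storage (`eval_array_in_storage`) and the `expr` plumbing in the v1 executor could be deleted outright.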