Skip to content

Commit

Permalink
feat(optimizer, storage): pushdown range-filter to storage (#786)
Browse files Browse the repository at this point in the history
* extract `ScanOptions`

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* storage: remove general filter and only keep range filter

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* planner_v2: add filter to scan node

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* only keep single key in KeyRange

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* add range analysis and filter scan rule

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* fix primary key constraint

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* make range-filter scan work

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* fix clippy

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* fix panic on range-filter scan

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* fix column prune rule for range-filter

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* disable range filter scan for memory storage

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* fix sqllogictest

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* update docs

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* fix format

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* change default range filter to null

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* update planner test

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* add planner test for tpch q9

Signed-off-by: Runji Wang <wangrunji0408@163.com>

---------

Signed-off-by: Runji Wang <wangrunji0408@163.com>
  • Loading branch information
wangrunji0408 committed Jul 15, 2023
1 parent 604b4a1 commit a0882cd
Show file tree
Hide file tree
Showing 37 changed files with 766 additions and 637 deletions.
2 changes: 1 addition & 1 deletion docs/04-storage-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ Also there are a lot of things going on internally in *secondary*. Upon starting

A table will typically contain multiple RowSets. *Secondary* will create an iterator over multiple RowSets either by using `MergeIterator` (to do a merge sort), or by using `ConcatIterator` (by yielding data one RowSet by RowSet). Under those iterators, there is one of the most fundamental and important iterator, `RowSetIterator`. `RowSetIterator` scans the underlying columns by using `ColumnIterator`, which uses `BlockIterator` internally. It will also take delete vectors into consideration, so as to filter out deleted rows.

`RowSetIterator` also supports filter scan. Users can provide a filter expression to `RowSetIterator`, and the iterator will skip reading blocks to reduce I/O.
`RowSetIterator` also supports range filter scan. Users can provide a range for the primary key to `RowSetIterator`, and the iterator will skip reading blocks outside that range to reduce I/O.

There is a special column called `RowHandler` in *secondary*. This column is of int64 type, which contains RowSet id on upper 32 bits, and row offset in lower 32 bits. For example, if there is a table, which contains a RowSet with the following data:

Expand Down
3 changes: 2 additions & 1 deletion src/binder_v2/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ impl Binder {
let scan = if is_internal {
self.egraph.add(Node::Internal([table, cols]))
} else {
self.egraph.add(Node::Scan([table, cols]))
let true_ = self.egraph.add(Node::true_());
self.egraph.add(Node::Scan([table, cols, true_]))
};
self.egraph.add(Node::CopyTo([ext_source, scan]))
} else {
Expand Down
36 changes: 13 additions & 23 deletions src/binder_v2/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,18 @@ impl Binder {
return Err(BindError::InvalidColumn(name.clone()));
}
}
ordered_pk_ids =
Binder::ordered_pks_from_constraint(&pks_name_from_constraints, columns);
            // We have used `pks_name_from_constraints` to get the primary keys' names sorted by
            // declaration order in the "primary key(c1, c2, ...)" syntax. Now we convert the names
            // to column IDs, preserving that order.
ordered_pk_ids = pks_name_from_constraints
.iter()
.map(|name| {
columns
.iter()
.position(|c| c.name.value.eq_ignore_ascii_case(name))
.unwrap() as ColumnId
})
.collect();
}

let mut columns: Vec<ColumnCatalog> = columns
Expand Down Expand Up @@ -138,26 +148,6 @@ impl Binder {
ordered_pks
}

/// We have used `pks_name_from_constraints` to get the primary keys' name sorted by declaration
/// order in "primary key(c1, c2..)" syntax. Now we transfer the name to id to get the sorted
/// ID
fn ordered_pks_from_constraint(pks_name: &[String], columns: &[ColumnDef]) -> Vec<ColumnId> {
let mut ordered_pks = vec![0; pks_name.len()];
let mut pos_in_ordered_pk = HashMap::new(); // used to get pos from column name
pks_name.iter().enumerate().for_each(|(pos, name)| {
pos_in_ordered_pk.insert(name, pos);
});

columns.iter().enumerate().for_each(|(index, colum_desc)| {
let column_name = &colum_desc.name.value;
if pos_in_ordered_pk.contains_key(column_name) {
let id = index as ColumnId;
let pos = *(pos_in_ordered_pk.get(column_name).unwrap());
ordered_pks[pos] = id;
}
});
ordered_pks
}
    /// Get the primary keys' names sorted by declaration order in the "primary key(c1, c2, ...)" syntax.
fn pks_name_from_constraints(constraints: &[TableConstraint]) -> Vec<String> {
let mut pks_name_from_constraints = vec![];
Expand All @@ -169,7 +159,7 @@ impl Binder {
columns,
..
} if *is_primary => columns.iter().for_each(|ident| {
pks_name_from_constraints.push(ident.value.clone());
pks_name_from_constraints.push(ident.value.to_lowercase());
}),
_ => continue,
}
Expand Down
3 changes: 2 additions & 1 deletion src/binder_v2/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ impl Binder {
return Err(BindError::NotSupportedOnInternalTable);
}
let cols = self.bind_table_name(name, None, true)?;
let scan = self.egraph.add(Node::Scan([table_id, cols]));
let true_ = self.egraph.add(Node::true_());
let scan = self.egraph.add(Node::Scan([table_id, cols, true_]));
let cond = self.bind_where(selection)?;
let filter = self.egraph.add(Node::Filter([cond, scan]));
Ok(self.egraph.add(Node::Delete([table_id, filter])))
Expand Down
19 changes: 11 additions & 8 deletions src/binder_v2/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ impl Binder {
/// ```ignore
/// (join inner true
/// (join inner (= $1.1 $2.1)
/// (scan $1 (list $1.1 $1.2))
/// (scan $2 (list $2.1))
/// (scan $1 (list $1.1 $1.2) null)
/// (scan $2 (list $2.1) null)
/// )
/// (scan $3 (list $3.1 $3.2))
/// (scan $3 (list $3.1 $3.2) null)
/// )
/// ```
fn bind_table_with_joins(&mut self, tables: TableWithJoins) -> Result {
Expand All @@ -55,7 +55,7 @@ impl Binder {
/// Returns a `Scan` plan of table or a plan of subquery.
///
/// # Example
/// - `bind_table_factor(t)` => `(scan $1 (list $1.1 $1.2 $1.3))`
/// - `bind_table_factor(t)` => `(scan $1 (list $1.1 $1.2 $1.3) null)`
/// - `bind_table_factor(select 1)` => `(values (1))`
fn bind_table_factor(&mut self, table: TableFactor) -> Result {
match table {
Expand All @@ -65,7 +65,8 @@ impl Binder {
let id = if is_internal {
self.egraph.add(Node::Internal([table_id, cols]))
} else {
self.egraph.add(Node::Scan([table_id, cols]))
let true_ = self.egraph.add(Node::null());
self.egraph.add(Node::Scan([table_id, cols, true_]))
};
Ok(id)
}
Expand Down Expand Up @@ -244,6 +245,7 @@ mod tests {
use super::*;
use crate::catalog::{ColumnCatalog, RootCatalog};
use crate::parser::parse;
use crate::planner::Optimizer;

#[test]
fn bind_test_subquery() {
Expand All @@ -256,11 +258,12 @@ mod tests {

let stmts = parse("select x.b from (select a as b from t) as x").unwrap();
let mut binder = Binder::new(catalog.clone());
let optimizer = Optimizer::new(catalog.clone());
for stmt in stmts {
let result = binder.bind(stmt);
println!("{}", result.as_ref().unwrap().pretty(10));
let plan = binder.bind(stmt).unwrap();
println!("{}", plan.pretty(10));

let optimized = crate::planner::optimize(&result.unwrap());
let optimized = optimizer.optimize(&plan);

let mut egraph = egg::EGraph::new(TypeSchemaAnalysis {
catalog: catalog.clone(),
Expand Down
10 changes: 7 additions & 3 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ impl Database {
for stmt in stmts {
let mut binder = crate::binder_v2::Binder::new(self.catalog.clone());
let bound = binder.bind(stmt)?;
let optimized = crate::planner::optimize(&bound);
let mut optimizer = crate::planner::Optimizer::new(self.catalog.clone());
if !self.storage.support_range_filter_scan() {
optimizer.disable_rules("filter-scan");
}
let optimized = optimizer.optimize(&bound);
let executor = match self.storage.clone() {
StorageImpl::InMemoryStorage(s) => {
crate::executor_v2::build(self.catalog.clone(), s, &optimized)
Expand All @@ -237,7 +241,7 @@ impl Database {
let mut binder = Binder::new(self.catalog.clone());
let logical_planner = LogicalPlaner::default();
let mut optimizer = Optimizer {
enable_filter_scan: self.storage.enable_filter_scan(),
enable_filter_scan: self.storage.support_range_filter_scan(),
};
// TODO: parallelize
let mut outputs: Vec<Chunk> = vec![];
Expand Down Expand Up @@ -276,7 +280,7 @@ impl Database {
let mut binder = Binder::new(self.catalog.clone());
let logical_planner = LogicalPlaner::default();
let mut optimizer = Optimizer {
enable_filter_scan: self.storage.enable_filter_scan(),
enable_filter_scan: self.storage.support_range_filter_scan(),
};
let mut plans = vec![];
for stmt in stmts {
Expand Down
4 changes: 2 additions & 2 deletions src/executor_v2/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pretty_xmlish::PrettyConfig;

use super::*;
use crate::array::{ArrayImpl, Utf8Array};
use crate::planner::{costs, Explain};
use crate::planner::{Explain, Optimizer};

/// The executor of `explain` statement.
pub struct ExplainExecutor {
Expand All @@ -15,7 +15,7 @@ pub struct ExplainExecutor {

impl ExplainExecutor {
pub fn execute(self) -> BoxedExecutor {
let costs = costs(&self.plan);
let costs = Optimizer::new(self.catalog.clone()).costs(&self.plan);
let explain_obj = Explain::of(&self.plan)
.with_costs(&costs)
.with_catalog(&self.catalog);
Expand Down
12 changes: 10 additions & 2 deletions src/executor_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use self::values::*;
use self::window::*;
use crate::array::DataChunk;
use crate::catalog::RootCatalogRef;
use crate::planner::{Expr, RecExpr, TypeSchemaAnalysis};
use crate::planner::{Expr, ExprAnalysis, RecExpr, TypeSchemaAnalysis};
use crate::storage::{Storage, TracedStorageError};
use crate::types::{ColumnIndex, ConvertError, DataType};

Expand Down Expand Up @@ -207,11 +207,19 @@ impl<S: Storage> Builder<S> {
fn build_id(&self, id: Id) -> BoxedExecutor {
use Expr::*;
let stream = match self.node(id).clone() {
Scan([table, list]) => TableScanExecutor {
Scan([table, list, filter]) => TableScanExecutor {
table_id: self.node(table).as_table(),
columns: (self.node(list).as_list().iter())
.map(|id| self.node(*id).as_column())
.collect(),
filter: {
// analyze range for the filter
let mut egraph = egg::EGraph::new(ExprAnalysis {
catalog: self.catalog.clone(),
});
let root = egraph.add_expr(&self.recexpr(filter));
egraph[root].data.range.clone().map(|(_, r)| r)
},
storage: self.storage.clone(),
}
.execute(),
Expand Down
11 changes: 5 additions & 6 deletions src/executor_v2/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ use std::sync::Arc;
use super::*;
use crate::array::DataChunk;
use crate::catalog::{ColumnRefId, TableRefId};
use crate::storage::{Storage, StorageColumnRef, Table, Transaction, TxnIterator};
use crate::storage::{
KeyRange, ScanOptions, Storage, StorageColumnRef, Table, Transaction, TxnIterator,
};

/// The executor of table scan operation.
pub struct TableScanExecutor<S: Storage> {
pub table_id: TableRefId,
pub columns: Vec<ColumnRefId>,
pub filter: Option<KeyRange>,
pub storage: Arc<S>,
}

Expand All @@ -37,12 +40,8 @@ impl<S: Storage> TableScanExecutor<S> {

let mut it = txn
.scan(
&[],
&[],
&col_idx,
false, // TODO: is_sorted
false,
None, // TODO: support filter scan
ScanOptions::default().with_filter_opt(self.filter),
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#![feature(iterator_try_collect)]
#![feature(async_fn_in_trait)]
#![feature(return_position_impl_trait_in_trait)]
#![feature(let_chains)]
#![allow(incomplete_features)]

/// Top-level structure of the database.
Expand Down
11 changes: 10 additions & 1 deletion src/planner/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,17 @@ impl<'a> Explain<'a> {
vec![self.expr(a).pretty()],
),

Scan([table, list]) | Internal([table, list]) => Pretty::childless_record(
Scan([table, list, filter]) => Pretty::childless_record(
"Scan",
vec![
("table", self.expr(table).pretty()),
("list", self.expr(list).pretty()),
("filter", self.expr(filter).pretty()),
]
.with_cost(cost),
),
Internal([table, list]) => Pretty::childless_record(
"Internal",
vec![
("table", self.expr(table).pretty()),
("list", self.expr(list).pretty()),
Expand Down

0 comments on commit a0882cd

Please sign in to comment.