feat(optimizer, storage): pushdown range-filter to storage #786

Merged 17 commits on Jul 15, 2023
2 changes: 1 addition & 1 deletion docs/04-storage-overview.md
@@ -282,7 +282,7 @@ Also there are a lot of things going on internally in *secondary*. Upon starting

A table will typically contain multiple RowSets. *Secondary* will create an iterator over multiple RowSets either by using `MergeIterator` (to do a merge sort), or by using `ConcatIterator` (by yielding data one RowSet at a time). Under those iterators lies one of the most fundamental and important iterators, `RowSetIterator`. `RowSetIterator` scans the underlying columns by using `ColumnIterator`, which uses `BlockIterator` internally. It will also take delete vectors into consideration, so as to filter out deleted rows.

-`RowSetIterator` also supports filter scan. Users can provide a filter expression to `RowSetIterator`, and the iterator will skip reading blocks to reduce I/O.
+`RowSetIterator` also supports range filter scan. Users can provide a range for primary key to `RowSetIterator`, and the iterator will skip reading blocks to reduce I/O.

There is a special column called `RowHandler` in *secondary*. This column is of int64 type, which contains RowSet id on upper 32 bits, and row offset in lower 32 bits. For example, if there is a table, which contains a RowSet with the following data:

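The `RowHandler` packing described above is plain bit arithmetic. Here is a minimal standalone sketch; the helper names are made up for illustration and are not *secondary*'s actual API:

```rust
/// Pack a RowSet id (upper 32 bits) and a row offset (lower 32 bits)
/// into a single int64 `RowHandler` value, as described in the docs above.
fn row_handler(rowset_id: u32, row_offset: u32) -> i64 {
    ((rowset_id as i64) << 32) | (row_offset as i64)
}

/// Recover the RowSet id from the upper 32 bits.
fn rowset_id(handler: i64) -> u32 {
    (handler >> 32) as u32
}

/// Recover the row offset from the lower 32 bits.
fn row_offset(handler: i64) -> u32 {
    (handler & 0xFFFF_FFFF) as u32
}
```

Because the RowSet id occupies the high bits, sorting by `RowHandler` groups rows by RowSet first, then by offset within the RowSet.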
3 changes: 2 additions & 1 deletion src/binder_v2/copy.rs
@@ -71,7 +71,8 @@ impl Binder {
let scan = if is_internal {
self.egraph.add(Node::Internal([table, cols]))
} else {
-self.egraph.add(Node::Scan([table, cols]))
+let true_ = self.egraph.add(Node::true_());
+self.egraph.add(Node::Scan([table, cols, true_]))
};
self.egraph.add(Node::CopyTo([ext_source, scan]))
} else {
36 changes: 13 additions & 23 deletions src/binder_v2/create_table.rs
@@ -90,8 +90,18 @@ impl Binder {
return Err(BindError::InvalidColumn(name.clone()));
}
}
-ordered_pk_ids =
-Binder::ordered_pks_from_constraint(&pks_name_from_constraints, columns);
+// We have used `pks_name_from_constraints` to get the primary keys' name sorted by
+// declaration order in "primary key(c1, c2..)" syntax. Now we transfer the name to id
+// to get the sorted ID
+ordered_pk_ids = pks_name_from_constraints
+.iter()
+.map(|name| {
+columns
+.iter()
+.position(|c| c.name.value.eq_ignore_ascii_case(name))
+.unwrap() as ColumnId
+})
+.collect();
}

let mut columns: Vec<ColumnCatalog> = columns
@@ -138,26 +148,6 @@
ordered_pks
}

-/// We have used `pks_name_from_constraints` to get the primary keys' name sorted by declaration
-/// order in "primary key(c1, c2..)" syntax. Now we transfer the name to id to get the sorted
-/// ID
-fn ordered_pks_from_constraint(pks_name: &[String], columns: &[ColumnDef]) -> Vec<ColumnId> {
-let mut ordered_pks = vec![0; pks_name.len()];
-let mut pos_in_ordered_pk = HashMap::new(); // used to get pos from column name
-pks_name.iter().enumerate().for_each(|(pos, name)| {
-pos_in_ordered_pk.insert(name, pos);
-});
-
-columns.iter().enumerate().for_each(|(index, colum_desc)| {
-let column_name = &colum_desc.name.value;
-if pos_in_ordered_pk.contains_key(column_name) {
-let id = index as ColumnId;
-let pos = *(pos_in_ordered_pk.get(column_name).unwrap());
-ordered_pks[pos] = id;
-}
-});
-ordered_pks
-}
/// get the primary keys' name sorted by declaration order in "primary key(c1, c2..)" syntax.
fn pks_name_from_constraints(constraints: &[TableConstraint]) -> Vec<String> {
let mut pks_name_from_constraints = vec![];
@@ -169,7 +159,7 @@
columns,
..
} if *is_primary => columns.iter().for_each(|ident| {
-pks_name_from_constraints.push(ident.value.clone());
+pks_name_from_constraints.push(ident.value.to_lowercase());
}),
_ => continue,
}
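The inlined mapping in this change can be read in isolation: primary-key names, already lowercased by `pks_name_from_constraints`, are resolved to column IDs by their position in the column list. A standalone sketch, with `ColumnId` and a plain string column list standing in for the binder's real types:

```rust
type ColumnId = u32;

/// Resolve primary-key names (in `primary key(c1, c2, ...)` declaration
/// order) to column IDs by their position in the table's column list.
/// Comparison is case-insensitive, matching `eq_ignore_ascii_case` above.
fn ordered_pk_ids(pk_names: &[String], column_names: &[&str]) -> Vec<ColumnId> {
    pk_names
        .iter()
        .map(|name| {
            column_names
                .iter()
                .position(|c| c.eq_ignore_ascii_case(name))
                .expect("primary key must name an existing column") as ColumnId
        })
        .collect()
}
```

The result preserves declaration order of the constraint, not the order of the columns, which is exactly what the removed `ordered_pks_from_constraint` computed with an extra `HashMap`.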
3 changes: 2 additions & 1 deletion src/binder_v2/delete.rs
@@ -14,7 +14,8 @@ impl Binder {
return Err(BindError::NotSupportedOnInternalTable);
}
let cols = self.bind_table_name(name, None, true)?;
-let scan = self.egraph.add(Node::Scan([table_id, cols]));
+let true_ = self.egraph.add(Node::true_());
+let scan = self.egraph.add(Node::Scan([table_id, cols, true_]));
let cond = self.bind_where(selection)?;
let filter = self.egraph.add(Node::Filter([cond, scan]));
Ok(self.egraph.add(Node::Delete([table_id, filter])))
19 changes: 11 additions & 8 deletions src/binder_v2/table.rs
@@ -36,10 +36,10 @@ impl Binder {
/// ```ignore
/// (join inner true
/// (join inner (= $1.1 $2.1)
-/// (scan $1 (list $1.1 $1.2))
-/// (scan $2 (list $2.1))
+/// (scan $1 (list $1.1 $1.2) null)
+/// (scan $2 (list $2.1) null)
/// )
-/// (scan $3 (list $3.1 $3.2))
+/// (scan $3 (list $3.1 $3.2) null)
/// )
/// ```
fn bind_table_with_joins(&mut self, tables: TableWithJoins) -> Result {
@@ -55,7 +55,7 @@
/// Returns a `Scan` plan of table or a plan of subquery.
///
/// # Example
-/// - `bind_table_factor(t)` => `(scan $1 (list $1.1 $1.2 $1.3))`
+/// - `bind_table_factor(t)` => `(scan $1 (list $1.1 $1.2 $1.3) null)`
/// - `bind_table_factor(select 1)` => `(values (1))`
fn bind_table_factor(&mut self, table: TableFactor) -> Result {
match table {
@@ -65,7 +65,8 @@
let id = if is_internal {
self.egraph.add(Node::Internal([table_id, cols]))
} else {
-self.egraph.add(Node::Scan([table_id, cols]))
+let true_ = self.egraph.add(Node::null());
+self.egraph.add(Node::Scan([table_id, cols, true_]))
};
Ok(id)
}
@@ -244,6 +245,7 @@ mod tests {
use super::*;
use crate::catalog::{ColumnCatalog, RootCatalog};
use crate::parser::parse;
+use crate::planner::Optimizer;

#[test]
fn bind_test_subquery() {
@@ -256,11 +258,12 @@

let stmts = parse("select x.b from (select a as b from t) as x").unwrap();
let mut binder = Binder::new(catalog.clone());
+let optimizer = Optimizer::new(catalog.clone());
for stmt in stmts {
-let result = binder.bind(stmt);
-println!("{}", result.as_ref().unwrap().pretty(10));
+let plan = binder.bind(stmt).unwrap();
+println!("{}", plan.pretty(10));

-let optimized = crate::planner::optimize(&result.unwrap());
+let optimized = optimizer.optimize(&plan);

let mut egraph = egg::EGraph::new(TypeSchemaAnalysis {
catalog: catalog.clone(),
10 changes: 7 additions & 3 deletions src/db.rs
@@ -214,7 +214,11 @@ impl Database {
for stmt in stmts {
let mut binder = crate::binder_v2::Binder::new(self.catalog.clone());
let bound = binder.bind(stmt)?;
-let optimized = crate::planner::optimize(&bound);
+let mut optimizer = crate::planner::Optimizer::new(self.catalog.clone());
+if !self.storage.support_range_filter_scan() {
+optimizer.disable_rules("filter-scan");
+}
+let optimized = optimizer.optimize(&bound);
let executor = match self.storage.clone() {
StorageImpl::InMemoryStorage(s) => {
crate::executor_v2::build(self.catalog.clone(), s, &optimized)
@@ -237,7 +241,7 @@
let mut binder = Binder::new(self.catalog.clone());
let logical_planner = LogicalPlaner::default();
let mut optimizer = Optimizer {
-enable_filter_scan: self.storage.enable_filter_scan(),
+enable_filter_scan: self.storage.support_range_filter_scan(),
};
// TODO: parallelize
let mut outputs: Vec<Chunk> = vec![];
@@ -276,7 +280,7 @@
let mut binder = Binder::new(self.catalog.clone());
let logical_planner = LogicalPlaner::default();
let mut optimizer = Optimizer {
-enable_filter_scan: self.storage.enable_filter_scan(),
+enable_filter_scan: self.storage.support_range_filter_scan(),
};
let mut plans = vec![];
for stmt in stmts {
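The pattern introduced in `db.rs`, asking the storage backend whether it supports range-filter scans and disabling the `filter-scan` rewrite rule otherwise, can be sketched with a hypothetical minimal optimizer. The struct and method bodies here are illustrative assumptions; the real rule set and signatures live in `src/planner`:

```rust
use std::collections::HashSet;

/// Hypothetical minimal optimizer: rules are identified by name and can
/// be switched off before planning, mirroring
/// `optimizer.disable_rules("filter-scan")` in the diff above.
struct Optimizer {
    disabled_rules: HashSet<String>,
}

impl Optimizer {
    fn new() -> Self {
        Self { disabled_rules: HashSet::new() }
    }

    /// Mark a named rule (or rule group) as disabled.
    fn disable_rules(&mut self, name: &str) {
        self.disabled_rules.insert(name.to_string());
    }

    /// A rule fires only if it has not been disabled.
    fn is_enabled(&self, name: &str) -> bool {
        !self.disabled_rules.contains(name)
    }
}
```

Gating the rewrite at plan time keeps the contract simple: a backend that cannot evaluate a pushed-down range never sees one, and the filter stays in the plan as an ordinary `Filter` node.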
4 changes: 2 additions & 2 deletions src/executor_v2/explain.rs
@@ -5,7 +5,7 @@ use pretty_xmlish::PrettyConfig;

use super::*;
use crate::array::{ArrayImpl, Utf8Array};
-use crate::planner::{costs, Explain};
+use crate::planner::{Explain, Optimizer};

/// The executor of `explain` statement.
pub struct ExplainExecutor {
@@ -15,7 +15,7 @@ pub struct ExplainExecutor {

impl ExplainExecutor {
pub fn execute(self) -> BoxedExecutor {
-let costs = costs(&self.plan);
+let costs = Optimizer::new(self.catalog.clone()).costs(&self.plan);
let explain_obj = Explain::of(&self.plan)
.with_costs(&costs)
.with_catalog(&self.catalog);
12 changes: 10 additions & 2 deletions src/executor_v2/mod.rs
@@ -48,7 +48,7 @@ use self::top_n::TopNExecutor;
use self::values::*;
use crate::array::DataChunk;
use crate::catalog::RootCatalogRef;
-use crate::planner::{Expr, RecExpr, TypeSchemaAnalysis};
+use crate::planner::{Expr, ExprAnalysis, RecExpr, TypeSchemaAnalysis};
use crate::storage::{Storage, TracedStorageError};
use crate::types::{ColumnIndex, ConvertError, DataType};

@@ -205,11 +205,19 @@ impl<S: Storage> Builder<S> {
fn build_id(&self, id: Id) -> BoxedExecutor {
use Expr::*;
let stream = match self.node(id).clone() {
-Scan([table, list]) => TableScanExecutor {
+Scan([table, list, filter]) => TableScanExecutor {
table_id: self.node(table).as_table(),
columns: (self.node(list).as_list().iter())
.map(|id| self.node(*id).as_column())
.collect(),
+filter: {
+// analyze range for the filter
+let mut egraph = egg::EGraph::new(ExprAnalysis {
+catalog: self.catalog.clone(),
+});
+let root = egraph.add_expr(&self.recexpr(filter));
+egraph[root].data.range.clone().map(|(_, r)| r)
+},
storage: self.storage.clone(),
}
.execute(),
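The executor builder above derives an optional `KeyRange` from the scan's filter expression via `ExprAnalysis`. What such a range means at scan time can be sketched with `std::ops::Bound`; the concrete `KeyRange` layout below is an assumption for illustration, not the crate's definition:

```rust
use std::ops::Bound;

/// Assumed shape of a primary-key range extracted from a filter
/// expression such as `pk >= 10 AND pk < 20`.
struct KeyRange {
    start: Bound<i32>,
    end: Bound<i32>,
}

/// Whether a key falls inside the range: the check a storage scan can
/// apply to a block's key span to decide whether to read it at all.
fn contains(r: &KeyRange, key: i32) -> bool {
    let lower_ok = match r.start {
        Bound::Included(lo) => key >= lo,
        Bound::Excluded(lo) => key > lo,
        Bound::Unbounded => true,
    };
    let upper_ok = match r.end {
        Bound::Included(hi) => key <= hi,
        Bound::Excluded(hi) => key < hi,
        Bound::Unbounded => true,
    };
    lower_ok && upper_ok
}
```

If the analysis cannot prove the filter is a range over the primary key, it yields `None` and the scan falls back to reading everything, so pushdown is purely an optimization, never a correctness requirement.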
11 changes: 5 additions & 6 deletions src/executor_v2/table_scan.rs
@@ -5,12 +5,15 @@ use std::sync::Arc;
use super::*;
use crate::array::DataChunk;
use crate::catalog::{ColumnRefId, TableRefId};
-use crate::storage::{Storage, StorageColumnRef, Table, Transaction, TxnIterator};
+use crate::storage::{
+KeyRange, ScanOptions, Storage, StorageColumnRef, Table, Transaction, TxnIterator,
+};

/// The executor of table scan operation.
pub struct TableScanExecutor<S: Storage> {
pub table_id: TableRefId,
pub columns: Vec<ColumnRefId>,
+pub filter: Option<KeyRange>,
pub storage: Arc<S>,
}

@@ -37,12 +40,8 @@

let mut it = txn
.scan(
-&[],
-&[],
&col_idx,
-false, // TODO: is_sorted
-false,
-None, // TODO: support filter scan
+ScanOptions::default().with_filter_opt(self.filter),
)
.await?;

Expand Down
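The `txn.scan(...)` rewrite replaces a list of positional arguments and TODO flags with a single options value. A hypothetical sketch of that builder style; the field set and the tuple standing in for a real `KeyRange` are assumptions, not the crate's actual `ScanOptions`:

```rust
/// Hypothetical scan options: the old positional flags collapse into one
/// builder-style value, and `with_filter_opt` attaches a key range only
/// when the planner derived one.
#[derive(Default, Debug)]
struct ScanOptions {
    is_sorted: bool,
    reversed: bool,
    filter: Option<(i64, i64)>, // stand-in for a real KeyRange
}

impl ScanOptions {
    /// Attach an optional range filter; `None` leaves a full scan.
    fn with_filter_opt(mut self, filter: Option<(i64, i64)>) -> Self {
        self.filter = filter;
        self
    }
}
```

The `_opt` suffix lets the caller pass `self.filter` straight through without matching on it, which is exactly how `TableScanExecutor` uses it above.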
1 change: 1 addition & 0 deletions src/lib.rs
@@ -24,6 +24,7 @@
#![feature(iterator_try_collect)]
#![feature(async_fn_in_trait)]
#![feature(return_position_impl_trait_in_trait)]
+#![feature(let_chains)]
#![allow(incomplete_features)]

/// Top-level structure of the database.
11 changes: 10 additions & 1 deletion src/planner/explain.rs
@@ -195,8 +195,17 @@ impl<'a> Explain<'a> {
vec![self.expr(a).pretty()],
),

-Scan([table, list]) | Internal([table, list]) => Pretty::childless_record(
+Scan([table, list, filter]) => Pretty::childless_record(
+"Scan",
+vec![
+("table", self.expr(table).pretty()),
+("list", self.expr(list).pretty()),
+("filter", self.expr(filter).pretty()),
+]
+.with_cost(cost),
+),
+Internal([table, list]) => Pretty::childless_record(
+"Internal",
vec![
("table", self.expr(table).pretty()),
("list", self.expr(list).pretty()),