From 3a88fbc754a216cf534e3fae98ca106c4c564999 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Mon, 8 Nov 2021 14:15:49 +0800 Subject: [PATCH 1/2] feat(storage): add is_sorted support for in-memory engine Signed-off-by: Alex Chi --- src/binder/expression/column_ref.rs | 13 ++++- src/catalog/column.rs | 14 +++++ src/executor/aggregation.rs | 3 ++ src/executor/seq_scan.rs | 4 +- src/logical_planner/delete.rs | 2 +- src/logical_planner/select.rs | 21 ++++++-- src/logical_planner/seq_scan.rs | 1 + src/physical_planner/seq_scan.rs | 7 ++- src/storage/memory/table.rs | 6 +++ src/storage/memory/transaction.rs | 60 ++++++++++++++++++++-- src/storage/secondary/rowset/mem_rowset.rs | 3 +- src/storage/secondary/rowset/mod.rs | 15 ------ 12 files changed, 119 insertions(+), 30 deletions(-) diff --git a/src/binder/expression/column_ref.rs b/src/binder/expression/column_ref.rs index 2cd2898c..910cfc62 100644 --- a/src/binder/expression/column_ref.rs +++ b/src/binder/expression/column_ref.rs @@ -6,6 +6,7 @@ pub struct BoundColumnRef { pub table_name: String, pub column_ref_id: ColumnRefId, pub column_index: ColumnId, + pub is_primary_key: bool, } impl Binder { @@ -27,6 +28,7 @@ impl Binder { table_name: table.name().clone(), column_ref_id, column_index: u32::MAX, + is_primary_key: col.is_primary(), }), return_type: Some(col.datatype().clone()), }; @@ -66,6 +68,7 @@ impl Binder { table_name: name.clone(), column_ref_id, column_index: u32::MAX, + is_primary_key: col.is_primary(), }), return_type: Some(col.datatype()), }) @@ -78,10 +81,15 @@ impl Binder { return Err(BindError::AmbiguousColumn); } let column_ref_id = ColumnRefId::from_table(*ref_id, col.id()); - info = Some((table.name().clone(), column_ref_id, col.datatype())); + info = Some(( + table.name().clone(), + column_ref_id, + col.datatype(), + col.is_primary(), + )); } } - let (table_name, column_ref_id, data_type) = + let (table_name, column_ref_id, data_type, is_primary_key) = info.ok_or_else(|| BindError::InvalidColumn(column_name.clone()))?; Self::record_regular_table_column( &mut self.context.column_names, @@ -96,6 +104,7 @@ impl Binder { table_name: table_name.clone(), column_ref_id, column_index: u32::MAX, + is_primary_key, }), return_type: Some(data_type), }) diff --git a/src/catalog/column.rs b/src/catalog/column.rs index 4fa4e1f7..cdfe9e8f 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -89,6 +89,20 @@ impl ColumnCatalog { } } +/// Find the id of the sort key among column catalogs +pub fn find_sort_key_id(column_infos: &[ColumnCatalog]) -> Option { + let mut key = None; + for (id, column_info) in column_infos.iter().enumerate() { + if column_info.is_primary() { + if key.is_some() { + panic!("only one primary key is supported"); + } + key = Some(id); + } + } + key +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/executor/aggregation.rs b/src/executor/aggregation.rs index d408fb42..80636608 100644 --- a/src/executor/aggregation.rs +++ b/src/executor/aggregation.rs @@ -168,12 +168,14 @@ mod tests { }, column_ids: vec![0, 1], with_row_handler: false, + is_sorted: false, }; let column0 = BoundExpr { kind: BoundExprKind::ColumnRef(BoundColumnRef { table_name: "t".into(), column_ref_id: ColumnRefId::new(0, 0, 0, 0), column_index: 0, + is_primary_key: false, }), return_type: Some(DataType::new(DataTypeKind::Int, false)), }; @@ -182,6 +184,7 @@ mod tests { table_name: "t".into(), column_ref_id: ColumnRefId::new(0, 0, 0, 1), column_index: 1, + is_primary_key: false, }), return_type: Some(DataType::new(DataTypeKind::Int, false)), }; diff --git a/src/executor/seq_scan.rs b/src/executor/seq_scan.rs index 40e2137a..10449bf6 100644 --- a/src/executor/seq_scan.rs +++ b/src/executor/seq_scan.rs @@ -38,7 +38,9 @@ impl SeqScanExecutor { } let txn = table.read().await?; - let mut it = txn.scan(None, None, &col_idx, false, false).await?; + let mut it = txn + .scan(None, None, &col_idx, self.plan.is_sorted, false) + .await?; // Notice: The column ids may not be ordered. while let Some(chunk) = it.next_batch(None).await? { diff --git a/src/logical_planner/delete.rs b/src/logical_planner/delete.rs index bdd75628..2ab59004 100644 --- a/src/logical_planner/delete.rs +++ b/src/logical_planner/delete.rs @@ -13,7 +13,7 @@ impl LogicalPlaner { pub fn plan_delete(&self, stmt: BoundDelete) -> Result { if let BoundTableRef::BaseTableRef { ref ref_id, .. } = stmt.from_table { if let Some(expr) = stmt.where_clause { - let child = Box::new(self.plan_table_ref(&stmt.from_table, true)?); + let child = Box::new(self.plan_table_ref(&stmt.from_table, true, false)?); Ok(LogicalPlan::Delete(LogicalDelete { table_ref_id: *ref_id, filter: LogicalFilter { expr, child }, diff --git a/src/logical_planner/select.rs b/src/logical_planner/select.rs index bb42bae2..1cb73a6c 100644 --- a/src/logical_planner/select.rs +++ b/src/logical_planner/select.rs @@ -12,8 +12,18 @@ use crate::binder::{BoundExprKind, BoundSelect, BoundTableRef}; impl LogicalPlaner { pub fn plan_select(&self, stmt: Box) -> Result { let mut plan = LogicalPlan::Dummy; + let mut is_sorted = false; + if let Some(table_ref) = stmt.from_table.get(0) { - plan = self.plan_table_ref(table_ref, false)?; + // use `sorted` mode from the storage engine if the order by column is the primary key + if stmt.orderby.len() == 1 && !stmt.orderby[0].descending { + if let BoundExprKind::ColumnRef(col_ref) = &stmt.orderby[0].expr.kind { + if col_ref.is_primary_key { + is_sorted = true; + } + } + } + plan = self.plan_table_ref(table_ref, false, is_sorted)?; } if let Some(expr) = stmt.where_clause { @@ -32,7 +42,7 @@ impl LogicalPlaner { child: Box::new(plan), }); } - if !stmt.orderby.is_empty() { + if !stmt.orderby.is_empty() && !is_sorted { plan = LogicalPlan::Order(LogicalOrder { comparators: stmt.orderby, child: Box::new(plan), @@ -69,6 +79,7 @@ impl LogicalPlaner { &self, table_ref: &BoundTableRef, with_row_handler: bool, + is_sorted: bool, ) -> Result { match table_ref { BoundTableRef::BaseTableRef { @@ -79,15 +90,17 @@ impl LogicalPlaner { table_ref_id: *ref_id, column_ids: column_ids.to_vec(), with_row_handler, + is_sorted, })), BoundTableRef::JoinTableRef { relation, join_tables, } => { - let relation_plan = self.plan_table_ref(relation, with_row_handler)?; + let relation_plan = self.plan_table_ref(relation, with_row_handler, is_sorted)?; let mut join_table_plans = vec![]; for table in join_tables.iter() { - let table_plan = self.plan_table_ref(&table.table_ref, with_row_handler)?; + let table_plan = + self.plan_table_ref(&table.table_ref, with_row_handler, is_sorted)?; join_table_plans.push(LogicalJoinTable { table_plan: Box::new(table_plan), join_op: table.join_op.clone(), diff --git a/src/logical_planner/seq_scan.rs b/src/logical_planner/seq_scan.rs index 44d1b2f2..2c16b243 100644 --- a/src/logical_planner/seq_scan.rs +++ b/src/logical_planner/seq_scan.rs @@ -7,4 +7,5 @@ pub struct LogicalSeqScan { pub table_ref_id: TableRefId, pub column_ids: Vec, pub with_row_handler: bool, + pub is_sorted: bool, } diff --git a/src/physical_planner/seq_scan.rs b/src/physical_planner/seq_scan.rs index 61be2390..a9756581 100644 --- a/src/physical_planner/seq_scan.rs +++ b/src/physical_planner/seq_scan.rs @@ -11,6 +11,7 @@ pub struct PhysicalSeqScan { pub table_ref_id: TableRefId, pub column_ids: Vec, pub with_row_handler: bool, + pub is_sorted: bool, } impl PhysicalPlaner { @@ -19,6 +20,7 @@ impl PhysicalPlaner { table_ref_id: plan.table_ref_id, column_ids: plan.column_ids, with_row_handler: plan.with_row_handler, + is_sorted: plan.is_sorted, })) } } @@ -27,10 +29,11 @@ impl PlanExplainable for PhysicalSeqScan { fn explain_inner(&self, _level: usize, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!( f, - "SeqScan: table #{}, columns [{}], with_row_handler: {}", + "SeqScan: table #{}, columns [{}], with_row_handler: {}, is_sorted: {}", self.table_ref_id.table_id, self.column_ids.iter().map(ToString::to_string).join(", "), - self.with_row_handler + self.with_row_handler, + self.is_sorted ) } } diff --git a/src/storage/memory/table.rs b/src/storage/memory/table.rs index c21e8b93..1a0d981c 100644 --- a/src/storage/memory/table.rs +++ b/src/storage/memory/table.rs @@ -20,6 +20,7 @@ pub(super) struct InMemoryTableInner { chunks: Vec, deleted_rows: HashSet, columns: HashMap, + column_infos: Arc<[ColumnCatalog]>, } pub(super) type InMemoryTableInnerRef = Arc>; @@ -33,6 +34,7 @@ impl InMemoryTableInner { .map(|col| (col.id(), col.desc().clone())) .collect(), deleted_rows: HashSet::new(), + column_infos: columns.into(), } } @@ -66,6 +68,10 @@ impl InMemoryTableInner { }) .try_collect() } + + pub fn get_column_infos(&self) -> Arc<[ColumnCatalog]> { + self.column_infos.clone() + } } impl InMemoryTable { diff --git a/src/storage/memory/transaction.rs b/src/storage/memory/transaction.rs index e99b75d4..b18d052d 100644 --- a/src/storage/memory/transaction.rs +++ b/src/storage/memory/transaction.rs @@ -3,9 +3,13 @@ use std::sync::Arc; use super::table::InMemoryTableInnerRef; use super::{InMemoryRowHandler, InMemoryTable, InMemoryTxnIterator}; -use crate::array::{DataChunk, DataChunkRef}; +use crate::array::{ + ArrayBuilderImpl, ArrayImplBuilderPickExt, ArrayImplSortExt, DataChunk, DataChunkRef, +}; +use crate::catalog::{find_sort_key_id, ColumnCatalog}; use crate::storage::{StorageColumnRef, StorageResult, Transaction}; use async_trait::async_trait; +use itertools::Itertools; /// A transaction running on `InMemoryStorage`. pub struct InMemoryTransaction { @@ -29,6 +33,9 @@ pub struct InMemoryTransaction { /// Snapshot of all deleted rows deleted_rows: Arc>, + + /// All information about columns + column_infos: Arc<[ColumnCatalog]>, } impl InMemoryTransaction { @@ -41,10 +48,52 @@ impl InMemoryTransaction { table: table.inner.clone(), snapshot: Arc::new(inner.get_all_chunks()), deleted_rows: Arc::new(inner.get_all_deleted_rows()), + column_infos: inner.get_column_infos(), }) } } +fn sort_datachunk_by_pk( + chunks: &Arc>>, + column_infos: &[ColumnCatalog], +) -> Arc>> { + if let Some(sort_key_id) = find_sort_key_id(column_infos) { + println!("using {}", sort_key_id); + if chunks.is_empty() { + return chunks.clone(); + } + let mut builders = chunks[0] + .arrays() + .iter() + .map(ArrayBuilderImpl::from_type_of_array) + .collect_vec(); + + for chunk in &**chunks { + for (array, builder) in chunk.arrays().iter().zip(builders.iter_mut()) { + builder.append(array); + } + } + + let arrays = builders + .into_iter() + .map(|builder| builder.finish()) + .collect_vec(); + let sorted_index = arrays[sort_key_id].get_sorted_indices(); + + let chunk = arrays + .into_iter() + .map(|array| { + let mut builder = ArrayBuilderImpl::from_type_of_array(&array); + builder.pick_from(&array, &sorted_index); + builder.finish() + }) + .collect::(); + Arc::new(vec![Arc::new(chunk)]) + } else { + chunks.clone() + } +} + #[async_trait] impl Transaction for InMemoryTransaction { type TxnIteratorType = InMemoryTxnIterator; @@ -68,10 +117,15 @@ impl Transaction for InMemoryTransaction { "sort_key is not supported in InMemoryEngine for now" ); assert!(!reversed, "reverse iterator is not supported for now"); - assert!(!is_sorted, "sorted iterator is not supported for now"); + + let snapshot = if is_sorted { + sort_datachunk_by_pk(&self.snapshot, &self.column_infos) + } else { + self.snapshot.clone() + }; Ok(InMemoryTxnIterator::new( - self.snapshot.clone(), + snapshot, self.deleted_rows.clone(), col_idx, )) diff --git a/src/storage/secondary/rowset/mem_rowset.rs b/src/storage/secondary/rowset/mem_rowset.rs index 43a5315f..7b6cfb3f 100644 --- a/src/storage/secondary/rowset/mem_rowset.rs +++ b/src/storage/secondary/rowset/mem_rowset.rs @@ -2,11 +2,10 @@ use std::path::Path; use std::sync::Arc; use crate::array::{ArrayBuilderImpl, ArrayImplBuilderPickExt, ArrayImplSortExt, DataChunk}; -use crate::catalog::ColumnCatalog; +use crate::catalog::{find_sort_key_id, ColumnCatalog}; use crate::storage::StorageResult; use itertools::Itertools; -use super::find_sort_key_id; use super::rowset_builder::RowsetBuilder; use crate::storage::secondary::ColumnBuilderOptions; diff --git a/src/storage/secondary/rowset/mod.rs b/src/storage/secondary/rowset/mod.rs index ec9a1183..a6f46363 100644 --- a/src/storage/secondary/rowset/mod.rs +++ b/src/storage/secondary/rowset/mod.rs @@ -39,18 +39,3 @@ mod disk_rowset; pub use disk_rowset::*; mod rowset_iterator; pub use rowset_iterator::*; - -use crate::catalog::ColumnCatalog; - -pub fn find_sort_key_id(column_infos: &[ColumnCatalog]) -> Option { - let mut key = None; - for (id, column_info) in column_infos.iter().enumerate() { - if column_info.is_primary() { - if key.is_some() { - panic!("only one primary key is supported"); - } - key = Some(id); - } - } - key -} From dc27c336c4de45a22812f7ee66fbc40c4fbbf242 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Mon, 8 Nov 2021 14:22:13 +0800 Subject: [PATCH 2/2] add comments Signed-off-by: Alex Chi --- src/storage/memory/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/memory/transaction.rs b/src/storage/memory/transaction.rs index b18d052d..fb005003 100644 --- a/src/storage/memory/transaction.rs +++ b/src/storage/memory/transaction.rs @@ -53,12 +53,12 @@ impl InMemoryTransaction { } } +/// If primary key is found in [`ColumnCatalog`], sort all in-memory data using that key. fn sort_datachunk_by_pk( chunks: &Arc>>, column_infos: &[ColumnCatalog], ) -> Arc>> { if let Some(sort_key_id) = find_sort_key_id(column_infos) { - println!("using {}", sort_key_id); if chunks.is_empty() { return chunks.clone(); }