Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add is_sorted support for in-memory engine #121

Merged
merged 3 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/binder/expression/column_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub struct BoundColumnRef {
pub table_name: String,
pub column_ref_id: ColumnRefId,
pub column_index: ColumnId,
pub is_primary_key: bool,
}

impl Binder {
Expand All @@ -27,6 +28,7 @@ impl Binder {
table_name: table.name().clone(),
column_ref_id,
column_index: u32::MAX,
is_primary_key: col.is_primary(),
}),
return_type: Some(col.datatype().clone()),
};
Expand Down Expand Up @@ -66,6 +68,7 @@ impl Binder {
table_name: name.clone(),
column_ref_id,
column_index: u32::MAX,
is_primary_key: col.is_primary(),
}),
return_type: Some(col.datatype()),
})
Expand All @@ -78,10 +81,15 @@ impl Binder {
return Err(BindError::AmbiguousColumn);
}
let column_ref_id = ColumnRefId::from_table(*ref_id, col.id());
info = Some((table.name().clone(), column_ref_id, col.datatype()));
info = Some((
table.name().clone(),
column_ref_id,
col.datatype(),
col.is_primary(),
));
}
}
let (table_name, column_ref_id, data_type) =
let (table_name, column_ref_id, data_type, is_primary_key) =
info.ok_or_else(|| BindError::InvalidColumn(column_name.clone()))?;
Self::record_regular_table_column(
&mut self.context.column_names,
Expand All @@ -96,6 +104,7 @@ impl Binder {
table_name: table_name.clone(),
column_ref_id,
column_index: u32::MAX,
is_primary_key,
}),
return_type: Some(data_type),
})
Expand Down
14 changes: 14 additions & 0 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ impl ColumnCatalog {
}
}

/// Find the id of the sort key among column catalogs.
///
/// Returns the index of the (single) primary-key column, or `None` when no
/// column is marked primary.
///
/// # Panics
///
/// Panics if more than one column is marked as primary — composite primary
/// keys are not supported.
pub fn find_sort_key_id(column_infos: &[ColumnCatalog]) -> Option<usize> {
    let mut primary_ids = column_infos
        .iter()
        .enumerate()
        .filter(|(_, column_info)| column_info.is_primary())
        .map(|(id, _)| id);
    let key = primary_ids.next();
    // A second primary column violates the single-key invariant.
    if primary_ids.next().is_some() {
        panic!("only one primary key is supported");
    }
    key
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 3 additions & 1 deletion src/executor/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ impl<S: Storage> SeqScanExecutor<S> {
}

let txn = table.read().await?;
let mut it = txn.scan(None, None, &col_idx, false, false).await?;
let mut it = txn
.scan(None, None, &col_idx, self.plan.is_sorted, false)
.await?;

// Notice: The column ids may not be ordered.
while let Some(chunk) = it.next_batch(None).await? {
Expand Down
2 changes: 1 addition & 1 deletion src/logical_planner/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl LogicalPlaner {
pub fn plan_delete(&self, stmt: BoundDelete) -> Result<LogicalPlan, LogicalPlanError> {
if let BoundTableRef::BaseTableRef { ref ref_id, .. } = stmt.from_table {
if let Some(expr) = stmt.where_clause {
let child = Box::new(self.plan_table_ref(&stmt.from_table, true)?);
let child = Box::new(self.plan_table_ref(&stmt.from_table, true, false)?);
Ok(LogicalPlan::Delete(LogicalDelete {
table_ref_id: *ref_id,
filter: LogicalFilter { expr, child },
Expand Down
21 changes: 17 additions & 4 deletions src/logical_planner/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,18 @@ use crate::binder::{BoundExprKind, BoundSelect, BoundTableRef};
impl LogicalPlaner {
pub fn plan_select(&self, stmt: Box<BoundSelect>) -> Result<LogicalPlan, LogicalPlanError> {
let mut plan = LogicalPlan::Dummy;
let mut is_sorted = false;

if let Some(table_ref) = stmt.from_table.get(0) {
plan = self.plan_table_ref(table_ref, false)?;
// use `sorted` mode from the storage engine if the order by column is the primary key
if stmt.orderby.len() == 1 && !stmt.orderby[0].descending {
if let BoundExprKind::ColumnRef(col_ref) = &stmt.orderby[0].expr.kind {
if col_ref.is_primary_key {
is_sorted = true;
}
}
}
plan = self.plan_table_ref(table_ref, false, is_sorted)?;
}

if let Some(expr) = stmt.where_clause {
Expand All @@ -32,7 +42,7 @@ impl LogicalPlaner {
child: Box::new(plan),
});
}
if !stmt.orderby.is_empty() {
if !stmt.orderby.is_empty() && !is_sorted {
plan = LogicalPlan::Order(LogicalOrder {
comparators: stmt.orderby,
child: Box::new(plan),
Expand Down Expand Up @@ -85,6 +95,7 @@ impl LogicalPlaner {
&self,
table_ref: &BoundTableRef,
with_row_handler: bool,
is_sorted: bool,
) -> Result<LogicalPlan, LogicalPlanError> {
match table_ref {
BoundTableRef::BaseTableRef {
Expand All @@ -95,15 +106,17 @@ impl LogicalPlaner {
table_ref_id: *ref_id,
column_ids: column_ids.to_vec(),
with_row_handler,
is_sorted,
})),
BoundTableRef::JoinTableRef {
relation,
join_tables,
} => {
let relation_plan = self.plan_table_ref(relation, with_row_handler)?;
let relation_plan = self.plan_table_ref(relation, with_row_handler, is_sorted)?;
let mut join_table_plans = vec![];
for table in join_tables.iter() {
let table_plan = self.plan_table_ref(&table.table_ref, with_row_handler)?;
let table_plan =
self.plan_table_ref(&table.table_ref, with_row_handler, is_sorted)?;
join_table_plans.push(LogicalJoinTable {
table_plan: Box::new(table_plan),
join_op: table.join_op.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/logical_planner/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pub struct LogicalSeqScan {
pub table_ref_id: TableRefId,
pub column_ids: Vec<ColumnId>,
pub with_row_handler: bool,
pub is_sorted: bool,
}
7 changes: 5 additions & 2 deletions src/physical_planner/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct PhysicalSeqScan {
pub table_ref_id: TableRefId,
pub column_ids: Vec<ColumnId>,
pub with_row_handler: bool,
pub is_sorted: bool,
}

impl PhysicalPlaner {
Expand All @@ -19,6 +20,7 @@ impl PhysicalPlaner {
table_ref_id: plan.table_ref_id,
column_ids: plan.column_ids,
with_row_handler: plan.with_row_handler,
is_sorted: plan.is_sorted,
}))
}
}
Expand All @@ -27,10 +29,11 @@ impl PlanExplainable for PhysicalSeqScan {
fn explain_inner(&self, _level: usize, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(
f,
"SeqScan: table #{}, columns [{}], with_row_handler: {}",
"SeqScan: table #{}, columns [{}], with_row_handler: {}, is_sorted: {}",
self.table_ref_id.table_id,
self.column_ids.iter().map(ToString::to_string).join(", "),
self.with_row_handler
self.with_row_handler,
self.is_sorted
)
}
}
6 changes: 6 additions & 0 deletions src/storage/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub(super) struct InMemoryTableInner {
chunks: Vec<DataChunkRef>,
deleted_rows: HashSet<usize>,
columns: HashMap<ColumnId, ColumnDesc>,
column_infos: Arc<[ColumnCatalog]>,
}

pub(super) type InMemoryTableInnerRef = Arc<RwLock<InMemoryTableInner>>;
Expand All @@ -33,6 +34,7 @@ impl InMemoryTableInner {
.map(|col| (col.id(), col.desc().clone()))
.collect(),
deleted_rows: HashSet::new(),
column_infos: columns.into(),
}
}

Expand Down Expand Up @@ -66,6 +68,10 @@ impl InMemoryTableInner {
})
.try_collect()
}

pub fn get_column_infos(&self) -> Arc<[ColumnCatalog]> {
self.column_infos.clone()
}
}

impl InMemoryTable {
Expand Down
60 changes: 57 additions & 3 deletions src/storage/memory/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ use std::sync::Arc;

use super::table::InMemoryTableInnerRef;
use super::{InMemoryRowHandler, InMemoryTable, InMemoryTxnIterator};
use crate::array::{DataChunk, DataChunkRef};
use crate::array::{
ArrayBuilderImpl, ArrayImplBuilderPickExt, ArrayImplSortExt, DataChunk, DataChunkRef,
};
use crate::catalog::{find_sort_key_id, ColumnCatalog};
use crate::storage::{StorageColumnRef, StorageResult, Transaction};
use async_trait::async_trait;
use itertools::Itertools;

/// A transaction running on `InMemoryStorage`.
pub struct InMemoryTransaction {
Expand All @@ -29,6 +33,9 @@ pub struct InMemoryTransaction {

/// Snapshot of all deleted rows
deleted_rows: Arc<HashSet<usize>>,

/// All information about columns
column_infos: Arc<[ColumnCatalog]>,
}

impl InMemoryTransaction {
Expand All @@ -41,10 +48,52 @@ impl InMemoryTransaction {
table: table.inner.clone(),
snapshot: Arc::new(inner.get_all_chunks()),
deleted_rows: Arc::new(inner.get_all_deleted_rows()),
column_infos: inner.get_column_infos(),
})
}
}

/// If primary key is found in [`ColumnCatalog`], sort all in-memory data using that key.
fn sort_datachunk_by_pk(
chunks: &Arc<Vec<Arc<DataChunk>>>,
column_infos: &[ColumnCatalog],
) -> Arc<Vec<Arc<DataChunk>>> {
if let Some(sort_key_id) = find_sort_key_id(column_infos) {
if chunks.is_empty() {
return chunks.clone();
}
let mut builders = chunks[0]
.arrays()
.iter()
.map(ArrayBuilderImpl::from_type_of_array)
.collect_vec();

for chunk in &**chunks {
for (array, builder) in chunk.arrays().iter().zip(builders.iter_mut()) {
builder.append(array);
}
}

let arrays = builders
.into_iter()
.map(|builder| builder.finish())
.collect_vec();
let sorted_index = arrays[sort_key_id].get_sorted_indices();

let chunk = arrays
.into_iter()
.map(|array| {
let mut builder = ArrayBuilderImpl::from_type_of_array(&array);
builder.pick_from(&array, &sorted_index);
builder.finish()
})
.collect::<DataChunk>();
Arc::new(vec![Arc::new(chunk)])
} else {
chunks.clone()
}
}

#[async_trait]
impl Transaction for InMemoryTransaction {
type TxnIteratorType = InMemoryTxnIterator;
Expand All @@ -68,10 +117,15 @@ impl Transaction for InMemoryTransaction {
"sort_key is not supported in InMemoryEngine for now"
);
assert!(!reversed, "reverse iterator is not supported for now");
assert!(!is_sorted, "sorted iterator is not supported for now");

let snapshot = if is_sorted {
sort_datachunk_by_pk(&self.snapshot, &self.column_infos)
} else {
self.snapshot.clone()
};

Ok(InMemoryTxnIterator::new(
self.snapshot.clone(),
snapshot,
self.deleted_rows.clone(),
col_idx,
))
Expand Down
3 changes: 1 addition & 2 deletions src/storage/secondary/rowset/mem_rowset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use std::path::Path;
use std::sync::Arc;

use crate::array::{ArrayBuilderImpl, ArrayImplBuilderPickExt, ArrayImplSortExt, DataChunk};
use crate::catalog::ColumnCatalog;
use crate::catalog::{find_sort_key_id, ColumnCatalog};
use crate::storage::StorageResult;
use itertools::Itertools;

use super::find_sort_key_id;
use super::rowset_builder::RowsetBuilder;
use crate::storage::secondary::ColumnBuilderOptions;

Expand Down
15 changes: 0 additions & 15 deletions src/storage/secondary/rowset/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,3 @@ mod disk_rowset;
pub use disk_rowset::*;
mod rowset_iterator;
pub use rowset_iterator::*;

use crate::catalog::ColumnCatalog;

/// Return the index of the primary-key column among `column_infos`, if any.
///
/// # Panics
///
/// Panics when a second primary-key column is found, since only a single
/// primary key is supported.
pub fn find_sort_key_id(column_infos: &[ColumnCatalog]) -> Option<usize> {
    let key = column_infos.iter().position(|c| c.is_primary());
    if let Some(first) = key {
        // Any further primary column after the first breaks the invariant.
        if column_infos[first + 1..].iter().any(|c| c.is_primary()) {
            panic!("only one primary key is supported");
        }
    }
    key
}