Skip to content

Commit

Permalink
feat(storage): support scan row handler only (#447)
Browse files Browse the repository at this point in the history
Signed-off-by: Fedomn <fedomn.ma@gmail.com>
  • Loading branch information
Fedomn committed Feb 16, 2022
1 parent de55dfe commit 8464ae4
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 107 deletions.
25 changes: 5 additions & 20 deletions src/binder/expression/agg_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,11 @@ impl Binder {
),
"count" => {
if args.is_empty() {
for ref_id in self.context.regular_tables.values() {
let table = self.catalog.get_table(ref_id).unwrap();
if let Some(col) = table.get_column_by_id(0) {
let column_ref_id = ColumnRefId::from_table(*ref_id, col.id());
self.record_regular_table_column(
&table.name(),
col.name(),
col.id(),
col.desc().clone(),
);
let expr = BoundExpr::ColumnRef(BoundColumnRef {
table_name: table.name(),
column_ref_id,
is_primary_key: col.is_primary(),
desc: col.desc().clone(),
});
args.push(expr);
break;
}
}
let first_index_column = BoundExpr::InputRef(BoundInputRef {
index: 0,
return_type: DataType::new(DataTypeKind::Int(None), false),
});
args.push(first_index_column);
(
AggKind::RowCount,
Some(DataType::new(DataTypeKind::Int(None), false)),
Expand Down
24 changes: 22 additions & 2 deletions src/logical_planner/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use itertools::Itertools;

use super::*;
use crate::binder::{
BoundAggCall, BoundExpr, BoundInputRef, BoundOrderBy, BoundSelect, BoundTableRef,
AggKind, BoundAggCall, BoundExpr, BoundInputRef, BoundOrderBy, BoundSelect, BoundTableRef,
};
use crate::optimizer::plan_nodes::{
Dummy, LogicalAggregate, LogicalFilter, LogicalJoin, LogicalLimit, LogicalOrder,
Expand All @@ -23,6 +23,7 @@ impl LogicalPlaner {
pub fn plan_select(&self, mut stmt: Box<BoundSelect>) -> Result<PlanRef, LogicalPlanError> {
let mut plan: PlanRef = Arc::new(Dummy {});
let mut is_sorted = false;
let mut with_row_handler = false;

if let Some(table_ref) = &stmt.from_table {
// use `sorted` mode from the storage engine if the order by column is the primary key
Expand All @@ -33,7 +34,26 @@ impl LogicalPlaner {
}
}
}
plan = self.plan_table_ref(table_ref, false, is_sorted)?;
if let BoundTableRef::JoinTableRef { join_tables, .. } = table_ref {
if join_tables.is_empty() {
stmt.select_list.iter().for_each(|expr| match expr {
BoundExpr::AggCall(expr) => {
if expr.kind == AggKind::RowCount {
with_row_handler = true;
}
}
BoundExpr::ExprWithAlias(expr) => {
if let BoundExpr::AggCall(expr) = &*expr.expr {
if expr.kind == AggKind::RowCount {
with_row_handler = true;
}
}
}
_ => {}
});
}
}
plan = self.plan_table_ref(table_ref, with_row_handler, is_sorted)?;
}

if let Some(expr) = stmt.where_clause {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/secondary/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod column_iterator;
mod concrete_column_iterator;
mod primitive_column_builder;
mod primitive_column_factory;
mod row_handler_sequencer;
mod row_handler_column_iterator;

use std::future::Future;
use std::io::{Read, Seek, SeekFrom};
Expand All @@ -28,7 +28,7 @@ pub use concrete_column_iterator::*;
pub use primitive_column_builder::*;
pub use primitive_column_factory::*;
use risinglight_proto::rowset::BlockIndex;
pub use row_handler_sequencer::*;
pub use row_handler_column_iterator::*;
mod char_column_factory;
use std::os::unix::fs::FileExt;
use std::sync::{Arc, Mutex};
Expand Down
17 changes: 16 additions & 1 deletion src/storage/secondary/column/column_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{
BlobColumnIterator, BoolColumnIterator, CharBlockIteratorFactory, CharColumnIterator, Column,
ColumnIterator, DecimalColumnIterator, F64ColumnIterator, I32ColumnIterator,
PrimitiveBlockIteratorFactory, StorageResult,
PrimitiveBlockIteratorFactory, RowHandlerColumnIterator, StorageResult,
};
use crate::array::{Array, ArrayImpl};
use crate::catalog::ColumnCatalog;
Expand All @@ -20,6 +20,8 @@ pub enum ColumnIteratorImpl {
Date(DateColumnIterator),
Interval(IntervalColumnIterator),
Blob(BlobColumnIterator),
/// Special for row handler and not correspond to any data type
RowHandler(RowHandlerColumnIterator),
}

impl ColumnIteratorImpl {
Expand Down Expand Up @@ -89,6 +91,15 @@ impl ColumnIteratorImpl {
Ok(iter)
}

pub fn new_row_handler(rowset_id: u32, row_count: u32, start_pos: u32) -> StorageResult<Self> {
let iter = Self::RowHandler(RowHandlerColumnIterator::new(
rowset_id as usize,
row_count as usize,
start_pos as usize,
));
Ok(iter)
}

fn erase_concrete_type(
ret: Option<(u32, impl Array + Into<ArrayImpl>)>,
) -> Option<(u32, ArrayImpl)> {
Expand All @@ -108,6 +119,7 @@ impl ColumnIteratorImpl {
Self::Date(it) => Self::erase_concrete_type(it.next_batch(expected_size).await?),
Self::Interval(it) => Self::erase_concrete_type(it.next_batch(expected_size).await?),
Self::Blob(it) => Self::erase_concrete_type(it.next_batch(expected_size).await?),
Self::RowHandler(it) => Self::erase_concrete_type(it.next_batch(expected_size).await?),
};
Ok(result)
}
Expand All @@ -122,6 +134,7 @@ impl ColumnIteratorImpl {
Self::Date(it) => it.fetch_hint(),
Self::Interval(it) => it.fetch_hint(),
Self::Blob(it) => it.fetch_hint(),
Self::RowHandler(it) => it.fetch_hint(),
}
}

Expand All @@ -135,6 +148,7 @@ impl ColumnIteratorImpl {
Self::Date(it) => it.fetch_current_row_id(),
Self::Interval(it) => it.fetch_current_row_id(),
Self::Blob(it) => it.fetch_current_row_id(),
Self::RowHandler(it) => it.fetch_current_row_id(),
}
}

Expand All @@ -148,6 +162,7 @@ impl ColumnIteratorImpl {
Self::Date(it) => it.skip(cnt),
Self::Interval(it) => it.skip(cnt),
Self::Blob(it) => it.skip(cnt),
Self::RowHandler(it) => it.skip(cnt),
}
}
}
66 changes: 66 additions & 0 deletions src/storage/secondary/column/row_handler_column_iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::cmp::min;

use futures::Future;

use super::ColumnIterator;
use crate::array::{ArrayBuilder, I64Array, I64ArrayBuilder};
use crate::storage::secondary::SecondaryRowHandler;
use crate::storage::StorageResult;

pub struct RowHandlerColumnIterator {
rowset_id: usize,
row_count: usize,
current_row_id: usize,
}

impl RowHandlerColumnIterator {
pub fn new(rowset_id: usize, row_count: usize, first_row: usize) -> Self {
Self {
rowset_id,
row_count,
current_row_id: first_row,
}
}
}

impl ColumnIterator<I64Array> for RowHandlerColumnIterator {
type NextFuture<'a> = impl Future<Output = StorageResult<Option<(u32, I64Array)>>> + 'a;

fn next_batch(&mut self, expected_size: Option<usize>) -> Self::NextFuture<'_> {
async move {
if self.current_row_id >= self.row_count {
return Ok(None);
}

let mut remaining_cnt = self.row_count - self.current_row_id;
if let Some(expected_size) = expected_size {
assert!(expected_size > 0);
remaining_cnt = min(remaining_cnt, expected_size);
}

let first_row_id = self.current_row_id as u32;

let mut builder = I64ArrayBuilder::with_capacity(remaining_cnt);
for row_id in self.current_row_id..(self.current_row_id + remaining_cnt) {
let item = SecondaryRowHandler(self.rowset_id as u32, row_id as u32).as_i64();
builder.push(Some(&item));
}
let batch = builder.finish();

self.current_row_id += remaining_cnt;
Ok(Some((first_row_id, batch)))
}
}

fn fetch_hint(&self) -> usize {
self.row_count - self.current_row_id
}

fn fetch_current_row_id(&self) -> u32 {
self.current_row_id as u32
}

fn skip(&mut self, cnt: usize) {
self.current_row_id += cnt
}
}
18 changes: 0 additions & 18 deletions src/storage/secondary/column/row_handler_sequencer.rs

This file was deleted.

Loading

0 comments on commit 8464ae4

Please sign in to comment.