diff --git a/src/binder/expression/agg_call.rs b/src/binder/expression/agg_call.rs index 96b8198f..b8be58bf 100644 --- a/src/binder/expression/agg_call.rs +++ b/src/binder/expression/agg_call.rs @@ -82,26 +82,11 @@ impl Binder { ), "count" => { if args.is_empty() { - for ref_id in self.context.regular_tables.values() { - let table = self.catalog.get_table(ref_id).unwrap(); - if let Some(col) = table.get_column_by_id(0) { - let column_ref_id = ColumnRefId::from_table(*ref_id, col.id()); - self.record_regular_table_column( - &table.name(), - col.name(), - col.id(), - col.desc().clone(), - ); - let expr = BoundExpr::ColumnRef(BoundColumnRef { - table_name: table.name(), - column_ref_id, - is_primary_key: col.is_primary(), - desc: col.desc().clone(), - }); - args.push(expr); - break; - } - } + let first_index_column = BoundExpr::InputRef(BoundInputRef { + index: 0, + return_type: DataType::new(DataTypeKind::Int(None), false), + }); + args.push(first_index_column); ( AggKind::RowCount, Some(DataType::new(DataTypeKind::Int(None), false)), diff --git a/src/logical_planner/select.rs b/src/logical_planner/select.rs index 498be75d..d1e369b1 100644 --- a/src/logical_planner/select.rs +++ b/src/logical_planner/select.rs @@ -12,7 +12,7 @@ use itertools::Itertools; use super::*; use crate::binder::{ - BoundAggCall, BoundExpr, BoundInputRef, BoundOrderBy, BoundSelect, BoundTableRef, + AggKind, BoundAggCall, BoundExpr, BoundInputRef, BoundOrderBy, BoundSelect, BoundTableRef, }; use crate::optimizer::plan_nodes::{ Dummy, LogicalAggregate, LogicalFilter, LogicalJoin, LogicalLimit, LogicalOrder, @@ -23,6 +23,7 @@ impl LogicalPlaner { pub fn plan_select(&self, mut stmt: Box) -> Result { let mut plan: PlanRef = Arc::new(Dummy {}); let mut is_sorted = false; + let mut with_row_handler = false; if let Some(table_ref) = &stmt.from_table { // use `sorted` mode from the storage engine if the order by column is the primary key @@ -33,7 +34,26 @@ impl LogicalPlaner { } } } - plan = self.plan_table_ref(table_ref, false, is_sorted)?; + if let BoundTableRef::JoinTableRef { join_tables, .. } = table_ref { + if join_tables.is_empty() { + stmt.select_list.iter().for_each(|expr| match expr { + BoundExpr::AggCall(expr) => { + if expr.kind == AggKind::RowCount { + with_row_handler = true; + } + } + BoundExpr::ExprWithAlias(expr) => { + if let BoundExpr::AggCall(expr) = &*expr.expr { + if expr.kind == AggKind::RowCount { + with_row_handler = true; + } + } + } + _ => {} + }); + } + } + plan = self.plan_table_ref(table_ref, with_row_handler, is_sorted)?; } if let Some(expr) = stmt.where_clause { diff --git a/src/storage/secondary/column.rs b/src/storage/secondary/column.rs index 5fcce039..f47fab1c 100644 --- a/src/storage/secondary/column.rs +++ b/src/storage/secondary/column.rs @@ -15,7 +15,7 @@ mod column_iterator; mod concrete_column_iterator; mod primitive_column_builder; mod primitive_column_factory; -mod row_handler_sequencer; +mod row_handler_column_iterator; use std::future::Future; use std::io::{Read, Seek, SeekFrom}; @@ -28,7 +28,7 @@ pub use concrete_column_iterator::*; pub use primitive_column_builder::*; pub use primitive_column_factory::*; use risinglight_proto::rowset::BlockIndex; -pub use row_handler_sequencer::*; +pub use row_handler_column_iterator::*; mod char_column_factory; use std::os::unix::fs::FileExt; use std::sync::{Arc, Mutex}; diff --git a/src/storage/secondary/column/column_iterator.rs b/src/storage/secondary/column/column_iterator.rs index 1a7b6f59..a83defbf 100644 --- a/src/storage/secondary/column/column_iterator.rs +++ b/src/storage/secondary/column/column_iterator.rs @@ -3,7 +3,7 @@ use super::{ BlobColumnIterator, BoolColumnIterator, CharBlockIteratorFactory, CharColumnIterator, Column, ColumnIterator, DecimalColumnIterator, F64ColumnIterator, I32ColumnIterator, - PrimitiveBlockIteratorFactory, StorageResult, + PrimitiveBlockIteratorFactory, RowHandlerColumnIterator, StorageResult, }; use crate::array::{Array, ArrayImpl}; use crate::catalog::ColumnCatalog; @@ -20,6 +20,8 @@ pub enum ColumnIteratorImpl { Date(DateColumnIterator), Interval(IntervalColumnIterator), Blob(BlobColumnIterator), + /// Special for row handler and not correspond to any data type + RowHandler(RowHandlerColumnIterator), } impl ColumnIteratorImpl { @@ -89,6 +91,15 @@ impl ColumnIteratorImpl { Ok(iter) } + pub fn new_row_handler(rowset_id: u32, row_count: u32, start_pos: u32) -> StorageResult { + let iter = Self::RowHandler(RowHandlerColumnIterator::new( + rowset_id as usize, + row_count as usize, + start_pos as usize, + )); + Ok(iter) + } + fn erase_concrete_type( ret: Option<(u32, impl Array + Into)>, ) -> Option<(u32, ArrayImpl)> { @@ -108,6 +119,7 @@ impl ColumnIteratorImpl { Self::Date(it) => Self::erase_concrete_type(it.next_batch(expected_size).await?), Self::Interval(it) => Self::erase_concrete_type(it.next_batch(expected_size).await?), Self::Blob(it) => Self::erase_concrete_type(it.next_batch(expected_size).await?), + Self::RowHandler(it) => Self::erase_concrete_type(it.next_batch(expected_size).await?), }; Ok(result) } @@ -122,6 +134,7 @@ impl ColumnIteratorImpl { Self::Date(it) => it.fetch_hint(), Self::Interval(it) => it.fetch_hint(), Self::Blob(it) => it.fetch_hint(), + Self::RowHandler(it) => it.fetch_hint(), } } @@ -135,6 +148,7 @@ impl ColumnIteratorImpl { Self::Date(it) => it.fetch_current_row_id(), Self::Interval(it) => it.fetch_current_row_id(), Self::Blob(it) => it.fetch_current_row_id(), + Self::RowHandler(it) => it.fetch_current_row_id(), } } @@ -148,6 +162,7 @@ impl ColumnIteratorImpl { Self::Date(it) => it.skip(cnt), Self::Interval(it) => it.skip(cnt), Self::Blob(it) => it.skip(cnt), + Self::RowHandler(it) => it.skip(cnt), } } } diff --git a/src/storage/secondary/column/row_handler_column_iterator.rs b/src/storage/secondary/column/row_handler_column_iterator.rs new file mode 100644 index 00000000..c41b91a7 --- /dev/null +++ b/src/storage/secondary/column/row_handler_column_iterator.rs @@ -0,0 +1,66 @@ +use std::cmp::min; + +use futures::Future; + +use super::ColumnIterator; +use crate::array::{ArrayBuilder, I64Array, I64ArrayBuilder}; +use crate::storage::secondary::SecondaryRowHandler; +use crate::storage::StorageResult; + +pub struct RowHandlerColumnIterator { + rowset_id: usize, + row_count: usize, + current_row_id: usize, +} + +impl RowHandlerColumnIterator { + pub fn new(rowset_id: usize, row_count: usize, first_row: usize) -> Self { + Self { + rowset_id, + row_count, + current_row_id: first_row, + } + } +} + +impl ColumnIterator for RowHandlerColumnIterator { + type NextFuture<'a> = impl Future>> + 'a; + + fn next_batch(&mut self, expected_size: Option) -> Self::NextFuture<'_> { + async move { + if self.current_row_id >= self.row_count { + return Ok(None); + } + + let mut remaining_cnt = self.row_count - self.current_row_id; + if let Some(expected_size) = expected_size { + assert!(expected_size > 0); + remaining_cnt = min(remaining_cnt, expected_size); + } + + let first_row_id = self.current_row_id as u32; + + let mut builder = I64ArrayBuilder::with_capacity(remaining_cnt); + for row_id in self.current_row_id..(self.current_row_id + remaining_cnt) { + let item = SecondaryRowHandler(self.rowset_id as u32, row_id as u32).as_i64(); + builder.push(Some(&item)); + } + let batch = builder.finish(); + + self.current_row_id += remaining_cnt; + Ok(Some((first_row_id, batch))) + } + } + + fn fetch_hint(&self) -> usize { + self.row_count - self.current_row_id + } + + fn fetch_current_row_id(&self) -> u32 { + self.current_row_id as u32 + } + + fn skip(&mut self, cnt: usize) { + self.current_row_id += cnt + } +} diff --git a/src/storage/secondary/column/row_handler_sequencer.rs b/src/storage/secondary/column/row_handler_sequencer.rs deleted file mode 100644 index fc805fda..00000000 --- a/src/storage/secondary/column/row_handler_sequencer.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0. - -use crate::array::{ArrayBuilder, I64Array, I64ArrayBuilder}; -use crate::storage::secondary::SecondaryRowHandler; - -/// Generates a sequence of row-ids -pub struct RowHandlerSequencer {} - -impl RowHandlerSequencer { - pub fn sequence(rowset_id: u32, begin_row_id: u32, length: u32) -> I64Array { - let mut builder = I64ArrayBuilder::with_capacity(length as usize); - for row_id in begin_row_id..(begin_row_id + length) { - let item = SecondaryRowHandler(rowset_id, row_id).as_i64(); - builder.push(Some(&item)); - } - builder.finish() - } -} diff --git a/src/storage/secondary/rowset/rowset_iterator.rs b/src/storage/secondary/rowset/rowset_iterator.rs index aeb991be..8456c02a 100644 --- a/src/storage/secondary/rowset/rowset_iterator.rs +++ b/src/storage/secondary/rowset/rowset_iterator.rs @@ -5,9 +5,7 @@ use std::sync::Arc; use bitvec::prelude::BitVec; use smallvec::smallvec; -use super::super::{ - ColumnIteratorImpl, ColumnSeekPosition, RowHandlerSequencer, SecondaryIteratorImpl, -}; +use super::super::{ColumnIteratorImpl, ColumnSeekPosition, SecondaryIteratorImpl}; use super::DiskRowset; use crate::array::{Array, ArrayImpl}; use crate::binder::BoundExpr; @@ -19,7 +17,6 @@ const ROWSET_MAX_OUTPUT: usize = 65536; /// Iterates on a `RowSet` pub struct RowSetIterator { - rowset: Arc, column_refs: Arc<[StorageColumnRef]>, dvs: Vec>, column_iterators: Vec>, @@ -52,16 +49,24 @@ impl RowSetIterator { panic!("more than 1 row handler column") } - if row_handler_count == column_refs.len() { - panic!("no user column") - } - let mut column_iterators: Vec> = vec![]; for column_ref in &*column_refs { // TODO: parallel seek match column_ref { - StorageColumnRef::RowHandler => column_iterators.push(None), + StorageColumnRef::RowHandler => { + let column = rowset.column(0); + let row_count = column + .index() + .indexes() + .iter() + .fold(0, |acc, index| acc + index.row_count); + column_iterators.push(Some(ColumnIteratorImpl::new_row_handler( + rowset.rowset_id(), + row_count, + start_row_id, + )?)) + } StorageColumnRef::Idx(idx) => column_iterators.push(Some( ColumnIteratorImpl::new( rowset.column(*idx as usize), @@ -86,7 +91,6 @@ impl RowSetIterator { }; Ok(Self { - rowset, column_refs, dvs, column_iterators, @@ -155,13 +159,8 @@ impl RowSetIterator { // All rows in this batch have been deleted, call `skip` // on every columns if visi.not_any() { - for (id, column_ref) in self.column_refs.iter().enumerate() { - match column_ref { - StorageColumnRef::RowHandler => continue, - StorageColumnRef::Idx(_) => { - self.column_iterators[id].as_mut().unwrap().skip(visi.len()); - } - } + for (id, _) in self.column_refs.iter().enumerate() { + self.column_iterators[id].as_mut().unwrap().skip(visi.len()); } return Ok((false, None)); } @@ -231,18 +230,11 @@ impl RowSetIterator { // No rows left from the filter scan, skip columns which are not // in filter conditions if filter_bitmap.not_any() { - for (id, column_ref) in self.column_refs.iter().enumerate() { - match column_ref { - StorageColumnRef::RowHandler => continue, - StorageColumnRef::Idx(_) => { - if arrays[id].is_none() { - self.column_iterators[id] - .as_mut() - .unwrap() - .skip(filter_bitmap.len()); - } - } - } + for (id, _) in self.column_refs.iter().enumerate() { + self.column_iterators[id] + .as_mut() + .unwrap() + .skip(filter_bitmap.len()); } return Ok((false, None)); } @@ -255,55 +247,34 @@ impl RowSetIterator { // indicate the visibility of its rows // TODO: Implement the skip interface for column_iterator and call it here. // For those already fetched columns, they also need to delete corrensponding blocks. - for (id, column_ref) in self.column_refs.iter().enumerate() { + for (id, _) in self.column_refs.iter().enumerate() { if filter_context.is_none() { // If no filter, the `arrays` should be initialized here // manually by push a `None` arrays.push(None); } - match column_ref { - StorageColumnRef::RowHandler => continue, - StorageColumnRef::Idx(_) => { - if arrays[id].is_none() { - if let Some((row_id, array)) = self.column_iterators[id] - .as_mut() - .unwrap() - .next_batch(Some(fetch_size)) - .await? - { - if let Some(x) = common_chunk_range { - if x != (row_id, array.len()) { - panic!("unmatched rowid from column iterator"); - } - } - common_chunk_range = Some((row_id, array.len())); - arrays[id] = Some(array); + if arrays[id].is_none() { + if let Some((row_id, array)) = self.column_iterators[id] + .as_mut() + .unwrap() + .next_batch(Some(fetch_size)) + .await? + { + if let Some(x) = common_chunk_range { + if x != (row_id, array.len()) { + panic!("unmatched rowid from column iterator"); } } + common_chunk_range = Some((row_id, array.len())); + arrays[id] = Some(array); } } } - let common_chunk_range = if let Some(common_chunk_range) = common_chunk_range { - common_chunk_range - } else { + if common_chunk_range.is_none() { return Ok((true, None)); }; - // Fill RowHandlers - for (id, column_ref) in self.column_refs.iter().enumerate() { - if matches!(column_ref, StorageColumnRef::RowHandler) { - arrays[id] = Some( - RowHandlerSequencer::sequence( - self.rowset.rowset_id(), - common_chunk_range.0, - common_chunk_range.1 as u32, - ) - .into(), - ); - } - } - Ok(( false, StorageChunk::construct( diff --git a/tests/sql/count.slt b/tests/sql/count.slt new file mode 100644 index 00000000..fc403b89 --- /dev/null +++ b/tests/sql/count.slt @@ -0,0 +1,23 @@ +statement ok +create table t(v int) + +statement ok +insert into t values (1), (2), (3), (4), (5), (6), (7), (8) + +query I +select count(*) from t +---- +8 + +query I +select count(*) as 'total' from t where v > 5 +---- +3 + +statement ok +delete from t where v = 7 + +query I +select count(*) from t where v > 5 +---- +2