Skip to content

Commit

Permalink
feat(storage): add BTreeMap for row-format memory table (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
zehaowei committed Nov 19, 2021
1 parent 21b3417 commit dbd0120
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
*.db
.idea
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ sqlparser = {version = "0.10", features = ["serde"]}
thiserror = "1.0"
tokio = {version = "1", features = ["full"]}
async-channel = "1"
btreemultimap = "0.1.0"

[dev-dependencies]
criterion = {version = "0.3", features = ["async_tokio"]}
Expand Down
169 changes: 140 additions & 29 deletions src/storage/secondary/rowset/mem_rowset.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,138 @@
use std::cmp::Ordering;
use std::path::Path;
use std::sync::Arc;

use crate::array::{ArrayBuilderImpl, ArrayImplBuilderPickExt, ArrayImplSortExt, DataChunk};
use crate::array::{ArrayBuilderImpl, DataChunk};
use crate::catalog::{find_sort_key_id, ColumnCatalog};
use crate::storage::StorageResult;
use crate::types::{DataValue, Row};
use itertools::Itertools;

use super::rowset_builder::RowsetBuilder;
use crate::storage::secondary::ColumnBuilderOptions;
use btreemultimap::BTreeMultiMap;

pub struct SecondaryMemRowset {
/// Newtype around `DataValue` that additionally implements `Ord`, so it can
/// serve as the key of an ordered map.
///
/// Ordering is delegated to `DataValue`'s `PartialOrd`; the `Ord`
/// implementation panics when that comparison yields `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ComparableDataValue(DataValue);

impl PartialOrd for ComparableDataValue {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.0.partial_cmp(&other.0)
}
}

impl Ord for ComparableDataValue {
    /// Total ordering over the wrapped values, derived from `partial_cmp`.
    ///
    /// # Panics
    ///
    /// Panics if the two values have no defined order, i.e. `partial_cmp`
    /// returns `None`.
    fn cmp(&self, other: &Self) -> Ordering {
        // An incomparable pair cannot be ordered as a map key; failing loudly
        // with the broken invariant is preferable to an anonymous `unwrap`.
        self.partial_cmp(other)
            .expect("ComparableDataValue: pair of values has no defined ordering")
    }
}

/// In-memory buffer that accepts appended chunks and can later be turned into
/// a single `DataChunk`.
pub trait MemTable {
/// Adds the contents of `columns` to the memory table.
fn append(&mut self, columns: DataChunk) -> StorageResult<()>;

/// Consumes the memory table and returns its buffered contents as one
/// `DataChunk`.
fn flush(self) -> StorageResult<DataChunk>;
}

/// Row-format memory table that stores rows in a multimap keyed by the value
/// of one column.
pub struct BTreeMapMemTable {
/// Catalog entries of the table's columns; one array builder is created per
/// entry when flushing.
columns: Arc<[ColumnCatalog]>,
/// Index of the column whose value is used as the map key of each row.
primary_key_idx: usize,
/// Buffered rows, grouped under their key value.
multi_btree_map: BTreeMultiMap<ComparableDataValue, Row>,
}

impl BTreeMapMemTable {
    /// Builds an empty table for `columns`, keyed by the column at
    /// `primary_key_idx`.
    fn new(columns: Arc<[ColumnCatalog]>, primary_key_idx: usize) -> Self {
        let multi_btree_map = BTreeMultiMap::new();
        Self {
            columns,
            primary_key_idx,
            multi_btree_map,
        }
    }
}

impl MemTable for BTreeMapMemTable {
    /// Inserts every row of `columns` into the map, keyed by the row's value
    /// in the `primary_key_idx` column.
    fn append(&mut self, columns: DataChunk) -> StorageResult<()> {
        let row_count = columns.cardinality();
        for row_idx in 0..row_count {
            let key = ComparableDataValue(columns.array_at(self.primary_key_idx).get(row_idx));
            let row = columns.get_row_by_idx(row_idx);
            self.multi_btree_map.insert(key, row);
        }
        Ok(())
    }

    /// Drains the map in key order and rebuilds the rows as a `DataChunk`,
    /// with one array per column.
    fn flush(self) -> StorageResult<DataChunk> {
        let column_count = self.columns.len();
        let mut builders: Vec<_> = self
            .columns
            .iter()
            .map(|column| ArrayBuilderImpl::new(column.desc().datatype()))
            .collect();

        for (_, rows) in self.multi_btree_map.into_iter() {
            for row in rows {
                // Slicing to `column_count` keeps the requirement that every
                // row holds a value for each column (a short row panics).
                for (builder, value) in builders.iter_mut().zip(&row[..column_count]) {
                    builder.push(value);
                }
            }
        }

        let chunk: DataChunk = builders.into_iter().map(|b| b.finish()).collect();
        Ok(chunk)
    }
}

/// Column-format memory table that accumulates appended data in one array
/// builder per column.
pub struct ColumnMemTable {
/// Array builders, indexed by column position.
builders: Vec<ArrayBuilderImpl>,
}

impl SecondaryMemRowset {
impl ColumnMemTable {
/// Creates a table with one empty builder per entry in `columns`, each
/// constructed from that column's datatype.
pub fn new(columns: Arc<[ColumnCatalog]>) -> Self {
Self {
builders: columns
.iter()
.map(|column| ArrayBuilderImpl::new(column.desc().datatype()))
.collect_vec(),
columns,
}
}
}

/// Add data to mem table.
pub async fn append(&mut self, columns: DataChunk) -> StorageResult<()> {
impl MemTable for ColumnMemTable {
    /// Appends each array of `columns` to the builder at the same column
    /// position.
    fn append(&mut self, columns: DataChunk) -> StorageResult<()> {
        let column_count = columns.column_count();
        for idx in 0..column_count {
            let array = columns.array_at(idx);
            self.builders[idx].append(array);
        }
        Ok(())
    }

    /// Finishes every builder and collects the resulting arrays into a
    /// `DataChunk`.
    fn flush(self) -> StorageResult<DataChunk> {
        let chunk: DataChunk = self.builders.into_iter().map(|b| b.finish()).collect();
        Ok(chunk)
    }
}

/// Memory rowset generic over the `MemTable` implementation that buffers its
/// data.
pub struct SecondaryMemRowset<M: MemTable> {
/// Catalog entries of the columns this rowset holds data for.
columns: Arc<[ColumnCatalog]>,
/// Memory table that receives appended chunks.
mem_table: M,
}

impl<M: MemTable> SecondaryMemRowset<M> {
/// Add data to memory table.
pub async fn append(&mut self, columns: DataChunk) -> StorageResult<()> {
self.mem_table.append(columns)
}

/// Flush memory table to disk and return a handler
pub async fn flush(
self,
directory: impl AsRef<Path>,
column_options: ColumnBuilderOptions,
) -> StorageResult<()> {
let chunk = if let Some(sort_key_idx) = find_sort_key_id(&*self.columns) {
let arrays = self
.builders
.into_iter()
.map(|builder| builder.finish())
.collect_vec();
let sorted_index = arrays[sort_key_idx].get_sorted_indices();
arrays
.into_iter()
.map(|array| {
let mut builder = ArrayBuilderImpl::from_type_of_array(&array);
builder.pick_from(&array, &sorted_index);
builder.finish()
})
.collect::<DataChunk>()
} else {
self.builders
.into_iter()
.map(|builder| builder.finish())
.collect::<DataChunk>()
};

let chunk = self.mem_table.flush()?;
let directory = directory.as_ref().to_path_buf();
let mut builder = RowsetBuilder::new(self.columns, &directory, column_options);
builder.append(chunk);
Expand All @@ -69,3 +141,42 @@ impl SecondaryMemRowset {
Ok(())
}
}

/// Memory rowset over either of the two memory-table implementations.
pub enum SecondaryMemRowsetImpl {
/// Rowset backed by a `BTreeMapMemTable`.
BTree(SecondaryMemRowset<BTreeMapMemTable>),
/// Rowset backed by a `ColumnMemTable`.
Column(SecondaryMemRowset<ColumnMemTable>),
}

impl SecondaryMemRowsetImpl {
    /// Creates a memory rowset for `columns`.
    ///
    /// When `find_sort_key_id` reports a sort key, the B-tree backed variant
    /// keyed by that column is used; otherwise the column-format variant is.
    pub fn new(columns: Arc<[ColumnCatalog]>) -> Self {
        match find_sort_key_id(&columns) {
            Some(sort_key_idx) => {
                let mem_table = BTreeMapMemTable::new(Arc::clone(&columns), sort_key_idx);
                Self::BTree(SecondaryMemRowset { columns, mem_table })
            }
            None => {
                let mem_table = ColumnMemTable::new(Arc::clone(&columns));
                Self::Column(SecondaryMemRowset { columns, mem_table })
            }
        }
    }

    /// Forwards `columns` to the underlying rowset's `append`.
    pub async fn append(&mut self, columns: DataChunk) -> StorageResult<()> {
        match self {
            Self::BTree(rowset) => rowset.append(columns).await,
            Self::Column(rowset) => rowset.append(columns).await,
        }
    }

    /// Forwards to the underlying rowset's `flush` with the given directory
    /// and column builder options.
    pub async fn flush(
        self,
        directory: impl AsRef<Path>,
        column_options: ColumnBuilderOptions,
    ) -> StorageResult<()> {
        match self {
            Self::BTree(rowset) => rowset.flush(directory, column_options).await,
            Self::Column(rowset) => rowset.flush(directory, column_options).await,
        }
    }
}
6 changes: 3 additions & 3 deletions src/storage/secondary/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use super::version_manager::{Snapshot, VersionManager};
use super::{
AddDVEntry, AddRowSetEntry, ColumnBuilderOptions, ColumnSeekPosition, ConcatIterator,
DeleteVector, DiskRowset, EpochOp, MergeIterator, RowSetIterator, SecondaryMemRowset,
DeleteVector, DiskRowset, EpochOp, MergeIterator, RowSetIterator, SecondaryMemRowsetImpl,
SecondaryRowHandler, SecondaryTable, SecondaryTableTxnIterator, TransactionLock,
};
use crate::array::DataChunk;
Expand All @@ -22,7 +22,7 @@ pub struct SecondaryTransaction {
finished: bool,

/// Includes all to-be-committed data.
mem: Option<SecondaryMemRowset>,
mem: Option<SecondaryMemRowsetImpl>,

/// Includes all to-be-deleted rows
delete_buffer: Vec<SecondaryRowHandler>,
Expand Down Expand Up @@ -62,7 +62,7 @@ impl SecondaryTransaction {
let mem = if readonly {
None
} else {
Some(SecondaryMemRowset::new(table.columns.clone()))
Some(SecondaryMemRowsetImpl::new(table.columns.clone()))
};

Ok(Self {
Expand Down
3 changes: 3 additions & 0 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,6 @@ pub enum ConvertError {
#[error("failed to cast {0} to type {1}")]
Cast(String, &'static str),
}

/// Row type used by the row-format memory table: the values of a single row
/// as a `Vec` of `DataValue`s.
pub(crate) type Row = Vec<DataValue>;

0 comments on commit dbd0120

Please sign in to comment.