Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): record first_key in block_index_builder #598

Merged
merged 3 commits into from
Apr 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/src/proto/rowset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ message BlockIndex {

// Statistics of the block.
repeated BlockStatistics stats = 7;

// If first_key is null
bool is_first_key_null = 8;
}

// An entry of a delete record.
Expand Down
4 changes: 3 additions & 1 deletion src/storage/secondary/block/block_index_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ impl BlockIndexBuilder {
column_data: &mut Vec<u8>,
block_data: &mut Vec<u8>,
stats: Vec<BlockStatistics>,
first_key: Option<Vec<u8>>,
) {
self.indexes.push(BlockIndex {
offset: column_data.len() as u64,
length: block_data.len() as u64 + BLOCK_HEADER_SIZE as u64,
first_rowid: self.last_row_count as u32,
row_count: (self.row_count - self.last_row_count) as u32,
/// TODO(chi): support sort key
first_key: "".into(),
is_first_key_null: first_key.is_none(),
first_key: first_key.unwrap_or_default(),
stats,
});

Expand Down
19 changes: 17 additions & 2 deletions src/storage/secondary/column/blob_column_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::super::{BlockBuilder, BlockIndexBuilder, PlainBlobBlockBuilder};
use super::{append_one_by_one, ColumnBuilder};
use crate::array::{Array, BlobArray};
use crate::storage::secondary::block::RleBlockBuilder;
use crate::storage::secondary::encode::BlobEncode;
use crate::storage::secondary::ColumnBuilderOptions;
use crate::types::BlobRef;

Expand All @@ -26,6 +27,9 @@ pub struct BlobColumnBuilder {

/// Block index builder
block_index_builder: BlockIndexBuilder,

/// First key
first_key: Option<Vec<u8>>,
}

impl BlobColumnBuilder {
Expand All @@ -35,6 +39,7 @@ impl BlobColumnBuilder {
block_index_builder: BlockIndexBuilder::new(options.clone()),
options,
current_builder: None,
first_key: None,
}
}

Expand All @@ -56,8 +61,13 @@ impl BlobColumnBuilder {
),
};

self.block_index_builder
.finish_block(block_type, &mut self.data, &mut block_data, stats);
self.block_index_builder.finish_block(
block_type,
&mut self.data,
&mut block_data,
stats,
self.first_key.clone(),
);
}
}

Expand All @@ -80,6 +90,11 @@ impl ColumnBuilder<BlobArray> for BlobColumnBuilder {
PlainBlobBlockBuilder::new(self.options.target_block_size - 16),
));
}
if let Some(to_be_appended) = iter.peek() {
if self.options.record_first_key {
self.first_key = to_be_appended.map(|x| x.to_byte_slice().to_vec());
}
}
}

let (row_count, should_finish) = match self.current_builder.as_mut().unwrap() {
Expand Down
19 changes: 17 additions & 2 deletions src/storage/secondary/column/char_column_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub struct CharColumnBuilder {

/// Width of the char column
char_width: Option<u64>,

/// First key
first_key: Option<Vec<u8>>,
}

impl CharColumnBuilder {
Expand All @@ -44,6 +47,7 @@ impl CharColumnBuilder {
current_builder: None,
nullable,
char_width,
first_key: None,
}
}

Expand Down Expand Up @@ -75,8 +79,13 @@ impl CharColumnBuilder {
),
};

self.block_index_builder
.finish_block(block_type, &mut self.data, &mut block_data, stats);
self.block_index_builder.finish_block(
block_type,
&mut self.data,
&mut block_data,
stats,
self.first_key.clone(),
);
}
}

Expand Down Expand Up @@ -130,6 +139,12 @@ impl ColumnBuilder<Utf8Array> for CharColumnBuilder {
nullable
),
}

if let Some(to_be_appended) = iter.peek() {
if self.options.record_first_key {
self.first_key = to_be_appended.map(|x| x.as_bytes().to_vec());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that once is_first_key_null is set to false, it will never be set back to true?

We can define self.first_key as Option<Vec<u8>>, so as to make the logic clearer.

if self.options.record_first_key {
   self.first_key = to_be_appended.map(|x| x.as_bytes().to_vec());
}

And we can modify finish_block to accept first_key: Option<Vec<u8>>. Inside the block builder, we match first_key. If null, set is_first_key_null to true. Otherwise, set first_key to the inner value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still some small issues. Thanks again for your contribution!

Thanks also for your review too! 😭

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that once is_first_key_null is set to false, it will never be set back to true?

We can define self.first_key as Option<Vec<u8>>, so as to make the logic clearer.

if self.options.record_first_key {
   self.first_key = to_be_appended.map(|x| x.as_bytes().to_vec());
}

And we can modify finish_block to accept first_key: Option<Vec<u8>>. Inside the block builder, we match first_key. If null, set is_first_key_null to true. Otherwise, set first_key to the inner value.

Maybe finish_block() will be like this?

    pub fn finish_block(
        &mut self,
        block_type: BlockType,
        column_data: &mut Vec<u8>,
        block_data: &mut Vec<u8>,
        stats: Vec<BlockStatistics>,
        first_key: &mut Option<Vec<u8>>,
    ) {
        self.indexes.push(BlockIndex {
            offset: column_data.len() as u64,
            length: block_data.len() as u64 + BLOCK_HEADER_SIZE as u64,
            first_rowid: self.last_row_count as u32,
            row_count: (self.row_count - self.last_row_count) as u32,
            /// TODO(chi): support sort key
            first_key: match first_key {
                Some(x) => x.drain(..).collect(),
                None => Default::default(),
            },
            stats,
            is_first_key_null: first_key.is_none(),
        });
        ...

Copy link
Member

@skyzh skyzh Apr 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly! And I’d prefer to have Option<Vec<u8>> as parameter instead of &mut. As block is typically large, this re-allocation is acceptable.

}
}

let (row_count, should_finish) = match self.current_builder.as_mut().unwrap() {
Expand Down
63 changes: 60 additions & 3 deletions src/storage/secondary/column/primitive_column_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub struct PrimitiveColumnBuilder<T: PrimitiveFixedWidthEncode> {

/// Block index builder
block_index_builder: BlockIndexBuilder,

/// First key
first_key: Option<Vec<u8>>,
}

impl<T: PrimitiveFixedWidthEncode> PrimitiveColumnBuilder<T> {
Expand All @@ -55,6 +58,7 @@ impl<T: PrimitiveFixedWidthEncode> PrimitiveColumnBuilder<T> {
options,
current_builder: None,
nullable,
first_key: None,
}
}

Expand Down Expand Up @@ -84,8 +88,13 @@ impl<T: PrimitiveFixedWidthEncode> PrimitiveColumnBuilder<T> {
),
};

self.block_index_builder
.finish_block(block_type, &mut self.data, &mut block_data, stats);
self.block_index_builder.finish_block(
block_type,
&mut self.data,
&mut block_data,
stats,
self.first_key.clone(),
);
}
}

Expand Down Expand Up @@ -121,7 +130,6 @@ pub fn append_one_by_one<'a, A: Array>(
impl<T: PrimitiveFixedWidthEncode> ColumnBuilder<T::ArrayType> for PrimitiveColumnBuilder<T> {
fn append(&mut self, array: &T::ArrayType) {
let mut iter = array.iter().peekable();

while iter.peek().is_some() {
if self.current_builder.is_none() {
match (self.nullable, self.options.is_rle) {
Expand Down Expand Up @@ -161,6 +169,16 @@ impl<T: PrimitiveFixedWidthEncode> ColumnBuilder<T::ArrayType> for PrimitiveColu
));
}
}

if let Some(to_be_appended) = iter.peek() {
if self.options.record_first_key {
self.first_key = to_be_appended.map(|x| {
let mut first_key = vec![];
x.encode(&mut first_key);
first_key
});
}
}
}

let (row_count, should_finish) = match self.current_builder.as_mut().unwrap() {
Expand Down Expand Up @@ -279,4 +297,43 @@ mod tests {
}
assert_eq!(builder.finish().0.len(), 2);
}

#[test]
fn test_i32_block_index_first_key() {
let item_each_block = (128 - 16) / 4;

// Test for first key
let mut builder =
I32ColumnBuilder::new(true, ColumnBuilderOptions::record_first_key_test());
for _ in 0..10 {
builder.append(&I32Array::from_iter(
[Some(1)].iter().cycle().cloned().take(item_each_block),
));
}

let (index, _) = builder.finish();
assert_eq!(index.len(), 11);

let mut f2: &[u8];
for item in index {
f2 = &item.first_key;
let x: i32 = PrimitiveFixedWidthEncode::decode(&mut f2);
assert_eq!(x, 1);
}

// Test for null first key
let mut builder =
I32ColumnBuilder::new(true, ColumnBuilderOptions::record_first_key_test());
for _ in 0..10 {
builder.append(&I32Array::from_iter(
[None].iter().cycle().cloned().take(item_each_block),
));
}
let (index, _) = builder.finish();
assert_eq!(index.len(), 11);

for item in index {
assert!(item.is_first_key_null);
}
}
}
22 changes: 22 additions & 0 deletions src/storage/secondary/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub struct StorageOptions {

/// Whether using run-length encoding
pub is_rle: bool,

/// Whether record first_key of each block into block_index
pub record_first_key: bool,
}

impl StorageOptions {
Expand All @@ -70,6 +73,7 @@ impl StorageOptions {
},
checksum_type: ChecksumType::Crc32,
is_rle: false,
record_first_key: false,
}
}

Expand All @@ -82,6 +86,7 @@ impl StorageOptions {
io_backend: IOBackend::in_memory(),
checksum_type: ChecksumType::None,
is_rle: false,
record_first_key: false,
}
}
}
Expand All @@ -97,6 +102,9 @@ pub struct ColumnBuilderOptions {

/// Whether using run-length encoding
pub is_rle: bool,

/// Whether record first_key of each block
pub record_first_key: bool,
}

impl ColumnBuilderOptions {
Expand All @@ -105,6 +113,7 @@ impl ColumnBuilderOptions {
target_block_size: options.target_block_size,
checksum_type: options.checksum_type,
is_rle: options.is_rle,
record_first_key: options.record_first_key,
}
}

Expand All @@ -114,6 +123,7 @@ impl ColumnBuilderOptions {
target_block_size: 4096,
checksum_type: ChecksumType::Crc32,
is_rle: false,
record_first_key: false,
}
}

Expand All @@ -123,6 +133,7 @@ impl ColumnBuilderOptions {
target_block_size: 128,
checksum_type: ChecksumType::None,
is_rle: false,
record_first_key: false,
}
}

Expand All @@ -132,6 +143,17 @@ impl ColumnBuilderOptions {
target_block_size: 128,
checksum_type: ChecksumType::None,
is_rle: true,
record_first_key: false,
}
}

#[cfg(test)]
pub fn record_first_key_test() -> Self {
Self {
target_block_size: 128,
checksum_type: ChecksumType::None,
is_rle: false,
record_first_key: true,
}
}
}