Skip to content

Commit

Permalink
feat: add push_with_name_n
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
  • Loading branch information
zhongzc committed Dec 19, 2023
1 parent 7391686 commit 534774f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 22 deletions.
24 changes: 21 additions & 3 deletions src/index/src/inverted_index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,27 @@ pub trait InvertedIndexCreator {
/// - `index_name`: Identifier for the index being built
/// - `value`: The data to be indexed, or `None` for a null entry
///
/// Note: Caller should call this method for each row in the dataset
async fn push_with_name(&mut self, index_name: &str, value: Option<BytesRef<'_>>)
-> Result<()>;
/// It should be equivalent to calling `push_with_name_n` with `n = 1`
async fn push_with_name(
    &mut self,
    index_name: &str,
    value: Option<BytesRef<'_>>,
) -> Result<()> {
    // A single push is the batched push with a repetition count of one.
    let repeat = 1;
    self.push_with_name_n(index_name, value, repeat).await
}

/// Adds `n` identical values to the named index. `None` values represent absence of data (null).
///
/// - `index_name`: Identifier for the index being built
/// - `value`: The data to be indexed, or `None` for a null entry
/// - `n`: Number of times `value` is added to the index
///
/// Implementations should make this equivalent to calling `push_with_name` `n` times.
/// The default `push_with_name` is defined as this method with `n = 1`.
async fn push_with_name_n(
&mut self,
index_name: &str,
value: Option<BytesRef<'_>>,
n: usize,
) -> Result<()>;

/// Finalizes the index creation process, ensuring all data is properly indexed and stored
/// in the provided writer
Expand Down
60 changes: 47 additions & 13 deletions src/index/src/inverted_index/create/sort_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,23 @@ pub struct SortIndexCreator {

#[async_trait]
impl InvertedIndexCreator for SortIndexCreator {
/// Inserts a value or null into the sorter for the specified index
async fn push_with_name(
/// Inserts `n` values or nulls into the sorter for the specified index.
///
/// If the index does not exist, a new index is created even if `n` is 0.
/// Caller may leverage this behavior to create indexes with no data.
async fn push_with_name_n(
&mut self,
index_name: &str,
value: Option<BytesRef<'_>>,
n: usize,
) -> Result<()> {
match self.sorters.get_mut(index_name) {
Some(sorter) => sorter.push(value).await,
Some(sorter) => sorter.push_n(value, n).await,
None => {
let mut sorter =
(self.sorter_factory)(index_name.to_owned(), self.segment_row_count);
sorter.push(value).await?;
self.sorters.insert(index_name.to_owned(), sorter);
let index_name = index_name.to_string();
let mut sorter = (self.sorter_factory)(index_name.clone(), self.segment_row_count);
sorter.push_n(value, n).await?;
self.sorters.insert(index_name, sorter);
Ok(())
}
}
Expand Down Expand Up @@ -118,12 +122,6 @@ mod tests {
use crate::inverted_index::format::writer::MockInvertedIndexWriter;
use crate::inverted_index::Bytes;

/// Synchronously drains `stream` and returns the first field of every item.
///
/// Panics if any item yielded by the stream is an `Err`.
fn stream_to_values(stream: SortedStream) -> Vec<Bytes> {
    let drained = async {
        stream
            .map(|item| item.unwrap().0)
            .collect::<Vec<_>>()
            .await
    };
    futures::executor::block_on(drained)
}

#[tokio::test]
async fn test_sort_index_creator_basic() {
let mut creator =
Expand Down Expand Up @@ -209,6 +207,36 @@ mod tests {
assert!(matches!(res, Err(Error::InconsistentRowCount { .. })));
}

#[tokio::test]
async fn test_sort_index_creator_create_indexes_without_data() {
    let rows_per_segment = NonZeroUsize::new(1).unwrap();
    let mut creator = SortIndexCreator::new(NaiveSorter::factory(), rows_per_segment);

    // Push zero rows under three distinct index names.
    for index_name in ["a", "b", "c"] {
        creator.push_with_name_n(index_name, None, 0).await.unwrap();
    }

    let mut mock_writer = MockInvertedIndexWriter::new();

    // Any index handed to the writer must carry no nulls and no values.
    mock_writer
        .expect_add_index()
        .returning(|name, null_bitmap, stream| {
            assert!(null_bitmap.is_empty());
            assert!(matches!(name.as_str(), "a" | "b" | "c"));
            assert!(stream_to_values(stream).is_empty());
            Ok(())
        });

    // `finish` must be called exactly once, reporting zero rows in total.
    mock_writer
        .expect_finish()
        .times(1)
        .returning(|total_row_count, segment_row_count| {
            assert_eq!(total_row_count, 0);
            assert_eq!(segment_row_count.get(), 1);
            Ok(())
        });

    creator.finish(&mut mock_writer).await.unwrap();
}

fn set_bit(bit_vec: &mut BitVec, index: usize) {
if index >= bit_vec.len() {
bit_vec.resize(index + 1, false);
Expand Down Expand Up @@ -267,4 +295,10 @@ mod tests {
})
}
}

/// Synchronously drains `stream` and returns the first field of every item.
///
/// Panics if any item yielded by the stream is an `Err`.
fn stream_to_values(stream: SortedStream) -> Vec<Bytes> {
    let drained = async {
        stream
            .map(|item| item.unwrap().0)
            .collect::<Vec<_>>()
            .await
    };
    futures::executor::block_on(drained)
}
}
4 changes: 3 additions & 1 deletion src/index/src/inverted_index/format/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::inverted_index::error::Result;
pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter;
use crate::inverted_index::Bytes;

/// A boxed, `Send + Unpin` stream whose items are `Result<(Bytes, BitVec)>`.
///
/// Shared alias for the `values` argument accepted by [`InvertedIndexWriter`] implementations.
pub type ValueStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;

/// Trait for writing inverted index data to underlying storage.
#[mockall::automock]
#[async_trait]
Expand All @@ -39,7 +41,7 @@ pub trait InvertedIndexWriter: Send {
&mut self,
name: String,
null_bitmap: BitVec,
values: Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>,
values: ValueStream,
) -> Result<()>;

/// Finalizes the index writing process, ensuring all data is written.
Expand Down
10 changes: 5 additions & 5 deletions src/index/src/inverted_index/format/writer/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ use std::num::NonZeroUsize;

use async_trait::async_trait;
use common_base::BitVec;
use futures::{AsyncWrite, AsyncWriteExt, Stream};
use futures::{AsyncWrite, AsyncWriteExt};
use greptime_proto::v1::index::InvertedIndexMetas;
use prost::Message;
use snafu::ResultExt;

use super::single::SingleIndexWriter;
use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::inverted_index::Bytes;
use crate::inverted_index::format::writer::single::SingleIndexWriter;
use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream};

/// `InvertedIndexBlobWriter`, an implementation of [`InvertedIndexWriter`], manages
/// the writing of an inverted index to blob storage.
Expand All @@ -45,7 +44,7 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWrit
&mut self,
name: String,
null_bitmap: BitVec,
values: Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>,
values: ValueStream,
) -> Result<()> {
let single_writer = SingleIndexWriter::new(
name.clone(),
Expand Down Expand Up @@ -105,6 +104,7 @@ mod tests {

use super::*;
use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
use crate::inverted_index::Bytes;

fn unpack(fst_value: u64) -> [u32; 2] {
bytemuck::cast::<u64, [u32; 2]>(fst_value)
Expand Down

0 comments on commit 534774f

Please sign in to comment.