Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream splits gRPC. #4109

Merged
merged 10 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ openssl = { version = "0.10.60", default-features = false }
openssl-probe = "0.1.5"
opentelemetry = { version = "0.19", features = ["rt-tokio"] }
opentelemetry-otlp = "0.12.0"
ouroboros = "0.18.0"
pin-project = "1.1.0"
pnet = { version = "0.33.0", features = ["std"] }
postcard = { version = "1.0.4", features = [
Expand Down
65 changes: 35 additions & 30 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_config::{SourceInputFormat, CLI_INGEST_SOURCE_ID};
use quickwit_metastore::{
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreResolver, MetastoreServiceExt,
SplitState, StageSplitsRequestExt,
ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt,
SplitMetadata, SplitState, StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
DeleteSplitsRequest, EntityKind, IndexMetadataRequest, ListSplitsRequest,
Expand Down Expand Up @@ -249,17 +249,18 @@ async fn test_ingest_docs_cli() {

local_ingest_docs_cli(args).await.unwrap();

let splits: Vec<_> = test_env
let splits_metadata: Vec<SplitMetadata> = test_env
.metastore()
.await
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap();

assert_eq!(splits.len(), 1);
assert_eq!(splits[0].split_metadata.num_docs, 5);
assert_eq!(splits_metadata.len(), 1);
assert_eq!(splits_metadata[0].num_docs, 5);

// Ensure cache directory is empty.
let cache_directory_path = get_cache_directory_path(&test_env.data_dir_path);
Expand Down Expand Up @@ -665,13 +666,14 @@ async fn test_garbage_collect_cli_no_grace() {
dry_run,
};

let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

let args = create_gc_args(false);

Expand All @@ -681,7 +683,7 @@ async fn test_garbage_collect_cli_no_grace() {
let index_path = test_env.indexes_dir_path.join(&test_env.index_id);
assert_eq!(index_path.try_exists().unwrap(), true);

let split_ids = vec![splits[0].split_id().to_string()];
let split_ids = vec![splits_metadata[0].split_id().to_string()];
let mut metastore = refresh_metastore(metastore).await.unwrap();
let mark_for_deletion_request =
MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids.clone());
Expand Down Expand Up @@ -718,7 +720,8 @@ async fn test_garbage_collect_cli_no_grace() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap()
.len(),
0
Expand Down Expand Up @@ -776,16 +779,17 @@ async fn test_garbage_collect_index_cli() {
.await
.unwrap();

let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

let index_path = test_env.indexes_dir_path.join(&test_env.index_id);
let split_filename = quickwit_common::split_file(splits[0].split_metadata.split_id.as_str());
let split_filename = quickwit_common::split_file(splits_metadata[0].split_id.as_str());
let split_path = index_path.join(&split_filename);
assert_eq!(split_path.try_exists().unwrap(), true);

Expand All @@ -795,41 +799,39 @@ async fn test_garbage_collect_index_cli() {

// Split should still exists within grace period.
let mut metastore = refresh_metastore(metastore).await.unwrap();
let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

// The following steps help turn an existing published split into a staged one
// without deleting the files.
let split = splits[0].clone();
let split_metadata = splits_metadata[0].clone();
metastore
.mark_splits_for_deletion(MarkSplitsForDeletionRequest::new(
index_uid.clone(),
vec![split.split_metadata.split_id.to_string()],
vec![split_metadata.split_id.to_string()],
))
.await
.unwrap();
metastore
.delete_splits(DeleteSplitsRequest {
index_uid: index_uid.to_string(),
split_ids: splits
split_ids: splits_metadata
.into_iter()
.map(|split| split.split_metadata.split_id)
.map(|split_metadata| split_metadata.split_id)
.collect(),
})
.await
.unwrap();
metastore
.stage_splits(
StageSplitsRequest::try_from_split_metadata(
index_uid.clone(),
split.split_metadata.clone(),
)
.unwrap(),
StageSplitsRequest::try_from_split_metadata(index_uid.clone(), split_metadata.clone())
.unwrap(),
)
.await
.unwrap();
Expand All @@ -840,7 +842,8 @@ async fn test_garbage_collect_index_cli() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits()
.await
.unwrap();
assert_eq!(splits[0].split_state, SplitState::Staged);

Expand All @@ -855,7 +858,8 @@ async fn test_garbage_collect_index_cli() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits[0].split_state, SplitState::Staged);
Expand All @@ -873,7 +877,8 @@ async fn test_garbage_collect_index_cli() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits()
.await
.unwrap();
// Splits should be deleted from both metastore and file system.
assert_eq!(splits.len(), 0);
Expand Down
25 changes: 25 additions & 0 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ pub struct ServiceStream<T> {
inner: BoxStream<T>,
}

impl<T> ServiceStream<T>
where T: Send + 'static
{
pub fn new(inner: BoxStream<T>) -> Self {
Self { inner }
}

pub fn empty() -> Self {
Self {
inner: Box::pin(stream::empty()),
}
}
}

impl<T> fmt::Debug for ServiceStream<T>
where T: 'static
{
Expand Down Expand Up @@ -159,3 +173,14 @@ where T: Send + 'static
}
}
}

#[cfg(any(test, feature = "testsuite"))]
impl<T> From<Vec<T>> for ServiceStream<T>
where T: Send + 'static
{
fn from(values: Vec<T>) -> Self {
Self {
inner: Box::pin(stream::iter(values)),
}
}
}