Skip to content

Commit

Permalink
Take review comments into account.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Dec 5, 2023
1 parent 401d2d8 commit 8e193aa
Show file tree
Hide file tree
Showing 31 changed files with 528 additions and 230 deletions.
64 changes: 43 additions & 21 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_config::{SourceInputFormat, CLI_INGEST_SOURCE_ID};
use quickwit_metastore::{
ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, SplitState, StageSplitsRequestExt,
ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt,
SplitMetadata, SplitState, StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
DeleteSplitsRequest, EntityKind, IndexMetadataRequest, ListSplitsRequest,
Expand Down Expand Up @@ -248,15 +249,18 @@ async fn test_ingest_docs_cli() {

local_ingest_docs_cli(args).await.unwrap();

let splits: Vec<_> = test_env
let splits_metadata: Vec<SplitMetadata> = test_env
.metastore()
.await
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.collect_splits_metadata()
.await
.unwrap();

assert_eq!(splits.len(), 1);
assert_eq!(splits[0].split_metadata.num_docs, 5);
assert_eq!(splits_metadata.len(), 1);
assert_eq!(splits_metadata[0].num_docs, 5);

// Ensure cache directory is empty.
let cache_directory_path = get_cache_directory_path(&test_env.data_dir_path);
Expand Down Expand Up @@ -662,11 +666,14 @@ async fn test_garbage_collect_cli_no_grace() {
dry_run,
};

let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

let args = create_gc_args(false);

Expand All @@ -676,7 +683,7 @@ async fn test_garbage_collect_cli_no_grace() {
let index_path = test_env.indexes_dir_path.join(&test_env.index_id);
assert_eq!(index_path.try_exists().unwrap(), true);

let split_ids = vec![splits[0].split_id().to_string()];
let split_ids = vec![splits_metadata[0].split_id().to_string()];
let mut metastore = refresh_metastore(metastore).await.unwrap();
let mark_for_deletion_request =
MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids.clone());
Expand Down Expand Up @@ -713,6 +720,9 @@ async fn test_garbage_collect_cli_no_grace() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.collect_splits_metadata()
.await
.unwrap()
.len(),
0
);
Expand Down Expand Up @@ -769,14 +779,17 @@ async fn test_garbage_collect_index_cli() {
.await
.unwrap();

let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

let index_path = test_env.indexes_dir_path.join(&test_env.index_id);
let split_filename = quickwit_common::split_file(splits[0].split_metadata.split_id.as_str());
let split_filename = quickwit_common::split_file(splits_metadata[0].split_id.as_str());
let split_path = index_path.join(&split_filename);
assert_eq!(split_path.try_exists().unwrap(), true);

Expand All @@ -786,39 +799,39 @@ async fn test_garbage_collect_index_cli() {

// Split should still exists within grace period.
let mut metastore = refresh_metastore(metastore).await.unwrap();
let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

// The following steps help turn an existing published split into a staged one
// without deleting the files.
let split = splits[0].clone();
let split_metadata = splits_metadata[0].clone();
metastore
.mark_splits_for_deletion(MarkSplitsForDeletionRequest::new(
index_uid.clone(),
vec![split.split_metadata.split_id.to_string()],
vec![split_metadata.clone().split_id.to_string()],
))
.await
.unwrap();
metastore
.delete_splits(DeleteSplitsRequest {
index_uid: index_uid.to_string(),
split_ids: splits
split_ids: splits_metadata
.into_iter()
.map(|split| split.split_metadata.split_id)
.map(|split_metadata| split_metadata.split_id)
.collect(),
})
.await
.unwrap();
metastore
.stage_splits(
StageSplitsRequest::try_from_split_metadata(
index_uid.clone(),
split.split_metadata.clone(),
)
.unwrap(),
StageSplitsRequest::try_from_split_metadata(index_uid.clone(), split_metadata.clone())
.unwrap(),
)
.await
.unwrap();
Expand All @@ -828,6 +841,9 @@ async fn test_garbage_collect_index_cli() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.collect_splits()
.await
.unwrap();
assert_eq!(splits[0].split_state, SplitState::Staged);

Expand All @@ -841,6 +857,9 @@ async fn test_garbage_collect_index_cli() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.collect_splits()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits[0].split_state, SplitState::Staged);
Expand All @@ -857,6 +876,9 @@ async fn test_garbage_collect_index_cli() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.collect_splits()
.await
.unwrap();
// Splits should be deleted from both metastore and file system.
assert_eq!(splits.len(), 0);
Expand Down
21 changes: 11 additions & 10 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,6 @@ where T: Send + 'static
}
}

impl<T> From<Vec<T>> for ServiceStream<T>
where T: Send + 'static
{
fn from(values: Vec<T>) -> Self {
Self {
inner: Box::pin(stream::iter(values)),
}
}
}

impl<T> fmt::Debug for ServiceStream<T>
where T: 'static
{
Expand Down Expand Up @@ -183,3 +173,14 @@ where T: Send + 'static
}
}
}

#[cfg(any(test, feature = "testsuite"))]
impl<T> From<Vec<T>> for ServiceStream<T>
where T: Send + 'static
{
fn from(values: Vec<T>) -> Self {
Self {
inner: Box::pin(stream::iter(values)),
}
}
}
Loading

0 comments on commit 8e193aa

Please sign in to comment.