Skip to content

Commit

Permalink
Add stream splits gRPC.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Nov 9, 2023
1 parent 83c2cf5 commit 3731e99
Show file tree
Hide file tree
Showing 31 changed files with 489 additions and 461 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ openssl = { version = "0.10.55", default-features = false }
openssl-probe = "0.1.5"
opentelemetry = { version = "0.19", features = ["rt-tokio"] }
opentelemetry-otlp = "0.12.0"
ouroboros = "0.18.0"
pin-project = "1.1.0"
pnet = { version = "0.33.0", features = ["std"] }
postcard = { version = "1.0.4", features = ["use-std"], default-features = false}
Expand Down
19 changes: 1 addition & 18 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_config::{SourceInputFormat, CLI_INGEST_SOURCE_ID};
use quickwit_metastore::{
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreResolver, MetastoreServiceExt,
SplitState, StageSplitsRequestExt,
ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, SplitState, StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
DeleteSplitsRequest, EntityKind, IndexMetadataRequest, ListSplitsRequest,
Expand Down Expand Up @@ -254,8 +253,6 @@ async fn test_ingest_docs_cli() {
.await
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();

assert_eq!(splits.len(), 1);
Expand Down Expand Up @@ -668,8 +665,6 @@ async fn test_garbage_collect_cli_no_grace() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
assert_eq!(splits.len(), 1);

Expand Down Expand Up @@ -718,8 +713,6 @@ async fn test_garbage_collect_cli_no_grace() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap()
.len(),
0
);
Expand Down Expand Up @@ -779,8 +772,6 @@ async fn test_garbage_collect_index_cli() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
assert_eq!(splits.len(), 1);

Expand All @@ -798,8 +789,6 @@ async fn test_garbage_collect_index_cli() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
assert_eq!(splits.len(), 1);

Expand Down Expand Up @@ -839,8 +828,6 @@ async fn test_garbage_collect_index_cli() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
assert_eq!(splits[0].split_state, SplitState::Staged);

Expand All @@ -854,8 +841,6 @@ async fn test_garbage_collect_index_cli() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits[0].split_state, SplitState::Staged);
Expand All @@ -872,8 +857,6 @@ async fn test_garbage_collect_index_cli() {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
// Splits should be deleted from both metastore and file system.
assert_eq!(splits.len(), 0);
Expand Down
32 changes: 32 additions & 0 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,30 @@ pub struct ServiceStream<T> {
inner: BoxStream<T>,
}

impl<T> ServiceStream<T>
where T: Send + 'static
{
pub fn new(inner: BoxStream<T>) -> Self {
Self { inner }
}

pub fn empty() -> Self {
Self {
inner: Box::pin(stream::empty()),
}
}
}

impl<T> From<Vec<T>> for ServiceStream<T>
where T: Send + 'static
{
fn from(values: Vec<T>) -> Self {
Self {
inner: Box::pin(stream::iter(values)),
}
}
}

impl<T> fmt::Debug for ServiceStream<T>
where T: 'static
{
Expand Down Expand Up @@ -104,6 +128,14 @@ where T: Send + 'static
}
}

impl<T> From<BoxStream<T>> for ServiceStream<T>
where T: 'static
{
fn from(stream: BoxStream<T>) -> Self {
Self { inner: stream }
}
}

/// Adapts a server-side tonic::Streaming into a ServiceStream of `Result<T, tonic::Status>`. Once
/// an error is encountered, the stream will be closed and subsequent calls to `poll_next` will
/// return `None`.
Expand Down
41 changes: 12 additions & 29 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Duration;
use futures::Future;
use quickwit_common::{PrettySample, Progress};
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, SplitInfo, SplitMetadata,
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitInfo, SplitMetadata,
SplitState,
};
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -106,7 +106,9 @@ pub async fn run_garbage_collect(
metastore.list_splits(list_deletable_staged_request),
)
.await?
.deserialize_splits_metadata()?;
.into_iter()
.map(|split| split.split_metadata)
.collect();

if dry_run {
let marked_for_deletion_query = ListSplitsQuery::for_index(index_uid.clone())
Expand All @@ -118,7 +120,9 @@ pub async fn run_garbage_collect(
metastore.list_splits(marked_for_deletion_request),
)
.await?
.deserialize_splits_metadata()?;
.into_iter()
.map(|split| split.split_metadata)
.collect();
splits_marked_for_deletion.extend(deletable_staged_splits);

let candidate_entries: Vec<SplitInfo> = splits_marked_for_deletion
Expand Down Expand Up @@ -192,9 +196,7 @@ async fn delete_splits_marked_for_deletion(
}
};
let list_splits_result =
protect_future(progress_opt, metastore.list_splits(list_splits_request))
.await
.and_then(|list_splits_response| list_splits_response.deserialize_splits());
protect_future(progress_opt, metastore.list_splits(list_splits_request)).await;

let splits_to_delete: Vec<SplitMetadata> = match list_splits_result {
Ok(splits) => splits
Expand Down Expand Up @@ -345,14 +347,13 @@ mod tests {
use std::time::Duration;

use itertools::Itertools;
use quickwit_common::ServiceStream;
use quickwit_config::IndexConfig;
use quickwit_metastore::{
metastore_for_test, CreateIndexRequestExt, ListSplitsQuery, SplitMetadata, SplitState,
StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
CreateIndexRequest, EntityKind, ListSplitsResponse, StageSplitsRequest,
};
use quickwit_proto::metastore::{CreateIndexRequest, EntityKind, StageSplitsRequest};
use quickwit_proto::types::IndexUid;
use quickwit_storage::{
storage_for_test, BulkDeleteError, DeleteFailure, MockStorage, PutPayload,
Expand Down Expand Up @@ -395,8 +396,6 @@ mod tests {
.list_splits(list_splits_request)
.await
.unwrap()
.deserialize_splits()
.unwrap()
.len(),
1
);
Expand All @@ -422,8 +421,6 @@ mod tests {
.list_splits(list_splits_request)
.await
.unwrap()
.deserialize_splits()
.unwrap()
.len(),
1
);
Expand All @@ -449,8 +446,6 @@ mod tests {
.list_splits(list_splits_request)
.await
.unwrap()
.deserialize_splits()
.unwrap()
.len(),
1
);
Expand Down Expand Up @@ -496,8 +491,6 @@ mod tests {
.list_splits(list_splits_request)
.await
.unwrap()
.deserialize_splits()
.unwrap()
.len(),
1
);
Expand All @@ -523,8 +516,6 @@ mod tests {
.list_splits(list_splits_request)
.await
.unwrap()
.deserialize_splits()
.unwrap()
.len(),
1
);
Expand All @@ -549,8 +540,6 @@ mod tests {
.list_splits(list_splits_request)
.await
.unwrap()
.deserialize_splits()
.unwrap()
.len(),
0
);
Expand All @@ -562,9 +551,9 @@ mod tests {
let storage = storage_for_test();
let mut metastore = MetastoreServiceClient::mock();
metastore
.expect_list_splits()
.expect_stream_splits()
.times(2)
.returning(|_| Ok(ListSplitsResponse::empty()));
.returning(|_| Ok(ServiceStream::empty()));
run_garbage_collect(
IndexUid::new_with_random_ulid("index-test-gc-deletes"),
storage.clone(),
Expand Down Expand Up @@ -620,8 +609,6 @@ mod tests {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
assert_eq!(splits.len(), 1);

Expand All @@ -646,8 +633,6 @@ mod tests {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap()
.is_empty());
}

Expand Down Expand Up @@ -738,8 +723,6 @@ mod tests {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits[0].split_id(), split_id_1);
Expand Down
25 changes: 14 additions & 11 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{
AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, SplitInfo, SplitMetadata,
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitInfo, SplitMetadata,
SplitState,
};
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -174,25 +174,26 @@ impl IndexService {

if dry_run {
let list_splits_request = ListSplitsRequest::try_from_index_uid(index_uid)?;
let splits_to_delete = self
let splits_to_delete: Vec<SplitInfo> = self
.metastore
.list_splits(list_splits_request)
.await?
.deserialize_splits()?
.into_iter()
.map(|split| split.split_metadata.as_split_info())
.collect::<Vec<_>>();
.collect();
return Ok(splits_to_delete);
}
// Schedule staged and published splits for deletion.
let query = ListSplitsQuery::for_index(index_uid.clone())
.with_split_states([SplitState::Staged, SplitState::Published]);
let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?;
let split_ids = self
let split_ids: Vec<SplitId> = self
.metastore
.list_splits(list_splits_request)
.await?
.deserialize_split_ids()?;
.into_iter()
.map(|split| split.split_metadata.split_id)
.collect();
let mark_splits_for_deletion_request =
MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids);
self.metastore
Expand All @@ -203,11 +204,13 @@ impl IndexService {
let query = ListSplitsQuery::for_index(index_uid.clone())
.with_split_state(SplitState::MarkedForDeletion);
let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?;
let splits_to_delete = self
let splits_to_delete: Vec<SplitMetadata> = self
.metastore
.list_splits(list_splits_request)
.await?
.deserialize_splits_metadata()?;
.into_iter()
.map(|split| split.split_metadata)
.collect();

let deleted_splits = delete_splits_from_storage_and_metastore(
index_uid.clone(),
Expand Down Expand Up @@ -291,7 +294,9 @@ impl IndexService {
.metastore
.list_splits(list_splits_request)
.await?
.deserialize_splits_metadata()?;
.into_iter()
.map(|split| split.split_metadata)
.collect();
let split_ids: Vec<SplitId> = splits_metadata
.iter()
.map(|split| split.split_id.to_string())
Expand Down Expand Up @@ -496,8 +501,6 @@ mod tests {
let splits = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.unwrap();
assert_eq!(splits.len(), 1);

Expand Down

0 comments on commit 3731e99

Please sign in to comment.