Skip to content

Commit

Permalink
Populates the control plane model will closed and open shards (#4235)
Browse files Browse the repository at this point in the history
as opposed to only open ones.

Closes shards are not necessarily entirely indexed yet.
With this change, after a reload, the closed shards will be
scheduling for indexing as usual, and will get cleaned up once
they reach eof.

Closes #4176
  • Loading branch information
fulmicoton committed Dec 5, 2023
1 parent 187e2b7 commit 1e9ad73
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 8 deletions.
20 changes: 14 additions & 6 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Expand Up @@ -28,7 +28,7 @@ use quickwit_config::SourceConfig;
use quickwit_ingest::ShardInfos;
use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt};
use quickwit_proto::control_plane::ControlPlaneResult;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::ingest::Shard;
use quickwit_proto::metastore::{
self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError,
MetastoreService, MetastoreServiceClient, SourceType,
Expand All @@ -38,7 +38,7 @@ use serde::Serialize;
pub(super) use shard_table::{
NextShardId, ScalingMode, ShardEntry, ShardStats, ShardTable, ShardTableEntry,
};
use tracing::{info, warn};
use tracing::{info, instrument, warn};

/// The control plane maintains a model in sync with the metastore.
///
Expand Down Expand Up @@ -73,6 +73,7 @@ impl ControlPlaneModel {
}
}

#[instrument(skip_all)]
pub async fn load_from_metastore(
&mut self,
metastore: &mut MetastoreServiceClient,
Expand Down Expand Up @@ -108,7 +109,7 @@ impl ControlPlaneModel {
let request = ListShardsSubrequest {
index_uid: index_metadata.index_uid.clone().into(),
source_id: source_config.source_id.clone(),
shard_state: Some(ShardState::Open as i32),
shard_state: None,
};
subrequests.push(request);
}
Expand Down Expand Up @@ -318,7 +319,7 @@ impl ControlPlaneModel {
mod tests {
use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::ingest::Shard;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::ListIndexesMetadataResponse;

use super::*;
Expand Down Expand Up @@ -353,18 +354,25 @@ mod tests {

assert_eq!(request.subrequests[0].index_uid, "test-index-0:0");
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[0].shard_state(), ShardState::Open);
assert_eq!(
request.subrequests[0].shard_state(),
ShardState::Unspecified
);

assert_eq!(request.subrequests[1].index_uid, "test-index-1:0");
assert_eq!(request.subrequests[1].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[1].shard_state(), ShardState::Open);
assert_eq!(
request.subrequests[1].shard_state(),
ShardState::Unspecified
);

let subresponses = vec![
metastore::ListShardsSubresponse {
index_uid: "test-index-0:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
shards: vec![Shard {
shard_id: 42,
shard_state: ShardState::Open as i32,
..Default::default()
}],
next_shard_id: 43,
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Expand Up @@ -107,6 +107,10 @@ impl ShardTableEntry {
pub fn from_shards(shards: Vec<Shard>, next_shard_id: NextShardId) -> Self {
let shard_entries = shards
.into_iter()
.filter(|shard| {
let shard_state = shard.shard_state();
shard_state == ShardState::Open || shard_state == ShardState::Closed
})
.map(|shard| (shard.shard_id, shard.into()))
.collect();
Self {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/protos/quickwit/ingest.proto
Expand Up @@ -55,7 +55,7 @@ enum ShardState {
// The ingester hosting the shard is unavailable.
SHARD_STATE_UNAVAILABLE = 2;
// The shard is closed and cannot be written to.
// It can be safely deleted if the publish position is superior or equal to the replication position.
// It can be safely deleted if the publish position is superior or equal to `~eof`.
SHARD_STATE_CLOSED = 3;
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1e9ad73

Please sign in to comment.