From a297a6697738ae99361e72caadf9fabcee9b1813 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Wed, 15 May 2024 08:48:16 -0400 Subject: [PATCH] Stop marking ingesters as `Unavailable` in control plane (#4973) --- .../src/control_plane.rs | 81 ++++++------ .../src/ingest/ingest_controller.rs | 98 -------------- .../quickwit-control-plane/src/model/mod.rs | 5 - .../src/model/shard_table.rs | 19 --- .../src/actors/merge_pipeline.rs | 3 - .../src/models/indexing_service_message.rs | 13 -- .../src/ingest_v2/broadcast.rs | 5 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 121 +++++++++++++++++- .../quickwit-ingest/src/ingest_v2/router.rs | 58 +++++++++ .../src/ingest_v2/routing_table.rs | 25 ++++ quickwit/quickwit-proto/src/types/mod.rs | 10 ++ .../src/developer_api/server.rs | 8 +- quickwit/quickwit-serve/src/grpc.rs | 10 +- quickwit/quickwit-serve/src/lib.rs | 15 ++- quickwit/quickwit-serve/src/rest.rs | 3 +- 15 files changed, 277 insertions(+), 197 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 61232a76fe..6e577159a7 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -17,7 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap}; use std::fmt; use std::fmt::Formatter; use std::time::Duration; @@ -344,33 +344,30 @@ impl ControlPlane { }) .unwrap_or_default(); - let shard_table: Vec = self - .model - .all_shards_with_source() - .map(|(source, shards)| { - let shards: Vec = shards - .map(|shard| { - json!({ - "shard_id": shard.shard_id.clone(), - "shard_state": shard.shard_state().as_json_str_name(), - "leader_id": shard.leader_id.clone(), - "follower_id": shard.follower_id.clone(), - "publish_position_inclusive": shard.publish_position_inclusive(), - }) - }) - .collect(); + let mut per_index_shards_json: HashMap> = HashMap::new(); + for (source_uid, shard_entries) in self.model.all_shards_with_source() { + let index_uid = source_uid.index_uid.clone(); + let source_id = source_uid.source_id.clone(); + let shards_json = shard_entries.map(|shard_entry| { json!({ - "index_uid": source.index_uid.clone(), - "source_id": source.source_id.clone(), - "shards": shards, + "index_uid": index_uid, + "source_id": source_id, + "shard_id": shard_entry.shard_id.clone(), + "shard_state": shard_entry.shard_state().as_json_str_name(), + "leader_id": shard_entry.leader_id.clone(), + "follower_id": shard_entry.follower_id.clone(), + "publish_position_inclusive": shard_entry.publish_position_inclusive(), }) - }) - .collect(); - + }); + per_index_shards_json + .entry(index_uid.clone()) + .or_default() + .extend(shards_json); + } json!({ "physical_indexing_plan": physical_indexing_plan, - "shard_table": shard_table, + "shard_table": per_index_shards_json, }) } @@ -1659,7 +1656,8 @@ mod tests { assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]); let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap(); - let shard = &control_plane_debug_info["shard_table"][0]["shards"][0]; + let shard = + &control_plane_debug_info["shard_table"]["test-index-0:00000000000000000000000000"][0]; assert_eq!(shard["shard_id"], "00000000000000000017"); assert_eq!(shard["publish_position_inclusive"], "00000000000000001000"); @@ -1714,23 +1712,23 @@ mod tests { let ingester_pool = IngesterPool::default(); let mut mock_metastore = MockMetastoreService::new(); - let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0"); - let mut source = SourceConfig::ingest_v2(); - source.enabled = true; - index_0.add_source(source.clone()).unwrap(); + let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index"); + let mut source_config = SourceConfig::ingest_v2(); + source_config.enabled = true; + index_metadata.add_source(source_config.clone()).unwrap(); - let index_0_clone = index_0.clone(); + let index_metadata_clone = index_metadata.clone(); mock_metastore.expect_list_indexes_metadata().return_once( move |list_indexes_request: ListIndexesMetadataRequest| { assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all()); Ok(ListIndexesMetadataResponse::for_test(vec![ - index_0_clone.clone() + index_metadata_clone, ])) }, ); let mut shard = Shard { - index_uid: Some(index_0.index_uid.clone()), + index_uid: Some(index_metadata.index_uid.clone()), source_id: INGEST_V2_SOURCE_ID.to_string(), shard_id: Some(ShardId::from(17)), leader_id: "test_node".to_string(), @@ -1739,7 +1737,7 @@ mod tests { }; shard.set_shard_state(ShardState::Open); - let index_uid_clone = index_0.index_uid.clone(); + let index_uid_clone = index_metadata.index_uid.clone(); mock_metastore.expect_list_shards().return_once( move |_list_shards_request: ListShardsRequest| { let list_shards_resp = ListShardsResponse { @@ -1765,7 +1763,8 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), ); let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap(); - let shard = &control_plane_debug_info["shard_table"][0]["shards"][0]; + let shard = + &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0]; assert_eq!(shard["shard_id"], "00000000000000000017"); assert_eq!(shard["publish_position_inclusive"], "00000000000000001234"); @@ -2393,7 +2392,9 @@ mod tests { control_plane_mailbox.ask(callback).await.unwrap(); let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap(); - let shard = &control_plane_debug_info["shard_table"][0]["shards"][0]; + println!("{:?}", control_plane_debug_info); + let shard = + &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0]; assert_eq!(shard["shard_id"], "00000000000000000000"); assert_eq!(shard["shard_state"], "closed"); @@ -2529,14 +2530,10 @@ mod tests { control_plane_debug_info["physical_indexing_plan"][0]["node_id"], "test-ingester" ); - let shard_table_entry = &control_plane_debug_info["shard_table"][0]; - assert_eq!( - shard_table_entry["index_uid"], - "test-index:00000000000000000000000000" - ); - assert_eq!(shard_table_entry["source_id"], INGEST_V2_SOURCE_ID); - - let shard = &shard_table_entry["shards"][0]; + let shard = + &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0]; + assert_eq!(shard["index_uid"], "test-index:00000000000000000000000000"); + assert_eq!(shard["source_id"], INGEST_V2_SOURCE_ID); assert_eq!(shard["shard_id"], "00000000000000000000"); assert_eq!(shard["shard_state"], "open"); assert_eq!(shard["leader_id"], "test-ingester"); diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 21407cbf47..40bd2f5eef 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -245,26 +245,6 @@ impl IngestController { } } - fn handle_unavailable_leaders( - &self, - unavailable_leaders: &FnvHashSet, - model: &mut ControlPlaneModel, - ) { - let mut confirmed_unavailable_leaders = FnvHashSet::default(); - - for leader_id in unavailable_leaders { - if !self.ingester_pool.contains_key(leader_id) { - confirmed_unavailable_leaders.insert(leader_id.clone()); - } else { - // TODO: If a majority of ingesters consistenly reports a leader as unavailable, we - // should probably mark it as unavailable too. - } - } - if !confirmed_unavailable_leaders.is_empty() { - model.set_shards_as_unavailable(&confirmed_unavailable_leaders); - } - } - /// Finds the open shards that satisfies the [`GetOrCreateOpenShardsRequest`] request sent by an /// ingest router. First, the control plane checks its internal shard table to find /// candidates. If it does not contain any, the control plane will ask @@ -283,8 +263,6 @@ impl IngestController { .map(|ingester_id| ingester_id.into()) .collect(); - self.handle_unavailable_leaders(&unavailable_leaders, model); - let num_subrequests = get_open_shards_request.subrequests.len(); let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests); let mut get_or_create_open_shards_failures = Vec::new(); @@ -1297,82 +1275,6 @@ mod tests { assert!(shard_1.is_closed()); } - #[tokio::test] - async fn test_ingest_controller_get_open_shards_handles_unavailable_leaders() { - let metastore = MetastoreServiceClient::mocked(); - - let ingester_pool = IngesterPool::default(); - let ingester_1 = IngesterServiceClient::mocked(); - ingester_pool.insert("test-ingester-1".into(), ingester_1); - - let replication_factor = 2; - - let mut ingest_controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); - let mut model = ControlPlaneModel::default(); - - let index_uid = IndexUid::for_test("test-index-0", 0); - let source_id: SourceId = "test-source".into(); - - let shards = vec![ - Shard { - index_uid: Some(index_uid.clone()), - source_id: source_id.clone(), - shard_id: Some(ShardId::from(1)), - leader_id: "test-ingester-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: source_id.clone(), - shard_id: Some(ShardId::from(2)), - leader_id: "test-ingester-0".to_string(), - shard_state: ShardState::Closed as i32, - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: source_id.clone(), - shard_id: Some(ShardId::from(3)), - leader_id: "test-ingester-1".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }, - ]; - model.insert_shards(&index_uid, &source_id, shards); - - let request = GetOrCreateOpenShardsRequest { - subrequests: Vec::new(), - closed_shards: Vec::new(), - unavailable_leaders: vec!["test-ingester-0".to_string()], - }; - let progress = Progress::default(); - - ingest_controller - .get_or_create_open_shards(request, &mut model, &progress) - .await - .unwrap(); - - let shard_1 = model - .all_shards() - .find(|shard| shard.shard_id() == ShardId::from(1)) - .unwrap(); - assert!(shard_1.is_unavailable()); - - let shard_2 = model - .all_shards() - .find(|shard| shard.shard_id() == ShardId::from(2)) - .unwrap(); - assert!(shard_2.is_closed()); - - let shard_3 = model - .all_shards() - .find(|shard| shard.shard_id() == ShardId::from(3)) - .unwrap(); - assert!(shard_3.is_open()); - } - #[test] fn test_ingest_controller_allocate_shards() { let metastore = MetastoreServiceClient::mocked(); diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 4bf3d3ddd8..229f927555 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -259,11 +259,6 @@ impl ControlPlaneModel { Ok(has_changed) } - pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet) { - self.shard_table - .set_shards_as_unavailable(unavailable_leaders); - } - pub(crate) fn all_shards(&self) -> impl Iterator + '_ { self.shard_table.all_shards() } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 4f61f68cbc..65395f63ca 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -323,25 +323,6 @@ impl ShardTable { .map(|(source, shard_table)| (source, shard_table.shard_entries.values())) } - pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet) { - for (source_uid, shard_table_entry) in &mut self.table_entries { - let mut modified = false; - for shard_entry in shard_table_entry.shard_entries.values_mut() { - if shard_entry.is_open() && unavailable_leaders.contains(&shard_entry.leader_id) { - shard_entry.set_shard_state(ShardState::Unavailable); - modified = true; - } - } - if modified { - let num_open_shards = shard_table_entry.num_open_shards(); - crate::metrics::CONTROL_PLANE_METRICS - .open_shards_total - .with_label_values([source_uid.index_uid.index_id.as_str()]) - .set(num_open_shards as i64); - }; - } - } - /// Lists the shards of a given source. Returns `None` if the source does not exist. pub fn get_shards(&self, source_uid: &SourceUid) -> Option<&FnvHashMap> { self.table_entries diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 3f29c5d6c6..75808e849c 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -55,9 +55,6 @@ use crate::split_store::IndexingSplitStore; /// concurrently. static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10); -#[derive(Debug)] -struct ObserveLoop; - struct MergePipelineHandles { merge_planner: ActorHandle, merge_split_downloader: ActorHandle, diff --git a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs index cd005d235e..6c76898574 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs @@ -30,19 +30,6 @@ pub struct SpawnPipeline { pub pipeline_uid: PipelineUid, } -#[derive(Clone, Debug)] -pub struct ShutdownPipelines { - pub index_id: String, - pub source_id: Option, - // TODO - // pub pipeline_ord: Option, -} - -#[derive(Clone, Debug)] -pub struct ShutdownPipeline { - pub pipeline_id: IndexingPipelineId, -} - /// Detaches a pipeline from the indexing service. The pipeline is no longer managed by the /// server. This is mostly useful for ad-hoc indexing pipelines launched with `quickwit index /// ingest ..` and testing. diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs index 19264cf3de..8feaaa7aa3 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs @@ -185,11 +185,12 @@ impl BroadcastLocalShardsTask { for (queue_id, shard_state) in queue_ids { let Some((_rate_limiter, rate_meter)) = state_guard.rate_trackers.get_mut(&queue_id) else { - warn!("rate limiter `{queue_id}` not found",); + warn!( + "rate limiter `{queue_id}` not found: this should never happen, please report" + ); continue; }; let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else { - warn!("failed to parse queue ID `{queue_id}`"); continue; }; let source_uid = SourceUid { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 2d26add62c..bf4dbb7a14 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -79,6 +79,7 @@ use super::replication::{ use super::state::{IngesterState, InnerIngesterState, WeakIngesterState}; use super::IngesterPool; use crate::ingest_v2::metrics::report_wal_usage; +use crate::ingest_v2::models::IngesterShardType; use crate::metrics::INGEST_METRICS; use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{estimate_size, with_lock_metrics, FollowerId}; @@ -287,7 +288,6 @@ impl Ingester { for queue_id in state_guard.mrecordlog.list_queues() { let Some((index_uid, source_id, shard_id)) = split_queue_id(queue_id) else { - warn!("failed to parse queue ID `{queue_id}`"); continue; }; per_source_shard_ids @@ -1012,9 +1012,44 @@ impl Ingester { }) } }; + let mut per_index_shards_json: HashMap> = HashMap::new(); + + for (queue_id, shard) in &state_guard.shards { + let Some((index_uid, source_id, shard_id)) = split_queue_id(queue_id) else { + continue; + }; + let mut shard_json = json!({ + "index_uid": index_uid, + "source_id": source_id, + "shard_id": shard_id, + "state": shard.shard_state.as_json_str_name(), + "replication_position_inclusive": shard.replication_position_inclusive, + "truncation_position_inclusive": shard.truncation_position_inclusive, + }); + match &shard.shard_type { + IngesterShardType::Primary { follower_id } => { + shard_json["type"] = json!("primary"); + shard_json["leader_id"] = json!(self.self_node_id.to_string()); + shard_json["follower_id"] = json!(follower_id.to_string()); + } + IngesterShardType::Replica { leader_id } => { + shard_json["type"] = json!("replica"); + shard_json["leader_id"] = json!(leader_id.to_string()); + shard_json["follower_id"] = json!(self.self_node_id.to_string()); + } + IngesterShardType::Solo => { + shard_json["type"] = json!("solo"); + shard_json["leader_id"] = json!(self.self_node_id.to_string()); + } + }; + per_index_shards_json + .entry(index_uid.clone()) + .or_default() + .push(shard_json); + } json!({ "status": state_guard.status().as_json_str_name(), - "shards": state_guard.shards.keys().collect::>(), // TODO: add more info + "shards": per_index_shards_json, "mrecordlog": state_guard.mrecordlog.summary(), }) } @@ -3188,4 +3223,86 @@ mod tests { .assert_is_open(); drop(state_guard); } + + #[tokio::test] + async fn test_ingester_debug_info() { + let (_ingester_ctx, ingester) = IngesterForTest::default().build().await; + + let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0); + let shard_01 = Shard { + index_uid: Some(index_uid_0.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let shard_02 = Shard { + index_uid: Some(index_uid_0.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(2)), + shard_state: ShardState::Closed as i32, + ..Default::default() + }; + let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0); + let shard_03 = Shard { + index_uid: Some(index_uid_1.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(3)), + shard_state: ShardState::Closed as i32, + ..Default::default() + }; + + let mut state_guard = ingester.state.lock_fully().await.unwrap(); + let now = Instant::now(); + + ingester + .init_primary_shard( + &mut state_guard.inner, + &mut state_guard.mrecordlog, + shard_01, + now, + ) + .await + .unwrap(); + ingester + .init_primary_shard( + &mut state_guard.inner, + &mut state_guard.mrecordlog, + shard_02, + now, + ) + .await + .unwrap(); + ingester + .init_primary_shard( + &mut state_guard.inner, + &mut state_guard.mrecordlog, + shard_03, + now, + ) + .await + .unwrap(); + drop(state_guard); + + let debug_info = ingester.debug_info().await; + assert_eq!(debug_info["status"], "ready"); + + let shards = &debug_info["shards"]; + assert_eq!(shards.as_object().unwrap().len(), 2); + + assert_eq!( + shards["test-index-0:00000000000000000000000000"] + .as_array() + .unwrap() + .len(), + 2 + ); + assert_eq!( + shards["test-index-1:00000000000000000000000000"] + .as_array() + .unwrap() + .len(), + 1 + ); + } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index ba78f4585d..fde9419c1e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -39,6 +39,7 @@ use quickwit_proto::ingest::ingester::{ use quickwit_proto::ingest::router::{IngestRequestV2, IngestResponseV2, IngestRouterService}; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState}; use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId}; +use serde_json::{json, Value as JsonValue}; use tokio::sync::{Mutex, Semaphore}; use tracing::info; @@ -451,6 +452,15 @@ impl IngestRouter { IngestV2Error::Timeout(message) })? } + + pub async fn debug_info(&self) -> JsonValue { + let state_guard = self.state.lock().await; + let routing_table_json = state_guard.routing_table.debug_info(); + + json!({ + "routing_table": routing_table_json, + }) + } } #[async_trait] @@ -1682,4 +1692,52 @@ mod tests { assert_eq!(shards[0].shard_id, ShardId::from(2)); drop(state_guard); } + + #[tokio::test] + async fn test_router_debug_info() { + let self_node_id = "test-router".into(); + let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); + let ingester_pool = IngesterPool::default(); + let replication_factor = 1; + let router = IngestRouter::new( + self_node_id, + control_plane, + ingester_pool.clone(), + replication_factor, + ); + let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0); + let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0); + + let mut state_guard = router.state.lock().await; + state_guard.routing_table.replace_shards( + index_uid_0.clone(), + "test-source", + vec![Shard { + index_uid: Some(index_uid_0.clone()), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester".to_string(), + ..Default::default() + }], + ); + state_guard.routing_table.replace_shards( + index_uid_1.clone(), + "test-source", + vec![Shard { + index_uid: Some(index_uid_1.clone()), + shard_id: Some(ShardId::from(2)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester".to_string(), + ..Default::default() + }], + ); + drop(state_guard); + + let debug_info = router.debug_info().await; + let routing_table = &debug_info["routing_table"]; + assert_eq!(routing_table.as_object().unwrap().len(), 2); + + assert_eq!(routing_table["test-index-0"].as_array().unwrap().len(), 1); + assert_eq!(routing_table["test-index-1"].as_array().unwrap().len(), 1); + } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 2ac9d12c75..b2095b6647 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use quickwit_proto::ingest::{Shard, ShardIds, ShardState}; use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId}; +use serde_json::{json, Value as JsonValue}; use tracing::{info, warn}; use crate::IngesterPool; @@ -452,6 +453,30 @@ impl RoutingTable { } } + pub fn debug_info(&self) -> HashMap> { + let mut per_index_shards_json: HashMap> = HashMap::new(); + + for ((index_id, source_id), entry) in &self.table { + for (shards, is_local) in &[(&entry.local_shards, true), (&entry.remote_shards, false)] + { + let shards_json = shards.iter().map(|shard| { + json!({ + "index_uid": shard.index_uid, + "source_id": source_id, + "shard_id": shard.shard_id, + "shard_state": shard.shard_state.as_json_str_name(), + "is_local": is_local, + }) + }); + per_index_shards_json + .entry(index_id.clone()) + .or_default() + .extend(shards_json); + } + } + per_index_shards_json + } + #[cfg(test)] pub fn len(&self) -> usize { self.table.len() diff --git a/quickwit/quickwit-proto/src/types/mod.rs b/quickwit/quickwit-proto/src/types/mod.rs index 18e034e628..f0d79d9493 100644 --- a/quickwit/quickwit-proto/src/types/mod.rs +++ b/quickwit/quickwit-proto/src/types/mod.rs @@ -25,6 +25,7 @@ use std::ops::Deref; use std::str::FromStr; use serde::{Deserialize, Serialize}; +use tracing::warn; pub use ulid::Ulid; mod index_uid; @@ -56,6 +57,15 @@ pub fn queue_id(index_uid: &IndexUid, source_id: &str, shard_id: &ShardId) -> Qu } pub fn split_queue_id(queue_id: &str) -> Option<(IndexUid, SourceId, ShardId)> { + let parts_opt = split_queue_id_inner(queue_id); + + if parts_opt.is_none() { + warn!("failed to parse queue ID `{queue_id}`: this should never happen, please report"); + } + parts_opt +} + +fn split_queue_id_inner(queue_id: &str) -> Option<(IndexUid, SourceId, ShardId)> { let mut parts = queue_id.split('/'); let index_uid = parts.next()?; let source_id = parts.next()?; diff --git a/quickwit/quickwit-serve/src/developer_api/server.rs b/quickwit/quickwit-serve/src/developer_api/server.rs index c6bd0de9d8..03791b1b4f 100644 --- a/quickwit/quickwit-serve/src/developer_api/server.rs +++ b/quickwit/quickwit-serve/src/developer_api/server.rs @@ -29,7 +29,7 @@ use quickwit_cluster::Cluster; use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; use quickwit_control_plane::control_plane::{ControlPlane, GetDebugInfo}; -use quickwit_ingest::Ingester; +use quickwit_ingest::{IngestRouter, Ingester}; use quickwit_proto::developer::{ DeveloperError, DeveloperResult, DeveloperService, GetDebugInfoRequest, GetDebugInfoResponse, }; @@ -42,6 +42,7 @@ pub(crate) struct DeveloperApiServer { node_config: Arc, cluster: Cluster, control_plane_mailbox_opt: Option>, + ingest_router_opt: Option, ingester_opt: Option, } @@ -59,6 +60,7 @@ impl DeveloperApiServer { node_config: services.node_config.clone(), cluster: services.cluster.clone(), control_plane_mailbox_opt: services.control_plane_server_opt.clone(), + ingest_router_opt: services.ingest_router_opt.clone(), ingester_opt: services.ingester_opt.clone(), } } @@ -100,6 +102,9 @@ impl DeveloperService for DeveloperApiServer { }; } } + if let Some(ingest_router) = &self.ingest_router_opt { + debug_info["ingest_router"] = ingest_router.debug_info().await; + } if let Some(ingester) = &self.ingester_opt { if roles.is_empty() || roles.contains(&QuickwitService::Indexer) { debug_info["ingester"] = ingester.debug_info().await; @@ -143,6 +148,7 @@ mod tests { node_config, cluster, control_plane_mailbox_opt: None, + ingest_router_opt: None, ingester_opt: None, }; let request = GetDebugInfoRequest { roles: Vec::new() }; diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index 12634a0b00..7a1c24691e 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -91,11 +91,11 @@ pub(crate) async fn start_grpc_server( .is_service_enabled(QuickwitService::Indexer) { enabled_grpc_services.insert("ingest-router"); - Some( - services - .ingest_router_service - .as_grpc_service(max_message_size), - ) + + let ingest_router_service = services + .ingest_router_service + .as_grpc_service(max_message_size); + Some(ingest_router_service) } else { None }; diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 5e92984328..f5235db275 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -188,6 +188,7 @@ struct QuickwitServices { // Ingest v1 pub ingest_service: IngestServiceClient, // Ingest v2 + pub ingest_router_opt: Option, pub ingest_router_service: IngestRouterServiceClient, ingester_opt: Option, @@ -538,7 +539,7 @@ pub async fn serve_quickwit( ); // Setup ingest service v2. - let (ingest_router_service, ingester_opt) = setup_ingest_v2( + let (ingest_router, ingest_router_service, ingester_opt) = setup_ingest_v2( &node_config, &cluster, &event_broker, @@ -700,6 +701,7 @@ pub async fn serve_quickwit( _report_splits_subscription_handle_opt: report_splits_subscription_handle_opt, index_manager, indexing_service_opt, + ingest_router_opt: Some(ingest_router), ingest_router_service, ingest_service, ingester_opt: ingester_opt.clone(), @@ -822,7 +824,7 @@ async fn setup_ingest_v2( event_broker: &EventBroker, control_plane: ControlPlaneServiceClient, ingester_pool: IngesterPool, -) -> anyhow::Result<(IngestRouterServiceClient, Option)> { +) -> anyhow::Result<(IngestRouter, IngestRouterServiceClient, Option)> { // Instantiate ingest router. let self_node_id: NodeId = cluster.self_node_id().into(); let content_length_limit = node_config.ingest_api_config.content_length_limit; @@ -831,6 +833,9 @@ async fn setup_ingest_v2( .replication_factor() .expect("replication factor should have been validated") .get(); + + // Any node can serve ingest requests, so we always instantiate an ingest router. + // TODO: I'm not sure that's such a good idea. let ingest_router = IngestRouter::new( self_node_id.clone(), control_plane.clone(), @@ -839,11 +844,9 @@ async fn setup_ingest_v2( ); ingest_router.subscribe(event_broker); - // Any node can serve ingest requests, so we always instantiate an ingest router. - // TODO: I'm not sure that's such a good idea. let ingest_router_service = IngestRouterServiceClient::tower() .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) - .build(ingest_router); + .build(ingest_router.clone()); // We compute the burst limit as something a bit larger than the content length limit, because // we actually rewrite the `\n-delimited format into a tiny bit larger buffer, where the @@ -937,7 +940,7 @@ async fn setup_ingest_v2( }) }); ingester_pool.listen_for_changes(ingester_change_stream); - Ok((ingest_router_service, ingester_opt)) + Ok((ingest_router, ingest_router_service, ingester_opt)) } async fn setup_searcher( diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index a4f122383c..6b61cba495 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -628,8 +628,9 @@ mod tests { indexing_service_opt: None, index_manager: index_service, ingest_service: ingest_service_client(), - ingester_opt: None, + ingest_router_opt: None, ingest_router_service: IngestRouterServiceClient::mocked(), + ingester_opt: None, janitor_service_opt: None, otlp_logs_service_opt: None, otlp_traces_service_opt: None,