diff --git a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs index 464d99e16e..26e7078ba9 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs @@ -63,10 +63,11 @@ impl CloseIdleShardsTask { let Some(state) = self.weak_state.upgrade() else { return; }; - let mut state_guard = - with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write") - .await - .expect("ingester should be ready"); + let Ok(mut state_guard) = + with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write").await + else { + return; + }; let now = Instant::now(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index a54c9fb079..8b61438e65 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -3174,6 +3174,8 @@ mod tests { #[tokio::test] async fn test_ingester_closes_idle_shards() { + // The `CloseIdleShardsTask` task is already unit tested, so this test ensures the task is + // correctly spawned upon starting an ingester. let idle_shard_timeout = Duration::from_millis(200); let (_ingester_ctx, ingester) = IngesterForTest::default() .with_idle_shard_timeout(idle_shard_timeout) @@ -3181,6 +3183,8 @@ mod tests { .await; let index_uid: IndexUid = IndexUid::for_test("test-index", 0); + let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1)); + let shard_01 = Shard { index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), @@ -3188,17 +3192,6 @@ mod tests { shard_state: ShardState::Open as i32, ..Default::default() }; - let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1)); - - let shard_02 = Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), - shard_state: ShardState::Closed as i32, - ..Default::default() - }; - let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2)); - let mut state_guard = ingester.state.lock_fully().await.unwrap(); let now = Instant::now(); @@ -3211,31 +3204,17 @@ mod tests { ) .await .unwrap(); - ingester - .init_primary_shard( - &mut state_guard.inner, - &mut state_guard.mrecordlog, - shard_02, - now, - ) - .await - .unwrap(); - drop(state_guard); - tokio::time::sleep(Duration::from_millis(100)).await; // 2 times the run interval period of the close idle shards task + drop(state_guard); + tokio::time::sleep(Duration::from_millis(500)).await; let state_guard = ingester.state.lock_partially().await.unwrap(); + state_guard .shards .get(&queue_id_01) .unwrap() .assert_is_closed(); - state_guard - .shards - .get(&queue_id_02) - .unwrap() - .assert_is_open(); - drop(state_guard); } #[tokio::test]