Skip to content

Commit

Permalink
Attempt to fix flaky test test_ingester_closes_idle_shards
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jun 14, 2024
1 parent fb2dc2e commit 1c2ec26
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 32 deletions.
9 changes: 5 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ impl CloseIdleShardsTask {
let Some(state) = self.weak_state.upgrade() else {
return;
};
let mut state_guard =
with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write")
.await
.expect("ingester should be ready");
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write").await
else {
return;
};

let now = Instant::now();

Expand Down
35 changes: 7 additions & 28 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3174,31 +3174,24 @@ mod tests {

#[tokio::test]
async fn test_ingester_closes_idle_shards() {
// The `CloseIdleShardsTask` task is already unit tested, so this test ensures the task is
// correctly spawned upon starting an ingester.
let idle_shard_timeout = Duration::from_millis(200);
let (_ingester_ctx, ingester) = IngesterForTest::default()
.with_idle_shard_timeout(idle_shard_timeout)
.build()
.await;

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));

let shard_01 = Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
..Default::default()
};
let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));

let shard_02 = Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(2)),
shard_state: ShardState::Closed as i32,
..Default::default()
};
let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2));

let mut state_guard = ingester.state.lock_fully().await.unwrap();
let now = Instant::now();

Expand All @@ -3211,31 +3204,17 @@ mod tests {
)
.await
.unwrap();
ingester
.init_primary_shard(
&mut state_guard.inner,
&mut state_guard.mrecordlog,
shard_02,
now,
)
.await
.unwrap();
drop(state_guard);

tokio::time::sleep(Duration::from_millis(100)).await; // 2 times the run interval period of the close idle shards task
drop(state_guard);
tokio::time::sleep(Duration::from_millis(500)).await;

let state_guard = ingester.state.lock_partially().await.unwrap();

state_guard
.shards
.get(&queue_id_01)
.unwrap()
.assert_is_closed();
state_guard
.shards
.get(&queue_id_02)
.unwrap()
.assert_is_open();
drop(state_guard);
}

#[tokio::test]
Expand Down

0 comments on commit 1c2ec26

Please sign in to comment.