diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index f24d599b2..e9acd8a16 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -43,64 +43,68 @@ pub async fn task( let mut tasks = JoinSet::new(); - while let Some(queued) = recv.recv().await { - let id = queued.id; - - info!("Queued deployment at the front of the queue: {id}"); - - let deployment_updater = deployment_updater.clone(); - let run_send_cloned = run_send.clone(); - let log_recorder = log_recorder.clone(); - let secret_recorder = secret_recorder.clone(); - let storage_manager = storage_manager.clone(); - let queue_client = queue_client.clone(); - - tasks.spawn(async move { - let parent_cx = global::get_text_map_propagator(|propagator| { - propagator.extract(&queued.tracing_context) - }); - let span = debug_span!("builder"); - span.set_parent(parent_cx); - - async move { - match timeout( - Duration::from_secs(60 * 3), // Timeout after 3 minutes if the build queue hangs or it takes too long for a slot to become available - wait_for_queue(queue_client.clone(), id), - ) - .await - { - Ok(_) => {} - Err(err) => return build_failed(&id, err), - } - - match queued - .handle( - storage_manager, - deployment_updater, - log_recorder, - secret_recorder, - ) - .await - { - Ok(built) => { - remove_from_queue(queue_client, id).await; - promote_to_run(built, run_send_cloned).await - } - Err(err) => { - remove_from_queue(queue_client, id).await; - build_failed(&id, err) + loop { + tokio::select! { + Some(queued) = recv.recv() => { + let id = queued.id; + + info!("Queued deployment at the front of the queue: {id}"); + + let deployment_updater = deployment_updater.clone(); + let run_send_cloned = run_send.clone(); + let log_recorder = log_recorder.clone(); + let secret_recorder = secret_recorder.clone(); + let storage_manager = storage_manager.clone(); + let queue_client = queue_client.clone(); + + tasks.spawn(async move { + let parent_cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&queued.tracing_context) + }); + let span = debug_span!("builder"); + span.set_parent(parent_cx); + + async move { + match timeout( + Duration::from_secs(60 * 3), // Timeout after 3 minutes if the build queue hangs or it takes too long for a slot to become available + wait_for_queue(queue_client.clone(), id), + ) + .await + { + Ok(_) => {} + Err(err) => return build_failed(&id, err), + } + + match queued + .handle( + storage_manager, + deployment_updater, + log_recorder, + secret_recorder, + ) + .await + { + Ok(built) => { + remove_from_queue(queue_client, id).await; + promote_to_run(built, run_send_cloned).await + } + Err(err) => { + remove_from_queue(queue_client, id).await; + build_failed(&id, err) + } + } } + .instrument(span) + .await + }); + }, + Some(res) = tasks.join_next() => { + match res { + Ok(_) => (), + Err(err) => error!(error = %err, "an error happened while joining a builder task"), } } - .instrument(span) - .await - }); - } - - while let Some(res) = tasks.join_next().await { - match res { - Ok(_) => (), - Err(err) => error!(error = %err, "an error happened when joining a builder task"), + else => break } } } diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index c160590e3..eba5e6f04 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -49,84 +49,89 @@ pub async fn task( let mut set = JoinSet::new(); - while let Some(built) = recv.recv().await { - let id = built.id; - - info!("Built deployment at the front of run queue: {id}"); - - let deployment_updater = deployment_updater.clone(); - let secret_getter = secret_getter.clone(); - let resource_manager = resource_manager.clone(); - let storage_manager = storage_manager.clone(); - - let old_deployments_killer = kill_old_deployments( - built.service_id, - id, - active_deployment_getter.clone(), - runtime_manager.clone(), - ); - let cleanup = move |response: Option| { - debug!(response = ?response, "stop client response: "); - - if let Some(response) = response { - match StopReason::from_i32(response.reason).unwrap_or_default() { - StopReason::Request => stopped_cleanup(&id), - StopReason::End => completed_cleanup(&id), - StopReason::Crash => crashed_cleanup( - &id, - Error::Run(anyhow::Error::msg(response.message).into()), - ), - } - } else { - crashed_cleanup( - &id, - Error::Runtime(anyhow::anyhow!( - "stop subscribe channel stopped unexpectedly" - )), - ) - } - }; - let runtime_manager = runtime_manager.clone(); - - set.spawn(async move { - let parent_cx = global::get_text_map_propagator(|propagator| { - propagator.extract(&built.tracing_context) - }); - let span = debug_span!("runner"); - span.set_parent(parent_cx); - - async move { - match built - .handle( - storage_manager, - secret_getter, - resource_manager, - runtime_manager, - deployment_updater, - old_deployments_killer, - cleanup, - ) - .await - { - Ok(handle) => handle - .await - .expect("the call to run in built.handle to be done"), - Err(err) => start_crashed_cleanup(&id, err), + loop { + tokio::select! { + Some(built) = recv.recv() => { + let id = built.id; + + info!("Built deployment at the front of run queue: {id}"); + + let deployment_updater = deployment_updater.clone(); + let secret_getter = secret_getter.clone(); + let resource_manager = resource_manager.clone(); + let storage_manager = storage_manager.clone(); + + let old_deployments_killer = kill_old_deployments( + built.service_id, + id, + active_deployment_getter.clone(), + runtime_manager.clone(), + ); + let cleanup = move |response: Option| { + debug!(response = ?response, "stop client response: "); + + if let Some(response) = response { + match StopReason::from_i32(response.reason).unwrap_or_default() { + StopReason::Request => stopped_cleanup(&id), + StopReason::End => completed_cleanup(&id), + StopReason::Crash => crashed_cleanup( + &id, + Error::Run(anyhow::Error::msg(response.message).into()), + ), + } + } else { + crashed_cleanup( + &id, + Error::Runtime(anyhow::anyhow!( + "stop subscribe channel stopped unexpectedly" + )), + ) + } }; + let runtime_manager = runtime_manager.clone(); + + set.spawn(async move { + let parent_cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&built.tracing_context) + }); + let span = debug_span!("runner"); + span.set_parent(parent_cx); + + async move { + match built + .handle( + storage_manager, + secret_getter, + resource_manager, + runtime_manager, + deployment_updater, + old_deployments_killer, + cleanup, + ) + .await + { + Ok(handle) => handle + .await + .expect("the call to run in built.handle to be done"), + Err(err) => start_crashed_cleanup(&id, err), + }; + + info!("deployment done"); + } + .instrument(span) + .await + }); + }, + Some(res) = set.join_next() => { + match res { + Ok(_) => (), + Err(err) => { + error!(error = %err, "an error happened while joining a deployment run task") + } + } - info!("deployment done"); - } - .instrument(span) - .await - }); - } - - while let Some(res) = set.join_next().await { - match res { - Ok(_) => (), - Err(err) => { - error!(error = %err, "an error happened when joining a deployment run task") } + else => break } } }