diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 09ff7ef307..c160590e32 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -18,7 +18,10 @@ use shuttle_proto::runtime::{ runtime_client::RuntimeClient, LoadRequest, StartRequest, StopReason, SubscribeStopRequest, SubscribeStopResponse, }; -use tokio::sync::Mutex; +use tokio::{ + sync::Mutex, + task::{JoinHandle, JoinSet}, +}; use tonic::{transport::Channel, Code}; use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -44,6 +47,8 @@ pub async fn task( ) { info!("Run task started"); + let mut set = JoinSet::new(); + while let Some(built) = recv.recv().await { let id = built.id; @@ -83,7 +88,7 @@ pub async fn task( }; let runtime_manager = runtime_manager.clone(); - tokio::spawn(async move { + set.spawn(async move { let parent_cx = global::get_text_map_propagator(|propagator| { propagator.extract(&built.tracing_context) }); @@ -91,7 +96,7 @@ pub async fn task( span.set_parent(parent_cx); async move { - if let Err(err) = built + match built .handle( storage_manager, secret_getter, @@ -103,8 +108,11 @@ pub async fn task( ) .await { - start_crashed_cleanup(&id, err) - } + Ok(handle) => handle + .await + .expect("the call to run in built.handle to be done"), + Err(err) => start_crashed_cleanup(&id, err), + }; info!("deployment done"); } @@ -112,6 +120,15 @@ pub async fn task( .await }); } + + while let Some(res) = set.join_next().await { + match res { + Ok(_) => (), + Err(err) => { + error!(error = %err, "an error happened when joining a deployment run task") + } + } + } } #[instrument(skip(active_deployment_getter, runtime_manager))] @@ -199,7 +216,7 @@ impl Built { deployment_updater: impl DeploymentUpdater, kill_old_deployments: impl futures::Future>, cleanup: impl FnOnce(Option) + Send + 'static, - ) -> Result<()> { + ) -> Result> { // For alpha this is the path to the users project with an embedded runtime. // For shuttle-next this is the path to the compiled .wasm file, which will be // used in the load request. @@ -244,7 +261,7 @@ impl Built { ) .await?; - tokio::spawn(run( + let handler = tokio::spawn(run( self.id, self.service_name, runtime_client, @@ -253,7 +270,7 @@ impl Built { cleanup, )); - Ok(()) + Ok(handler) } }