Skip to content

Commit

Permalink
fix(deployment): properly await spawned tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Kazy committed Jun 7, 2023
1 parent 2901f0f commit f6fd6cc
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use shuttle_proto::runtime::{
runtime_client::RuntimeClient, LoadRequest, StartRequest, StopReason, SubscribeStopRequest,
SubscribeStopResponse,
};
use tokio::sync::Mutex;
use tokio::{
sync::Mutex,
task::{JoinHandle, JoinSet},
};
use tonic::{transport::Channel, Code};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand All @@ -44,6 +47,8 @@ pub async fn task(
) {
info!("Run task started");

let mut set = JoinSet::new();

while let Some(built) = recv.recv().await {
let id = built.id;

Expand Down Expand Up @@ -83,15 +88,15 @@ pub async fn task(
};
let runtime_manager = runtime_manager.clone();

tokio::spawn(async move {
set.spawn(async move {
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&built.tracing_context)
});
let span = debug_span!("runner");
span.set_parent(parent_cx);

async move {
if let Err(err) = built
match built
.handle(
storage_manager,
secret_getter,
Expand All @@ -103,15 +108,27 @@ pub async fn task(
)
.await
{
start_crashed_cleanup(&id, err)
}
Ok(handle) => handle
.await
.expect("the call to run in built.handle to be done"),
Err(err) => start_crashed_cleanup(&id, err),
};

info!("deployment done");
}
.instrument(span)
.await
});
}

while let Some(res) = set.join_next().await {
match res {
Ok(_) => (),
Err(err) => {
error!(error = %err, "an error happened when joining a deployment run task")
}
}
}
}

#[instrument(skip(active_deployment_getter, runtime_manager))]
Expand Down Expand Up @@ -199,7 +216,7 @@ impl Built {
deployment_updater: impl DeploymentUpdater,
kill_old_deployments: impl futures::Future<Output = Result<()>>,
cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
) -> Result<()> {
) -> Result<JoinHandle<()>> {
// For alpha this is the path to the users project with an embedded runtime.
// For shuttle-next this is the path to the compiled .wasm file, which will be
// used in the load request.
Expand Down Expand Up @@ -244,7 +261,7 @@ impl Built {
)
.await?;

tokio::spawn(run(
let handler = tokio::spawn(run(
self.id,
self.service_name,
runtime_client,
Expand All @@ -253,7 +270,7 @@ impl Built {
cleanup,
));

Ok(())
Ok(handler)
}
}

Expand Down

0 comments on commit f6fd6cc

Please sign in to comment.