Skip to content

Commit

Permalink
fix(deployer): use tokio::select! to await tasks set in deploy/run queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Kazy committed Jun 26, 2023
1 parent cf3b89c commit 6d5db2b
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 130 deletions.
114 changes: 59 additions & 55 deletions deployer/src/deployment/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,64 +43,68 @@ pub async fn task(

let mut tasks = JoinSet::new();

while let Some(queued) = recv.recv().await {
let id = queued.id;

info!("Queued deployment at the front of the queue: {id}");

let deployment_updater = deployment_updater.clone();
let run_send_cloned = run_send.clone();
let log_recorder = log_recorder.clone();
let secret_recorder = secret_recorder.clone();
let storage_manager = storage_manager.clone();
let queue_client = queue_client.clone();

tasks.spawn(async move {
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&queued.tracing_context)
});
let span = debug_span!("builder");
span.set_parent(parent_cx);

async move {
match timeout(
Duration::from_secs(60 * 3), // Timeout after 3 minutes if the build queue hangs or it takes too long for a slot to become available
wait_for_queue(queue_client.clone(), id),
)
.await
{
Ok(_) => {}
Err(err) => return build_failed(&id, err),
}

match queued
.handle(
storage_manager,
deployment_updater,
log_recorder,
secret_recorder,
)
.await
{
Ok(built) => {
remove_from_queue(queue_client, id).await;
promote_to_run(built, run_send_cloned).await
}
Err(err) => {
remove_from_queue(queue_client, id).await;
build_failed(&id, err)
loop {
tokio::select! {
Some(queued) = recv.recv() => {
let id = queued.id;

info!("Queued deployment at the front of the queue: {id}");

let deployment_updater = deployment_updater.clone();
let run_send_cloned = run_send.clone();
let log_recorder = log_recorder.clone();
let secret_recorder = secret_recorder.clone();
let storage_manager = storage_manager.clone();
let queue_client = queue_client.clone();

tasks.spawn(async move {
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&queued.tracing_context)
});
let span = debug_span!("builder");
span.set_parent(parent_cx);

async move {
match timeout(
Duration::from_secs(60 * 3), // Timeout after 3 minutes if the build queue hangs or it takes too long for a slot to become available
wait_for_queue(queue_client.clone(), id),
)
.await
{
Ok(_) => {}
Err(err) => return build_failed(&id, err),
}

match queued
.handle(
storage_manager,
deployment_updater,
log_recorder,
secret_recorder,
)
.await
{
Ok(built) => {
remove_from_queue(queue_client, id).await;
promote_to_run(built, run_send_cloned).await
}
Err(err) => {
remove_from_queue(queue_client, id).await;
build_failed(&id, err)
}
}
}
.instrument(span)
.await
});
},
Some(res) = tasks.join_next() => {
match res {
Ok(_) => (),
Err(err) => error!(error = %err, "an error happened while joining a builder task"),
}
}
.instrument(span)
.await
});
}

while let Some(res) = tasks.join_next().await {
match res {
Ok(_) => (),
Err(err) => error!(error = %err, "an error happened when joining a builder task"),
else => break
}
}
}
Expand Down
155 changes: 80 additions & 75 deletions deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,84 +49,89 @@ pub async fn task(

let mut set = JoinSet::new();

while let Some(built) = recv.recv().await {
let id = built.id;

info!("Built deployment at the front of run queue: {id}");

let deployment_updater = deployment_updater.clone();
let secret_getter = secret_getter.clone();
let resource_manager = resource_manager.clone();
let storage_manager = storage_manager.clone();

let old_deployments_killer = kill_old_deployments(
built.service_id,
id,
active_deployment_getter.clone(),
runtime_manager.clone(),
);
let cleanup = move |response: Option<SubscribeStopResponse>| {
debug!(response = ?response, "stop client response: ");

if let Some(response) = response {
match StopReason::from_i32(response.reason).unwrap_or_default() {
StopReason::Request => stopped_cleanup(&id),
StopReason::End => completed_cleanup(&id),
StopReason::Crash => crashed_cleanup(
&id,
Error::Run(anyhow::Error::msg(response.message).into()),
),
}
} else {
crashed_cleanup(
&id,
Error::Runtime(anyhow::anyhow!(
"stop subscribe channel stopped unexpectedly"
)),
)
}
};
let runtime_manager = runtime_manager.clone();

set.spawn(async move {
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&built.tracing_context)
});
let span = debug_span!("runner");
span.set_parent(parent_cx);

async move {
match built
.handle(
storage_manager,
secret_getter,
resource_manager,
runtime_manager,
deployment_updater,
old_deployments_killer,
cleanup,
)
.await
{
Ok(handle) => handle
.await
.expect("the call to run in built.handle to be done"),
Err(err) => start_crashed_cleanup(&id, err),
loop {
tokio::select! {
Some(built) = recv.recv() => {
let id = built.id;

info!("Built deployment at the front of run queue: {id}");

let deployment_updater = deployment_updater.clone();
let secret_getter = secret_getter.clone();
let resource_manager = resource_manager.clone();
let storage_manager = storage_manager.clone();

let old_deployments_killer = kill_old_deployments(
built.service_id,
id,
active_deployment_getter.clone(),
runtime_manager.clone(),
);
let cleanup = move |response: Option<SubscribeStopResponse>| {
debug!(response = ?response, "stop client response: ");

if let Some(response) = response {
match StopReason::from_i32(response.reason).unwrap_or_default() {
StopReason::Request => stopped_cleanup(&id),
StopReason::End => completed_cleanup(&id),
StopReason::Crash => crashed_cleanup(
&id,
Error::Run(anyhow::Error::msg(response.message).into()),
),
}
} else {
crashed_cleanup(
&id,
Error::Runtime(anyhow::anyhow!(
"stop subscribe channel stopped unexpectedly"
)),
)
}
};
let runtime_manager = runtime_manager.clone();

set.spawn(async move {
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&built.tracing_context)
});
let span = debug_span!("runner");
span.set_parent(parent_cx);

async move {
match built
.handle(
storage_manager,
secret_getter,
resource_manager,
runtime_manager,
deployment_updater,
old_deployments_killer,
cleanup,
)
.await
{
Ok(handle) => handle
.await
.expect("the call to run in built.handle to be done"),
Err(err) => start_crashed_cleanup(&id, err),
};

info!("deployment done");
}
.instrument(span)
.await
});
},
Some(res) = set.join_next() => {
match res {
Ok(_) => (),
Err(err) => {
error!(error = %err, "an error happened while joining a deployment run task")
}
}

info!("deployment done");
}
.instrument(span)
.await
});
}

while let Some(res) = set.join_next().await {
match res {
Ok(_) => (),
Err(err) => {
error!(error = %err, "an error happened when joining a deployment run task")
}
else => break
}
}
}
Expand Down

0 comments on commit 6d5db2b

Please sign in to comment.