From fb3168b11b9036286269a7c71139ee515e60a035 Mon Sep 17 00:00:00 2001 From: NathanFlurry Date: Fri, 16 Aug 2024 05:35:13 +0000 Subject: [PATCH] chore(ds): split up destroy wf + add progress msg (#1072) ## Changes --- svc/api/servers/src/route/servers.rs | 2 +- svc/pkg/ds/src/ops/server/list_for_env.rs | 12 +- svc/pkg/ds/src/workflows/server/destroy.rs | 122 ++++++++++++--------- 3 files changed, 80 insertions(+), 56 deletions(-) diff --git a/svc/api/servers/src/route/servers.rs b/svc/api/servers/src/route/servers.rs index 1e5bbbb94e..a1743e86e3 100644 --- a/svc/api/servers/src/route/servers.rs +++ b/svc/api/servers/src/route/servers.rs @@ -181,7 +181,7 @@ pub async fn destroy( assert::server_for_env(&ctx, server_id, game_id, env_id).await?; let mut sub = ctx - .subscribe::(&json!({ + .subscribe::(&json!({ "server_id": server_id, })) .await?; diff --git a/svc/pkg/ds/src/ops/server/list_for_env.rs b/svc/pkg/ds/src/ops/server/list_for_env.rs index 5065dc9299..b0dc44b18a 100644 --- a/svc/pkg/ds/src/ops/server/list_for_env.rs +++ b/svc/pkg/ds/src/ops/server/list_for_env.rs @@ -29,20 +29,22 @@ pub async fn list_for_env(ctx: &OperationCtx, input: &Input) -> GlobalResult $2 AND - ($3 OR destroy_ts IS NOT NULL) AND - ( + env_id = $1 + AND tags @> $2 + AND ($3 OR destroy_ts IS NULL) + AND ( $4 IS NULL OR (create_ts, server_id) < (SELECT create_ts, server_id FROM after_server) ) ORDER BY create_ts DESC, server_id DESC - LIMIT 64 + LIMIT $5 ", input.env_id, serde_json::to_value(&input.tags)?, input.include_destroyed, input.cursor, + // TODO: Add pagination when OpenGB lobbies no longer uses polling RVTEE-492 + if input.include_destroyed { 64 } else { 10_000 }, ) .await? .into_iter() diff --git a/svc/pkg/ds/src/workflows/server/destroy.rs b/svc/pkg/ds/src/workflows/server/destroy.rs index 123bd0aa4d..2ea61802f7 100644 --- a/svc/pkg/ds/src/workflows/server/destroy.rs +++ b/svc/pkg/ds/src/workflows/server/destroy.rs @@ -1,10 +1,15 @@ use chirp_workflow::prelude::*; use futures_util::FutureExt; use serde_json::json; -use tracing::Instrument; use crate::util::NEW_NOMAD_CONFIG; +#[message("ds_server_destroy_started")] +pub struct DestroyStarted {} + +#[message("ds_server_destroy_complete")] +pub struct DestroyComplete {} + #[derive(Debug, Serialize, Deserialize)] pub(crate) struct Input { pub server_id: Uuid, @@ -19,12 +24,31 @@ pub(crate) async fn ds_server_destroy(ctx: &mut WorkflowCtx, input: &Input) -> G }) .await?; - ctx.activity(DeleteJobInput { - job_id: dynamic_server.dispatched_job_id.clone(), - alloc_id: dynamic_server.alloc_id.clone(), - }) + ctx.msg( + json!({ + "server_id": input.server_id, + }), + DestroyStarted {}, + ) .await?; + if let Some(job_id) = &dynamic_server.dispatched_job_id { + let delete_output = ctx + .activity(DeleteJobInput { + job_id: job_id.clone(), + }) + .await?; + + if delete_output.job_exists { + if let Some(alloc_id) = &dynamic_server.alloc_id { + ctx.activity(KillAllocInput { + alloc_id: alloc_id.clone(), + }) + .await?; + } + } + } + ctx.msg( json!({ "server_id": input.server_id, @@ -45,8 +69,8 @@ struct UpdateDbInput { struct UpdateDbOutput { server_id: Uuid, datacenter_id: Uuid, - dispatched_job_id: String, - alloc_id: String, + dispatched_job_id: Option, + alloc_id: Option, } #[activity(UpdateDb)] @@ -88,12 +112,16 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult GlobalResult<()> { - // TODO: Handle 404 safely. See RIV-179 +async fn delete_job(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult { + // TODO: Handle 404 safely. See RVTEE-498 // Stop the job. // // Setting purge to false will change the behavior of the create poll @@ -115,55 +143,49 @@ async fn delete_job(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<( { Ok(_) => { tracing::info!("job stopped"); - - kill_allocation(input.alloc_id.clone())?; + Ok(DeleteJobOutput { job_exists: true }) } Err(err) => { tracing::warn!(?err, "error thrown while stopping job"); + Ok(DeleteJobOutput { job_exists: false }) } } +} - Ok(()) +#[derive(Debug, Serialize, Deserialize, Hash)] +struct KillAllocInput { + alloc_id: String, } -#[message("ds_server_destroy_complete")] -pub struct DestroyComplete {} +#[activity(KillAlloc)] +async fn kill_alloc(ctx: &ActivityCtx, input: &KillAllocInput) -> GlobalResult<()> { + // Kills the allocation after 30 seconds + // + // See `docs/packages/job/JOB_DRAINING_AND_KILL_TIMEOUTS.md` -/// Kills the allocation after 30 seconds -/// -/// See `docs/packages/job/JOB_DRAINING_AND_KILL_TIMEOUTS.md` -fn kill_allocation(alloc_id: String) -> GlobalResult<()> { - tokio::task::Builder::new() - .name("ds::workflows::server::destroy::kill_allocation") - .spawn( - async move { - tokio::time::sleep(util_job::JOB_STOP_TIMEOUT).await; - - tracing::info!(?alloc_id, "manually killing allocation"); - - if let Err(err) = signal_allocation( - &NEW_NOMAD_CONFIG, - &alloc_id, - None, - Some(super::NOMAD_REGION), - None, - None, - Some(nomad_client_old::models::AllocSignalRequest { - task: None, - signal: Some("SIGKILL".to_string()), - }), - ) - .await - { - tracing::warn!( - ?err, - ?alloc_id, - "error while trying to manually kill allocation" - ); - } - } - .in_current_span(), - )?; + // TODO: Move this to a workflow sleep RVTEE-497 + tokio::time::sleep(util_job::JOB_STOP_TIMEOUT).await; + + // TODO: Handle 404 safely. See RVTEE-498 + if let Err(err) = signal_allocation( + &NEW_NOMAD_CONFIG, + &input.alloc_id, + None, + Some(super::NOMAD_REGION), + None, + None, + Some(nomad_client_old::models::AllocSignalRequest { + task: None, + signal: Some("SIGKILL".to_string()), + }), + ) + .await + { + tracing::warn!( + ?err, + "error while trying to manually kill allocation, ignoring" + ); + } Ok(()) }