diff --git a/svc/Cargo.lock b/svc/Cargo.lock index cafa96e748..32cd552a84 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -3112,6 +3112,7 @@ dependencies = [ "mm-config-version-get", "mm-lobby-get", "mm-lobby-list-for-user-id", + "nomad-client", "nomad-util", "nomad_client", "rand", diff --git a/svc/pkg/ds/Cargo.toml b/svc/pkg/ds/Cargo.toml index ad669172e1..aab5741460 100644 --- a/svc/pkg/ds/Cargo.toml +++ b/svc/pkg/ds/Cargo.toml @@ -17,6 +17,7 @@ hex = "0.4" http = "0.2" lazy_static = "1.4.0" nomad-util = { path = "../../../lib/nomad-util" } +nomad-client-old = { package = "nomad-client", version = "0.0.9" } rand = "0.8" regex = "1.10" reqwest = { version = "0.12", features = ["json"] } diff --git a/svc/pkg/ds/src/workflows/server/destroy.rs b/svc/pkg/ds/src/workflows/server/destroy.rs index c320af6986..123bd0aa4d 100644 --- a/svc/pkg/ds/src/workflows/server/destroy.rs +++ b/svc/pkg/ds/src/workflows/server/destroy.rs @@ -1,6 +1,7 @@ use chirp_workflow::prelude::*; use futures_util::FutureExt; use serde_json::json; +use tracing::Instrument; use crate::util::NEW_NOMAD_CONFIG; @@ -20,6 +21,7 @@ pub(crate) async fn ds_server_destroy(ctx: &mut WorkflowCtx, input: &Input) -> G ctx.activity(DeleteJobInput { job_id: dynamic_server.dispatched_job_id.clone(), + alloc_id: dynamic_server.alloc_id.clone(), }) .await?; @@ -86,6 +88,7 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult GlobalResult<( Ok(_) => { tracing::info!("job stopped"); - // tokio::task::spawn(async move { - - // }); + kill_allocation(input.alloc_id.clone())?; } Err(err) => { tracing::warn!(?err, "error thrown while stopping job"); @@ -127,3 +128,108 @@ async fn delete_job(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<( #[message("ds_server_destroy_complete")] pub struct DestroyComplete {} + +/// Kills the allocation after 30 seconds +/// +/// See `docs/packages/job/JOB_DRAINING_AND_KILL_TIMEOUTS.md` +fn kill_allocation(alloc_id: String) -> GlobalResult<()> { + tokio::task::Builder::new() + .name("ds::workflows::server::destroy::kill_allocation") + .spawn( + async move { + tokio::time::sleep(util_job::JOB_STOP_TIMEOUT).await; + + tracing::info!(?alloc_id, "manually killing allocation"); + + if let Err(err) = signal_allocation( + &NEW_NOMAD_CONFIG, + &alloc_id, + None, + Some(super::NOMAD_REGION), + None, + None, + Some(nomad_client_old::models::AllocSignalRequest { + task: None, + signal: Some("SIGKILL".to_string()), + }), + ) + .await + { + tracing::warn!( + ?err, + ?alloc_id, + "error while trying to manually kill allocation" + ); + } + } + .in_current_span(), + )?; + + Ok(()) +} + +// Have to patch `nomad_client::apis::allocations_api::signal_allocation` because it uses `/allocation` +// instead of `/client/allocation` +async fn signal_allocation( + configuration: &nomad_client::apis::configuration::Configuration, + alloc_id: &str, + namespace: Option<&str>, + region: Option<&str>, + index: Option, + wait: Option<&str>, + alloc_signal_request: Option, +) -> Result< + (), + nomad_client::apis::Error, +> { + let local_var_client = &configuration.client; + + let local_var_uri_str = format!( + "{}/client/allocation/{alloc_id}/signal", + configuration.base_path, + alloc_id = nomad_client::apis::urlencode(alloc_id), + ); + let mut local_var_req_builder = local_var_client.post(local_var_uri_str.as_str()); + + if let Some(ref local_var_str) = namespace { + local_var_req_builder = + local_var_req_builder.query(&[("namespace", &local_var_str.to_string())]); + } + if let Some(ref local_var_str) = region { + local_var_req_builder = + local_var_req_builder.query(&[("region", &local_var_str.to_string())]); + } + if let Some(ref local_var_str) = index { + local_var_req_builder = + local_var_req_builder.query(&[("index", &local_var_str.to_string())]); + } + if let Some(ref local_var_str) = wait { + local_var_req_builder = + local_var_req_builder.query(&[("wait", &local_var_str.to_string())]); + } + if let Some(ref local_var_user_agent) = configuration.user_agent { + local_var_req_builder = + local_var_req_builder.header(http::header::USER_AGENT, local_var_user_agent.clone()); + } + local_var_req_builder = local_var_req_builder.json(&alloc_signal_request); + + let local_var_req = local_var_req_builder.build()?; + let local_var_resp = local_var_client.execute(local_var_req).await?; + + let local_var_status = local_var_resp.status(); + let local_var_content = local_var_resp.text().await?; + + if !local_var_status.is_client_error() && !local_var_status.is_server_error() { + Ok(()) + } else { + let local_var_entity: Option< + nomad_client_old::apis::allocations_api::SignalAllocationError, + > = serde_json::from_str(&local_var_content).ok(); + let local_var_error = nomad_client::apis::ResponseContent { + status: local_var_status, + content: local_var_content, + entity: local_var_entity, + }; + Err(nomad_client::apis::Error::ResponseError(local_var_error)) + } +} diff --git a/svc/pkg/ds/tests/server_create.rs b/svc/pkg/ds/tests/server_create.rs index ccf2235ed2..8ebf3065fb 100644 --- a/svc/pkg/ds/tests/server_create.rs +++ b/svc/pkg/ds/tests/server_create.rs @@ -8,7 +8,7 @@ use rivet_operation::prelude::proto::{ use serde_json::json; #[workflow_test] -async fn create(ctx: TestCtx) { +async fn server_create(ctx: TestCtx) { let game_res = op!([ctx] faker_game { ..Default::default() })