Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions svc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions svc/pkg/ds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ hex = "0.4"
http = "0.2"
lazy_static = "1.4.0"
nomad-util = { path = "../../../lib/nomad-util" }
nomad-client-old = { package = "nomad-client", version = "0.0.9" }
rand = "0.8"
regex = "1.10"
reqwest = { version = "0.12", features = ["json"] }
Expand Down
112 changes: 109 additions & 3 deletions svc/pkg/ds/src/workflows/server/destroy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chirp_workflow::prelude::*;
use futures_util::FutureExt;
use serde_json::json;
use tracing::Instrument;

use crate::util::NEW_NOMAD_CONFIG;

Expand All @@ -20,6 +21,7 @@ pub(crate) async fn ds_server_destroy(ctx: &mut WorkflowCtx, input: &Input) -> G

ctx.activity(DeleteJobInput {
job_id: dynamic_server.dispatched_job_id.clone(),
alloc_id: dynamic_server.alloc_id.clone(),
})
.await?;

Expand Down Expand Up @@ -86,6 +88,7 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<Upd
#[derive(Debug, Serialize, Deserialize, Hash)]
struct DeleteJobInput {
job_id: String,
alloc_id: String,
}

#[activity(DeleteJob)]
Expand Down Expand Up @@ -113,9 +116,7 @@ async fn delete_job(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<(
Ok(_) => {
tracing::info!("job stopped");

// tokio::task::spawn(async move {

// });
kill_allocation(input.alloc_id.clone())?;
}
Err(err) => {
tracing::warn!(?err, "error thrown while stopping job");
Expand All @@ -127,3 +128,108 @@ async fn delete_job(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<(

#[message("ds_server_destroy_complete")]
pub struct DestroyComplete {}

/// Kills the allocation after 30 seconds
///
/// See `docs/packages/job/JOB_DRAINING_AND_KILL_TIMEOUTS.md`
fn kill_allocation(alloc_id: String) -> GlobalResult<()> {
tokio::task::Builder::new()
.name("ds::workflows::server::destroy::kill_allocation")
.spawn(
async move {
tokio::time::sleep(util_job::JOB_STOP_TIMEOUT).await;

tracing::info!(?alloc_id, "manually killing allocation");

if let Err(err) = signal_allocation(
&NEW_NOMAD_CONFIG,
&alloc_id,
None,
Some(super::NOMAD_REGION),
None,
None,
Some(nomad_client_old::models::AllocSignalRequest {
task: None,
signal: Some("SIGKILL".to_string()),
}),
)
.await
{
tracing::warn!(
?err,
?alloc_id,
"error while trying to manually kill allocation"
);
}
}
.in_current_span(),
)?;

Ok(())
}

// Have to patch `nomad_client::apis::allocations_api::signal_allocation` because it uses `/allocation`
// instead of `/client/allocation`
async fn signal_allocation(
configuration: &nomad_client::apis::configuration::Configuration,
alloc_id: &str,
namespace: Option<&str>,
region: Option<&str>,
index: Option<i64>,
wait: Option<&str>,
alloc_signal_request: Option<nomad_client_old::models::AllocSignalRequest>,
) -> Result<
(),
nomad_client::apis::Error<nomad_client_old::apis::allocations_api::SignalAllocationError>,
> {
let local_var_client = &configuration.client;

let local_var_uri_str = format!(
"{}/client/allocation/{alloc_id}/signal",
configuration.base_path,
alloc_id = nomad_client::apis::urlencode(alloc_id),
);
let mut local_var_req_builder = local_var_client.post(local_var_uri_str.as_str());

if let Some(ref local_var_str) = namespace {
local_var_req_builder =
local_var_req_builder.query(&[("namespace", &local_var_str.to_string())]);
}
if let Some(ref local_var_str) = region {
local_var_req_builder =
local_var_req_builder.query(&[("region", &local_var_str.to_string())]);
}
if let Some(ref local_var_str) = index {
local_var_req_builder =
local_var_req_builder.query(&[("index", &local_var_str.to_string())]);
}
if let Some(ref local_var_str) = wait {
local_var_req_builder =
local_var_req_builder.query(&[("wait", &local_var_str.to_string())]);
}
if let Some(ref local_var_user_agent) = configuration.user_agent {
local_var_req_builder =
local_var_req_builder.header(http::header::USER_AGENT, local_var_user_agent.clone());
}
local_var_req_builder = local_var_req_builder.json(&alloc_signal_request);

let local_var_req = local_var_req_builder.build()?;
let local_var_resp = local_var_client.execute(local_var_req).await?;

let local_var_status = local_var_resp.status();
let local_var_content = local_var_resp.text().await?;

if !local_var_status.is_client_error() && !local_var_status.is_server_error() {
Ok(())
} else {
let local_var_entity: Option<
nomad_client_old::apis::allocations_api::SignalAllocationError,
> = serde_json::from_str(&local_var_content).ok();
let local_var_error = nomad_client::apis::ResponseContent {
status: local_var_status,
content: local_var_content,
entity: local_var_entity,
};
Err(nomad_client::apis::Error::ResponseError(local_var_error))
}
}
2 changes: 1 addition & 1 deletion svc/pkg/ds/tests/server_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rivet_operation::prelude::proto::{
use serde_json::json;

#[workflow_test]
async fn create(ctx: TestCtx) {
async fn server_create(ctx: TestCtx) {
let game_res = op!([ctx] faker_game {
..Default::default()
})
Expand Down