diff --git a/svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs b/svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs index da31feae6a..6fc52e496f 100644 --- a/svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs +++ b/svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs @@ -2,7 +2,7 @@ use std::time::Duration; use chirp_workflow::prelude::*; -use crate::util::{signal_allocation, NOMAD_CONFIG, NOMAD_REGION}; +use crate::util::{NOMAD_CONFIG, NOMAD_REGION}; // TODO: const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(2); @@ -15,6 +15,7 @@ pub struct Input { #[workflow] pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { + let job_id = unwrap_ref!(input.alloc.job_id); let alloc_id = unwrap_ref!(input.alloc.ID); let nomad_node_id = unwrap_ref!(input.alloc.node_id, "alloc has no node id"); @@ -64,8 +65,8 @@ pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) -> .await?; if db_res.kill_alloc { - ctx.activity(KillAllocInput { - alloc_id: alloc_id.clone(), + ctx.activity(DeleteJobInput { + job_id: job_id.clone(), }) .await?; } @@ -227,37 +228,35 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult GlobalResult<()> { - if let Err(err) = signal_allocation( +#[activity(DeleteJob)] +async fn kill_alloc(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<()> { + if let Err(err) = nomad_client::apis::jobs_api::delete_job( &NOMAD_CONFIG, - &input.alloc_id, - None, + &input.job_id, Some(NOMAD_REGION), None, None, - Some(nomad_client_old::models::AllocSignalRequest { - task: None, - signal: Some("SIGKILL".to_string()), - }), + None, + Some(false), + None, ) .await { tracing::warn!( ?err, - ?input.alloc_id, - "error while trying to manually kill allocation" + ?input.job_id, + "error while trying to manually kill job" ); } diff --git a/svc/pkg/job-run/src/workers/drain_all.rs b/svc/pkg/job-run/src/workers/drain_all.rs index a480c41b2c..bf5518fb98 100644 --- a/svc/pkg/job-run/src/workers/drain_all.rs +++ b/svc/pkg/job-run/src/workers/drain_all.rs @@ -3,16 +3,17 @@ use proto::backend::pkg::*; #[worker(name = "job-run-drain-all")] async fn worker(ctx: &OperationContext) -> GlobalResult<()> { - chirp_workflow::compat::workflow( - ctx, - crate::workflows::drain_all::Input { - nomad_node_id: ctx.nomad_node_id.clone(), - drain_timeout: ctx.drain_timeout, - }, - ) - .await? - .dispatch() - .await?; + // TODO: Disabled for now + // chirp_workflow::compat::workflow( + // ctx, + // crate::workflows::drain_all::Input { + // nomad_node_id: ctx.nomad_node_id.clone(), + // drain_timeout: ctx.drain_timeout, + // }, + // ) + // .await? + // .dispatch() + // .await?; Ok(()) } diff --git a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs index 67ce21a3b6..887fd1d4c4 100644 --- a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs +++ b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs @@ -3,10 +3,7 @@ use proto::backend::{self, pkg::*}; use redis::AsyncCommands; use serde::Deserialize; -use crate::{ - util::{signal_allocation, NOMAD_REGION}, - workers::{NEW_NOMAD_CONFIG, NOMAD_CONFIG}, -}; +use crate::{util::NOMAD_REGION, workers::NEW_NOMAD_CONFIG}; #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] @@ -320,26 +317,24 @@ async fn update_db( .map(|id| id != &alloc_id) .unwrap_or_default() { - tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing new allocation"); + tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing job"); - if let Err(err) = signal_allocation( - &NOMAD_CONFIG, - &alloc_id, - None, + if let Err(err) = nomad_client_new::apis::jobs_api::delete_job( + &NEW_NOMAD_CONFIG, + &job_id, Some(NOMAD_REGION), None, None, - Some(nomad_client::models::AllocSignalRequest { - task: None, - signal: Some("SIGKILL".to_string()), - }), + None, + Some(false), + None, ) .await { tracing::warn!( ?err, ?alloc_id, - "error while trying to manually kill allocation" + "error while trying to manually kill job" ); } } diff --git a/svc/pkg/linode/src/util/api.rs b/svc/pkg/linode/src/util/api.rs index 45cb688c64..493f3cf43d 100644 --- a/svc/pkg/linode/src/util/api.rs +++ b/svc/pkg/linode/src/util/api.rs @@ -305,7 +305,7 @@ pub async fn wait_disk_ready(client: &Client, linode_id: u64, disk_id: u64) -> G loop { let res = client .inner() - .get(&format!( + .get(format!( "https://api.linode.com/v4/linode/instances/{linode_id}/disks/{disk_id}" )) .send() diff --git a/svc/pkg/linode/src/util/client.rs b/svc/pkg/linode/src/util/client.rs index 51900cf5ee..32f1a7d388 100644 --- a/svc/pkg/linode/src/util/client.rs +++ b/svc/pkg/linode/src/util/client.rs @@ -120,7 +120,7 @@ impl Client { let res = self .request( self.inner - .get(&format!("https://api.linode.com/v4{endpoint}")), + .get(format!("https://api.linode.com/v4{endpoint}")), None, false, ) @@ -132,7 +132,7 @@ impl Client { pub async fn delete(&self, endpoint: &str) -> GlobalResult<()> { self.request( self.inner - .delete(&format!("https://api.linode.com/v4{endpoint}")), + .delete(format!("https://api.linode.com/v4{endpoint}")), None, true, ) @@ -149,7 +149,7 @@ impl Client { let res = self .request( self.inner - .post(&format!("https://api.linode.com/v4{endpoint}")) + .post(format!("https://api.linode.com/v4{endpoint}")) .header("content-type", "application/json"), Some(body), false, @@ -162,7 +162,7 @@ impl Client { pub async fn post_no_res(&self, endpoint: &str, body: serde_json::Value) -> GlobalResult<()> { self.request( self.inner - .post(&format!("https://api.linode.com/v4{endpoint}")) + .post(format!("https://api.linode.com/v4{endpoint}")) .header("content-type", "application/json"), Some(body), false, diff --git a/svc/pkg/linode/src/workflows/server/mod.rs b/svc/pkg/linode/src/workflows/server/mod.rs index 402a1b2695..a5e4e7de1e 100644 --- a/svc/pkg/linode/src/workflows/server/mod.rs +++ b/svc/pkg/linode/src/workflows/server/mod.rs @@ -266,6 +266,7 @@ struct CreateInstanceOutput { } #[activity(CreateInstance)] +#[timeout = 120] async fn create_instance( ctx: &ActivityCtx, input: &CreateInstanceInput, diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs index 897ff8c07d..2983985f5f 100644 --- a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs +++ b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs @@ -1,7 +1,6 @@ use chirp_workflow::prelude::*; use rivet_operation::prelude::proto::backend::pkg::nomad; use serde::Deserialize; -use serde_json::json; #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs index fe274d2d26..85a0c45a8f 100644 --- a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs +++ b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs @@ -1,7 +1,6 @@ use chirp_workflow::prelude::*; use rivet_operation::prelude::proto::backend::pkg::nomad; use serde::Deserialize; -use serde_json::json; #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "PascalCase")] diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs index cafc1fae9e..f42175c1dc 100644 --- a/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs +++ b/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs @@ -1,7 +1,6 @@ use chirp_workflow::prelude::*; use rivet_operation::prelude::proto::backend::pkg::nomad; use serde::Deserialize; -use serde_json::json; #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")]