From 9ebbf9383a5a745bfe2fc111775519097397b4e4 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 5 Sep 2024 18:53:21 +0000 Subject: [PATCH 1/6] fix(mm, ds): fix dupe alloc killing --- .../src/workflows/server/nomad_alloc_plan.rs | 33 +++++++++---------- svc/pkg/job-run/src/workers/drain_all.rs | 21 ++++++------ .../src/workers/nomad_monitor_alloc_plan.rs | 23 +++++-------- svc/pkg/linode/src/util/api.rs | 2 +- svc/pkg/linode/src/util/client.rs | 8 ++--- svc/pkg/linode/src/workflows/server/mod.rs | 1 + .../monitor/src/monitors/alloc_plan.rs | 1 - .../monitor/src/monitors/alloc_update.rs | 1 - .../monitor/src/monitors/eval_update.rs | 1 - 9 files changed, 42 insertions(+), 49 deletions(-) diff --git a/svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs b/svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs index da31feae6a..6fc52e496f 100644 --- a/svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs +++ b/svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs @@ -2,7 +2,7 @@ use std::time::Duration; use chirp_workflow::prelude::*; -use crate::util::{signal_allocation, NOMAD_CONFIG, NOMAD_REGION}; +use crate::util::{NOMAD_CONFIG, NOMAD_REGION}; // TODO: const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(2); @@ -15,6 +15,7 @@ pub struct Input { #[workflow] pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { + let job_id = unwrap_ref!(input.alloc.job_id); let alloc_id = unwrap_ref!(input.alloc.ID); let nomad_node_id = unwrap_ref!(input.alloc.node_id, "alloc has no node id"); @@ -64,8 +65,8 @@ pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) -> .await?; if db_res.kill_alloc { - ctx.activity(KillAllocInput { - alloc_id: alloc_id.clone(), + ctx.activity(DeleteJobInput { + job_id: job_id.clone(), }) .await?; } @@ -227,37 +228,35 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult GlobalResult<()> { - if let Err(err) = signal_allocation( +#[activity(DeleteJob)] +async fn kill_alloc(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<()> { + if let Err(err) = nomad_client::apis::jobs_api::delete_job( &NOMAD_CONFIG, - &input.alloc_id, - None, + &input.job_id, Some(NOMAD_REGION), None, None, - Some(nomad_client_old::models::AllocSignalRequest { - task: None, - signal: Some("SIGKILL".to_string()), - }), + None, + Some(false), + None, ) .await { tracing::warn!( ?err, - ?input.alloc_id, - "error while trying to manually kill allocation" + ?input.job_id, + "error while trying to manually kill job" ); } diff --git a/svc/pkg/job-run/src/workers/drain_all.rs b/svc/pkg/job-run/src/workers/drain_all.rs index a480c41b2c..bf5518fb98 100644 --- a/svc/pkg/job-run/src/workers/drain_all.rs +++ b/svc/pkg/job-run/src/workers/drain_all.rs @@ -3,16 +3,17 @@ use proto::backend::pkg::*; #[worker(name = "job-run-drain-all")] async fn worker(ctx: &OperationContext) -> GlobalResult<()> { - chirp_workflow::compat::workflow( - ctx, - crate::workflows::drain_all::Input { - nomad_node_id: ctx.nomad_node_id.clone(), - drain_timeout: ctx.drain_timeout, - }, - ) - .await? - .dispatch() - .await?; + // TODO: Disabled for now + // chirp_workflow::compat::workflow( + // ctx, + // crate::workflows::drain_all::Input { + // nomad_node_id: ctx.nomad_node_id.clone(), + // drain_timeout: ctx.drain_timeout, + // }, + // ) + // .await? 
+ // .dispatch() + // .await?; Ok(()) } diff --git a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs index 67ce21a3b6..887fd1d4c4 100644 --- a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs +++ b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs @@ -3,10 +3,7 @@ use proto::backend::{self, pkg::*}; use redis::AsyncCommands; use serde::Deserialize; -use crate::{ - util::{signal_allocation, NOMAD_REGION}, - workers::{NEW_NOMAD_CONFIG, NOMAD_CONFIG}, -}; +use crate::{util::NOMAD_REGION, workers::NEW_NOMAD_CONFIG}; #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] @@ -320,26 +317,24 @@ async fn update_db( .map(|id| id != &alloc_id) .unwrap_or_default() { - tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing new allocation"); + tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing job"); - if let Err(err) = signal_allocation( - &NOMAD_CONFIG, - &alloc_id, - None, + if let Err(err) = nomad_client_new::apis::jobs_api::delete_job( + &NEW_NOMAD_CONFIG, + &job_id, Some(NOMAD_REGION), None, None, - Some(nomad_client::models::AllocSignalRequest { - task: None, - signal: Some("SIGKILL".to_string()), - }), + None, + Some(false), + None, ) .await { tracing::warn!( ?err, ?alloc_id, - "error while trying to manually kill allocation" + "error while trying to manually kill job" ); } } diff --git a/svc/pkg/linode/src/util/api.rs b/svc/pkg/linode/src/util/api.rs index 45cb688c64..493f3cf43d 100644 --- a/svc/pkg/linode/src/util/api.rs +++ b/svc/pkg/linode/src/util/api.rs @@ -305,7 +305,7 @@ pub async fn wait_disk_ready(client: &Client, linode_id: u64, disk_id: u64) -> G loop { let res = client .inner() - .get(&format!( + .get(format!( "https://api.linode.com/v4/linode/instances/{linode_id}/disks/{disk_id}" )) .send() diff --git a/svc/pkg/linode/src/util/client.rs b/svc/pkg/linode/src/util/client.rs index 51900cf5ee..32f1a7d388 100644 --- a/svc/pkg/linode/src/util/client.rs +++ b/svc/pkg/linode/src/util/client.rs @@ -120,7 +120,7 @@ impl Client { let res = self .request( self.inner - .get(&format!("https://api.linode.com/v4{endpoint}")), + .get(format!("https://api.linode.com/v4{endpoint}")), None, false, ) @@ -132,7 +132,7 @@ impl Client { pub async fn delete(&self, endpoint: &str) -> GlobalResult<()> { self.request( self.inner - .delete(&format!("https://api.linode.com/v4{endpoint}")), + .delete(format!("https://api.linode.com/v4{endpoint}")), None, true, ) @@ -149,7 +149,7 @@ impl Client { let res = self .request( self.inner - .post(&format!("https://api.linode.com/v4{endpoint}")) + .post(format!("https://api.linode.com/v4{endpoint}")) .header("content-type", "application/json"), Some(body), false, @@ -162,7 +162,7 @@ impl Client { pub async fn post_no_res(&self, endpoint: &str, body: serde_json::Value) -> GlobalResult<()> { self.request( self.inner - .post(&format!("https://api.linode.com/v4{endpoint}")) + .post(format!("https://api.linode.com/v4{endpoint}")) .header("content-type", "application/json"), Some(body), false, diff --git a/svc/pkg/linode/src/workflows/server/mod.rs b/svc/pkg/linode/src/workflows/server/mod.rs index 402a1b2695..a5e4e7de1e 100644 --- a/svc/pkg/linode/src/workflows/server/mod.rs +++ b/svc/pkg/linode/src/workflows/server/mod.rs @@ -266,6 +266,7 @@ struct CreateInstanceOutput { } #[activity(CreateInstance)] +#[timeout = 120] async fn 
create_instance( ctx: &ActivityCtx, input: &CreateInstanceInput, diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs index 897ff8c07d..2983985f5f 100644 --- a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs +++ b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs @@ -1,7 +1,6 @@ use chirp_workflow::prelude::*; use rivet_operation::prelude::proto::backend::pkg::nomad; use serde::Deserialize; -use serde_json::json; #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs index fe274d2d26..85a0c45a8f 100644 --- a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs +++ b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs @@ -1,7 +1,6 @@ use chirp_workflow::prelude::*; use rivet_operation::prelude::proto::backend::pkg::nomad; use serde::Deserialize; -use serde_json::json; #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "PascalCase")] diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs index cafc1fae9e..f42175c1dc 100644 --- a/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs +++ b/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs @@ -1,7 +1,6 @@ use chirp_workflow::prelude::*; use rivet_operation::prelude::proto::backend::pkg::nomad; use serde::Deserialize; -use serde_json::json; #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] From e4d77d152aa8b1b9f8073db0f284ed15ccb75584 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 5 Sep 2024 18:54:22 +0000 Subject: [PATCH 2/6] fix(workflows): add error message for max sql retries --- lib/chirp-workflow/core/src/db/pg_nats.rs | 14 +++++++++----- lib/chirp-workflow/core/src/error.rs | 4 ++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/chirp-workflow/core/src/db/pg_nats.rs b/lib/chirp-workflow/core/src/db/pg_nats.rs index 7184d4e285..4cb5d650b5 100644 --- a/lib/chirp-workflow/core/src/db/pg_nats.rs +++ b/lib/chirp-workflow/core/src/db/pg_nats.rs @@ -76,10 +76,16 @@ impl DatabasePgNats { T: 'a, { let mut backoff = rivet_util::Backoff::new(3, None, QUERY_RETRY_MS, 50); + let mut i = 0; - for _ in 0..MAX_QUERY_RETRIES { + loop { match cb().await { Err(WorkflowError::Sqlx(err)) => { + i += 1; + if i > MAX_QUERY_RETRIES { + return Err(WorkflowError::MaxSqlRetries(err)); + } + use sqlx::Error::*; match &err { // Retry transaction errors immediately @@ -88,13 +94,13 @@ impl DatabasePgNats { .message() .contains("TransactionRetryWithProtoRefreshError") { - tracing::info!(message=%db_err.message(), "transaction retry"); + tracing::warn!(message=%db_err.message(), "transaction retry"); } } // Retry internal errors with a backoff Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed | WorkerCrashed => { - tracing::info!(?err, "query retry"); + tracing::warn!(?err, "query retry"); backoff.tick().await; } // Throw error @@ -104,8 +110,6 @@ impl DatabasePgNats { x => return x, } } - - Err(WorkflowError::MaxSqlRetries) } } diff --git a/lib/chirp-workflow/core/src/error.rs b/lib/chirp-workflow/core/src/error.rs index f55f902565..2631e34a36 100644 --- a/lib/chirp-workflow/core/src/error.rs +++ b/lib/chirp-workflow/core/src/error.rs @@ -117,8 +117,8 @@ pub enum WorkflowError { #[error("sql: {0}")] Sqlx(sqlx::Error), - #[error("max sql retries")] 
- MaxSqlRetries, + #[error("max sql retries (last error: {0})")] + MaxSqlRetries(sqlx::Error), #[error("pools: {0}")] Pools(#[from] rivet_pools::Error), From 1cfedd5f6a2e395003eafbaaa5bcf2a83a6038a6 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 5 Sep 2024 23:25:20 +0000 Subject: [PATCH 3/6] fix: allocation sizes for nomad --- lib/bolt/core/src/context/service.rs | 56 ++++++++++--------- .../cluster/src/workflows/datacenter/scale.rs | 8 +-- .../src/workers/lobby_create/nomad_job.rs | 14 +---- svc/pkg/tier/ops/list/src/lib.rs | 11 +--- 4 files changed, 37 insertions(+), 52 deletions(-) diff --git a/lib/bolt/core/src/context/service.rs b/lib/bolt/core/src/context/service.rs index d4e5d4c99d..e32c35d940 100644 --- a/lib/bolt/core/src/context/service.rs +++ b/lib/bolt/core/src/context/service.rs @@ -510,33 +510,35 @@ impl ServiceContextData { ); } - let can_depend = - if self.is_monolith_worker() { - matches!( - dep.config().kind, - ServiceKind::Database { .. } - | ServiceKind::Cache { .. } | ServiceKind::Operation { .. } - | ServiceKind::Package { .. } - | ServiceKind::Consumer { .. } - ) - } else if matches!(self.config().kind, ServiceKind::Api { .. }) { - matches!( - dep.config().kind, - ServiceKind::Database { .. } - | ServiceKind::Cache { .. } | ServiceKind::Operation { .. } - | ServiceKind::Package { .. } - | ServiceKind::ApiRoutes { .. } - | ServiceKind::Consumer { .. } - ) - } else { - matches!( - dep.config().kind, - ServiceKind::Database { .. } - | ServiceKind::Cache { .. } | ServiceKind::Operation { .. } - | ServiceKind::Package { .. } - | ServiceKind::Consumer { .. } - ) - }; + let can_depend = if self.is_monolith_worker() { + matches!( + dep.config().kind, + ServiceKind::Database { .. } + | ServiceKind::Cache { .. } + | ServiceKind::Operation { .. } + | ServiceKind::Package { .. } + | ServiceKind::Consumer { .. } + ) + } else if matches!(self.config().kind, ServiceKind::Api { .. }) { + matches!( + dep.config().kind, + ServiceKind::Database { .. } + | ServiceKind::Cache { .. } + | ServiceKind::Operation { .. } + | ServiceKind::Package { .. } + | ServiceKind::ApiRoutes { .. } + | ServiceKind::Consumer { .. } + ) + } else { + matches!( + dep.config().kind, + ServiceKind::Database { .. } + | ServiceKind::Cache { .. } + | ServiceKind::Operation { .. } + | ServiceKind::Package { .. } + | ServiceKind::Consumer { .. 
} + ) + }; if !can_depend { panic!( diff --git a/svc/pkg/cluster/src/workflows/datacenter/scale.rs b/svc/pkg/cluster/src/workflows/datacenter/scale.rs index 76642317e8..ed533ebac2 100644 --- a/svc/pkg/cluster/src/workflows/datacenter/scale.rs +++ b/svc/pkg/cluster/src/workflows/datacenter/scale.rs @@ -246,9 +246,12 @@ async fn inner( .map(TryInto::try_into) .collect::>>()?; - // Sort job servers by memory usage + // Sort job servers by allocated memory servers.sort_by_key(|server| memory_by_server.get(&server.server_id)); + // TODO: remove + tracing::info!(server_ids=?servers.iter().map(|s| s.server_id).collect::>(), ?memory_by_server, "server topo"); + // TODO: RVT-3732 Sort gg and ats servers by cpu usage // servers.sort_by_key @@ -388,7 +391,6 @@ async fn scale_down_job_servers( let drain_candidates = nomad_servers .iter() - .rev() .take(drain_count) .map(|server| server.server_id); @@ -420,7 +422,6 @@ async fn scale_down_gg_servers<'a, I: Iterator + DoubleEndedI tracing::info!(count=%drain_count, "draining gg servers"); let drain_candidates = installed_servers - .rev() .take(drain_count) .map(|server| server.server_id); @@ -455,7 +456,6 @@ async fn scale_down_ats_servers< tracing::info!(count=%drain_count, "draining ats servers"); let drain_candidates = installed_servers - .rev() .take(drain_count) .map(|server| server.server_id); diff --git a/svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs b/svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs index cce6323d8f..ab2e1a6020 100644 --- a/svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs +++ b/svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs @@ -94,7 +94,7 @@ pub fn gen_lobby_docker_job( // Nomad configures CPU based on MHz, not millicores. We havel to calculate the CPU share // by knowing how many MHz are on the client. CPU: if tier.rivet_cores_numerator < tier.rivet_cores_denominator { - Some((tier.cpu - util_job::TASK_CLEANUP_CPU as u64).try_into()?) + Some(tier.cpu.try_into()?) } else { None }, @@ -103,18 +103,10 @@ pub fn gen_lobby_docker_job( } else { None }, - memory_mb: Some( - (TryInto::::try_into(memory)? / (1024 * 1024) - - util_job::TASK_CLEANUP_MEMORY as i64) - .try_into()?, - ), + memory_mb: Some(tier.memory.try_into()?), // Allow oversubscribing memory by 50% of the reserved // memory if using less than the node's total memory - memory_max_mb: Some( - (TryInto::::try_into(memory_max)? / (1024 * 1024) - - util_job::TASK_CLEANUP_MEMORY as i64) - .try_into()?, - ), + memory_max_mb: Some(tier.memory_max.try_into()?), disk_mb: Some(tier.disk as i32), // TODO: Is this deprecated? 
..Resources::new() }; diff --git a/svc/pkg/tier/ops/list/src/lib.rs b/svc/pkg/tier/ops/list/src/lib.rs index 02636671b2..f51dbc1e2a 100644 --- a/svc/pkg/tier/ops/list/src/lib.rs +++ b/svc/pkg/tier/ops/list/src/lib.rs @@ -53,16 +53,7 @@ async fn handle(ctx: OperationContext) -> GlobalResult Date: Thu, 5 Sep 2024 23:50:52 +0000 Subject: [PATCH 4/6] fix(job-run): fix dupe allocs, re-enable drain all --- .../cluster/src/workflows/datacenter/scale.rs | 3 -- svc/pkg/job-run/src/lib.rs | 1 + svc/pkg/job-run/src/workers/drain_all.rs | 21 +++++---- .../src/workers/nomad_monitor_alloc_plan.rs | 6 +-- svc/pkg/job-run/src/workflows/drain_all.rs | 45 +++++++++++++++++++ .../src/workers/nomad_node_closed_set.rs | 29 +----------- 6 files changed, 59 insertions(+), 46 deletions(-) diff --git a/svc/pkg/cluster/src/workflows/datacenter/scale.rs b/svc/pkg/cluster/src/workflows/datacenter/scale.rs index ed533ebac2..e894c86af7 100644 --- a/svc/pkg/cluster/src/workflows/datacenter/scale.rs +++ b/svc/pkg/cluster/src/workflows/datacenter/scale.rs @@ -249,9 +249,6 @@ async fn inner( // Sort job servers by allocated memory servers.sort_by_key(|server| memory_by_server.get(&server.server_id)); - // TODO: remove - tracing::info!(server_ids=?servers.iter().map(|s| s.server_id).collect::>(), ?memory_by_server, "server topo"); - // TODO: RVT-3732 Sort gg and ats servers by cpu usage // servers.sort_by_key diff --git a/svc/pkg/job-run/src/lib.rs b/svc/pkg/job-run/src/lib.rs index 012e22076e..03ab2fd281 100644 --- a/svc/pkg/job-run/src/lib.rs +++ b/svc/pkg/job-run/src/lib.rs @@ -10,6 +10,7 @@ pub fn registry() -> WorkflowResult { let mut registry = Registry::new(); registry.register_workflow::()?; + registry.register_workflow::()?; Ok(registry) } diff --git a/svc/pkg/job-run/src/workers/drain_all.rs b/svc/pkg/job-run/src/workers/drain_all.rs index bf5518fb98..ffe38260f6 100644 --- a/svc/pkg/job-run/src/workers/drain_all.rs +++ b/svc/pkg/job-run/src/workers/drain_all.rs @@ -3,17 +3,16 @@ use proto::backend::pkg::*; #[worker(name = "job-run-drain-all")] async fn worker(ctx: &OperationContext) -> GlobalResult<()> { - // TODO: Disabled for now - // chirp_workflow::compat::workflow( - // ctx, - // crate::workflows::drain_all::Input { - // nomad_node_id: ctx.nomad_node_id.clone(), - // drain_timeout: ctx.drain_timeout, - // }, - // ) - // .await? - // .dispatch() - // .await?; + chirp_workflow::compat::workflow( + ctx, + crate::workflows::drain_all::Input2 { + nomad_node_id: ctx.nomad_node_id.clone(), + drain_timeout: ctx.drain_timeout, + }, + ) + .await? 
+ .dispatch() + .await?; Ok(()) } diff --git a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs index 887fd1d4c4..32afe734c6 100644 --- a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs +++ b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs @@ -331,11 +331,7 @@ async fn update_db( ) .await { - tracing::warn!( - ?err, - ?alloc_id, - "error while trying to manually kill job" - ); + tracing::warn!(?err, ?alloc_id, "error while trying to manually kill job"); } } } diff --git a/svc/pkg/job-run/src/workflows/drain_all.rs b/svc/pkg/job-run/src/workflows/drain_all.rs index a480af5631..bed1713d9f 100644 --- a/svc/pkg/job-run/src/workflows/drain_all.rs +++ b/svc/pkg/job-run/src/workflows/drain_all.rs @@ -44,6 +44,33 @@ pub async fn job_run_drain_all(ctx: &mut WorkflowCtx, input: &Input) -> GlobalRe Ok(()) } +#[derive(Debug, Serialize, Deserialize)] +pub struct Input2 { + pub nomad_node_id: String, + pub drain_timeout: u64, +} + +#[workflow(Workflow2)] +pub async fn job_run_drain_all2(ctx: &mut WorkflowCtx, input: &Input2) -> GlobalResult<()> { + // We fetch here so that when we kill allocs later, we don't refetch new job runs that might be on the + // nomad node. Only allocs fetched at this time will be killed. + let job_runs = ctx + .activity(FetchJobRunsInput { + nomad_node_id: input.nomad_node_id.clone(), + }) + .await?; + + ctx.sleep(input.drain_timeout.saturating_sub(DRAIN_PADDING)) + .await?; + + ctx.activity(StopJobRuns2Input { + run_ids: job_runs.iter().map(|jr| jr.run_id).collect(), + }) + .await?; + + Ok(()) +} + #[derive(Debug, Serialize, Deserialize, Hash)] struct FetchJobRunsInput { nomad_node_id: String, @@ -95,6 +122,24 @@ async fn stop_job_runs(ctx: &ActivityCtx, input: &StopJobRunsInput) -> GlobalRes Ok(()) } +#[derive(Debug, Serialize, Deserialize, Hash)] +struct StopJobRuns2Input { + run_ids: Vec, +} + +#[activity(StopJobRuns2)] +async fn stop_job_runs2(ctx: &ActivityCtx, input: &StopJobRuns2Input) -> GlobalResult<()> { + for run_id in &input.run_ids { + msg!([ctx] job_run::msg::stop(run_id) { + run_id: Some((*run_id).into()), + skip_kill_alloc: false, + }) + .await?; + } + + Ok(()) +} + #[derive(Debug, Serialize, Deserialize, Hash)] struct KillAllocsInput { nomad_node_id: String, diff --git a/svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs b/svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs index 23899e3b76..61be96b27f 100644 --- a/svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs +++ b/svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs @@ -66,34 +66,9 @@ async fn worker( } pipe.query_async(&mut ctx.redis_mm().await?).await?; - } else { - let mut script = REDIS_SCRIPT.prepare_invoke(); - - script.arg(lobby_rows.len()); - - for lobby in lobby_rows { - script - .key(util_mm::key::lobby_config(lobby.lobby_id)) - .key(util_mm::key::lobby_player_ids(lobby.lobby_id)) - .key(util_mm::key::lobby_available_spots( - lobby.namespace_id, - datacenter_id, - lobby.lobby_group_id, - util_mm::JoinKind::Normal, - )) - .key(util_mm::key::lobby_available_spots( - lobby.namespace_id, - datacenter_id, - lobby.lobby_group_id, - util_mm::JoinKind::Party, - )) - .arg(lobby.lobby_id.to_string()) - .arg(lobby.max_players_normal) - .arg(lobby.max_players_party); - } - - script.invoke_async(&mut ctx.redis_mm().await?).await?; } + // NOTE: Don't do anything on undrain + Ok(()) } From b8c5306fdb1dce5feb7cc00c2dfafee5fa0ca456 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 6 Sep 2024 04:30:50 +0000 
Subject: [PATCH 5/6] fix(cluster): skip pruning servers without provider server id

---
 svc/pkg/cluster/src/ops/server/prune_with_filter.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/svc/pkg/cluster/src/ops/server/prune_with_filter.rs b/svc/pkg/cluster/src/ops/server/prune_with_filter.rs
index b9d7ef8ec0..0708a925bf 100644
--- a/svc/pkg/cluster/src/ops/server/prune_with_filter.rs
+++ b/svc/pkg/cluster/src/ops/server/prune_with_filter.rs
@@ -85,7 +85,11 @@ async fn run_for_linode_account(
 	tracing::info!("pruning {} servers", servers.len());
 
 	for server in servers {
-		let linode_id = unwrap_ref!(server.provider_server_id).parse()?;
+		let Some(linode_id) = &server.provider_server_id else {
+			tracing::warn!(server_id = ?server.server_id, "provider_server_id is none");
+			continue;
+		};
+		let linode_id = linode_id.parse()?;
 
 		tracing::info!("pruning {} (linode_id {})", server.server_id, linode_id);
 

From 2ce970fe8665df2030752bcd279eaf19b1c6bdf2 Mon Sep 17 00:00:00 2001
From: Nathan Flurry
Date: Tue, 10 Sep 2024 04:02:10 +0000
Subject: [PATCH 6/6] chore: increase install timeout

---
 svc/pkg/cluster/src/workflows/server/install/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/svc/pkg/cluster/src/workflows/server/install/mod.rs b/svc/pkg/cluster/src/workflows/server/install/mod.rs
index 9911e8543a..dd14508386 100644
--- a/svc/pkg/cluster/src/workflows/server/install/mod.rs
+++ b/svc/pkg/cluster/src/workflows/server/install/mod.rs
@@ -96,7 +96,7 @@ struct InstallOverSshInput {
 }
 
 #[activity(InstallOverSsh)]
-#[timeout = 120]
+#[timeout = 300]
 #[max_retries = 10]
 async fn install_over_ssh(ctx: &ActivityCtx, input: &InstallOverSshInput) -> GlobalResult<()> {
 	let public_ip = input.public_ip;