From 3d69a7b32cacbf6c0d76f41a9ed7f26197c06ce1 Mon Sep 17 00:00:00 2001
From: MasterPtato
Date: Thu, 5 Sep 2024 23:50:52 +0000
Subject: [PATCH] fix(job-run): fix dupe allocs, re-enable drain all

---
 .../cluster/src/workflows/datacenter/scale.rs |  3 --
 svc/pkg/job-run/src/lib.rs                    |  1 +
 svc/pkg/job-run/src/workers/drain_all.rs      | 21 +++++----
 .../src/workers/nomad_monitor_alloc_plan.rs   |  6 +--
 svc/pkg/job-run/src/workflows/drain_all.rs    | 45 +++++++++++++++++++
 .../src/workers/nomad_node_closed_set.rs      | 29 +----------
 6 files changed, 59 insertions(+), 46 deletions(-)

diff --git a/svc/pkg/cluster/src/workflows/datacenter/scale.rs b/svc/pkg/cluster/src/workflows/datacenter/scale.rs
index ed533ebac2..e894c86af7 100644
--- a/svc/pkg/cluster/src/workflows/datacenter/scale.rs
+++ b/svc/pkg/cluster/src/workflows/datacenter/scale.rs
@@ -249,9 +249,6 @@ async fn inner(
 	// Sort job servers by allocated memory
 	servers.sort_by_key(|server| memory_by_server.get(&server.server_id));
 
-	// TODO: remove
-	tracing::info!(server_ids=?servers.iter().map(|s| s.server_id).collect::<Vec<_>>(), ?memory_by_server, "server topo");
-
 	// TODO: RVT-3732 Sort gg and ats servers by cpu usage
 	// servers.sort_by_key
 
diff --git a/svc/pkg/job-run/src/lib.rs b/svc/pkg/job-run/src/lib.rs
index 012e22076e..03ab2fd281 100644
--- a/svc/pkg/job-run/src/lib.rs
+++ b/svc/pkg/job-run/src/lib.rs
@@ -10,6 +10,7 @@ pub fn registry() -> WorkflowResult<Registry> {
 	let mut registry = Registry::new();
 
 	registry.register_workflow::<drain_all::Workflow>()?;
+	registry.register_workflow::<drain_all::Workflow2>()?;
 
 	Ok(registry)
 }
diff --git a/svc/pkg/job-run/src/workers/drain_all.rs b/svc/pkg/job-run/src/workers/drain_all.rs
index bf5518fb98..ffe38260f6 100644
--- a/svc/pkg/job-run/src/workers/drain_all.rs
+++ b/svc/pkg/job-run/src/workers/drain_all.rs
@@ -3,17 +3,16 @@ use proto::backend::pkg::*;
 
 #[worker(name = "job-run-drain-all")]
 async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
-	// TODO: Disabled for now
-	// chirp_workflow::compat::workflow(
-	// 	ctx,
-	// 	crate::workflows::drain_all::Input {
-	// 		nomad_node_id: ctx.nomad_node_id.clone(),
-	// 		drain_timeout: ctx.drain_timeout,
-	// 	},
-	// )
-	// .await?
-	// .dispatch()
-	// .await?;
+	chirp_workflow::compat::workflow(
+		ctx,
+		crate::workflows::drain_all::Input2 {
+			nomad_node_id: ctx.nomad_node_id.clone(),
+			drain_timeout: ctx.drain_timeout,
+		},
+	)
+	.await?
+	.dispatch()
+	.await?;
 
 	Ok(())
 }
diff --git a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
index 887fd1d4c4..32afe734c6 100644
--- a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
+++ b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
@@ -331,11 +331,7 @@ async fn update_db(
 		)
 		.await
 		{
-			tracing::warn!(
-				?err,
-				?alloc_id,
-				"error while trying to manually kill job"
-			);
+			tracing::warn!(?err, ?alloc_id, "error while trying to manually kill job");
 		}
 	}
 }
diff --git a/svc/pkg/job-run/src/workflows/drain_all.rs b/svc/pkg/job-run/src/workflows/drain_all.rs
index a480af5631..bed1713d9f 100644
--- a/svc/pkg/job-run/src/workflows/drain_all.rs
+++ b/svc/pkg/job-run/src/workflows/drain_all.rs
@@ -44,6 +44,33 @@ pub async fn job_run_drain_all(ctx: &mut WorkflowCtx, input: &Input) -> GlobalRe
 	Ok(())
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Input2 {
+	pub nomad_node_id: String,
+	pub drain_timeout: u64,
+}
+
+#[workflow(Workflow2)]
+pub async fn job_run_drain_all2(ctx: &mut WorkflowCtx, input: &Input2) -> GlobalResult<()> {
+	// We fetch here so that when we kill allocs later, we don't refetch new job runs that might be on the
+	// nomad node. Only allocs fetched at this time will be killed.
+	let job_runs = ctx
+		.activity(FetchJobRunsInput {
+			nomad_node_id: input.nomad_node_id.clone(),
+		})
+		.await?;
+
+	ctx.sleep(input.drain_timeout.saturating_sub(DRAIN_PADDING))
+		.await?;
+
+	ctx.activity(StopJobRuns2Input {
+		run_ids: job_runs.iter().map(|jr| jr.run_id).collect(),
+	})
+	.await?;
+
+	Ok(())
+}
+
 #[derive(Debug, Serialize, Deserialize, Hash)]
 struct FetchJobRunsInput {
 	nomad_node_id: String,
@@ -95,6 +122,24 @@ async fn stop_job_runs(ctx: &ActivityCtx, input: &StopJobRunsInput) -> GlobalRes
 	Ok(())
 }
 
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct StopJobRuns2Input {
+	run_ids: Vec<Uuid>,
+}
+
+#[activity(StopJobRuns2)]
+async fn stop_job_runs2(ctx: &ActivityCtx, input: &StopJobRuns2Input) -> GlobalResult<()> {
+	for run_id in &input.run_ids {
+		msg!([ctx] job_run::msg::stop(run_id) {
+			run_id: Some((*run_id).into()),
+			skip_kill_alloc: false,
+		})
+		.await?;
+	}
+
+	Ok(())
+}
+
 #[derive(Debug, Serialize, Deserialize, Hash)]
 struct KillAllocsInput {
 	nomad_node_id: String,
diff --git a/svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs b/svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs
index 23899e3b76..61be96b27f 100644
--- a/svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs
+++ b/svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs
@@ -66,34 +66,9 @@ async fn worker(
 		}
 
 		pipe.query_async(&mut ctx.redis_mm().await?).await?;
-	} else {
-		let mut script = REDIS_SCRIPT.prepare_invoke();
-
-		script.arg(lobby_rows.len());
-
-		for lobby in lobby_rows {
-			script
-				.key(util_mm::key::lobby_config(lobby.lobby_id))
-				.key(util_mm::key::lobby_player_ids(lobby.lobby_id))
-				.key(util_mm::key::lobby_available_spots(
-					lobby.namespace_id,
-					datacenter_id,
-					lobby.lobby_group_id,
-					util_mm::JoinKind::Normal,
-				))
-				.key(util_mm::key::lobby_available_spots(
-					lobby.namespace_id,
-					datacenter_id,
-					lobby.lobby_group_id,
-					util_mm::JoinKind::Party,
-				))
-				.arg(lobby.lobby_id.to_string())
-				.arg(lobby.max_players_normal)
-				.arg(lobby.max_players_party);
-		}
-
-		script.invoke_async(&mut ctx.redis_mm().await?).await?;
 	}
 
+	// NOTE: Don't do anything on undrain
+
 	Ok(())
 }