From a6ce505e8c6eedb0a319e571335dee98348cfab0 Mon Sep 17 00:00:00 2001 From: NathanFlurry Date: Thu, 4 Apr 2024 20:06:53 +0000 Subject: [PATCH] chore(job): gc orphaned jobs from mm (#627) ## Changes --- svc/Cargo.lock | 13 +++ svc/Cargo.toml | 1 + svc/pkg/job/standalone/gc/Cargo.toml | 3 + svc/pkg/job/standalone/gc/src/lib.rs | 106 +++++++++++++----- svc/pkg/mm/ops/lobby-for-run-id/Cargo.toml | 19 ++++ svc/pkg/mm/ops/lobby-for-run-id/Service.toml | 7 ++ svc/pkg/mm/ops/lobby-for-run-id/src/lib.rs | 44 ++++++++ .../ops/lobby-for-run-id/tests/integration.rs | 21 ++++ svc/pkg/mm/types/lobby-for-run-id.proto | 18 +++ 9 files changed, 204 insertions(+), 28 deletions(-) create mode 100644 svc/pkg/mm/ops/lobby-for-run-id/Cargo.toml create mode 100644 svc/pkg/mm/ops/lobby-for-run-id/Service.toml create mode 100644 svc/pkg/mm/ops/lobby-for-run-id/src/lib.rs create mode 100644 svc/pkg/mm/ops/lobby-for-run-id/tests/integration.rs create mode 100644 svc/pkg/mm/types/lobby-for-run-id.proto diff --git a/svc/Cargo.lock b/svc/Cargo.lock index c7ec0337bc..281f3e69d7 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -4270,6 +4270,8 @@ dependencies = [ "indoc", "job-run-get", "lazy_static", + "mm-lobby-for-run-id", + "mm-lobby-get", "nomad-client", "nomad-util", "prost 0.10.4", @@ -5060,6 +5062,17 @@ dependencies = [ "rivet-util-mm", ] +[[package]] +name = "mm-lobby-for-run-id" +version = "0.0.1" +dependencies = [ + "chirp-client", + "chirp-worker", + "faker-mm-lobby", + "rivet-operation", + "sqlx", +] + [[package]] name = "mm-lobby-get" version = "0.0.1" diff --git a/svc/Cargo.toml b/svc/Cargo.toml index 9555aae974..cc5674b226 100644 --- a/svc/Cargo.toml +++ b/svc/Cargo.toml @@ -152,6 +152,7 @@ members = [ "pkg/mm/ops/lobby-find-fail", "pkg/mm/ops/lobby-find-lobby-query-list", "pkg/mm/ops/lobby-find-try-complete", + "pkg/mm/ops/lobby-for-run-id", "pkg/mm/ops/lobby-get", "pkg/mm/ops/lobby-history", "pkg/mm/ops/lobby-idle-update", diff --git a/svc/pkg/job/standalone/gc/Cargo.toml 
b/svc/pkg/job/standalone/gc/Cargo.toml index 63a9d96b3a..3bf4b4170b 100644 --- a/svc/pkg/job/standalone/gc/Cargo.toml +++ b/svc/pkg/job/standalone/gc/Cargo.toml @@ -30,6 +30,9 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ ] } uuid = { version = "1", features = ["serde"] } +mm-lobby-for-run-id = { path = "../../../mm/ops/lobby-for-run-id" } +mm-lobby-get = { path = "../../../mm/ops/lobby-get" } + [dependencies.sqlx] version = "0.7" default-features = false diff --git a/svc/pkg/job/standalone/gc/src/lib.rs b/svc/pkg/job/standalone/gc/src/lib.rs index 3c7dac2104..f3cf545e46 100644 --- a/svc/pkg/job/standalone/gc/src/lib.rs +++ b/svc/pkg/job/standalone/gc/src/lib.rs @@ -1,6 +1,19 @@ +// This service catches two edge cases: +// +// # Case A: Nomad outage +// +// In the situation that nomad-monitor fails to receive a Nomad event (i.e. +// node migration, the job failed, or Nomad failed), there will be jobs +// where the Nomad job did not dispatch a stop event, causing the job to be +// orphaned. +// +// # Case B: Matchmaker inconsistency +// +// If the matchmaker lobbies are stopped but fail to stop the Nomad job, this will detect that and +// stop the job automatically. + use std::collections::HashSet; -use futures_util::stream::StreamExt; use indoc::indoc; use proto::backend::pkg::*; use rivet_operation::prelude::*; @@ -32,11 +45,6 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<() Vec::new(), ); - // In the situation that nomad-monitor fails to receive a Nomad event (i.e. - // node migration, the job failed, or Nomad failed), there will be jobs - // where the Nomad job did not dispatch a stop event, causing the job to be - // orphaned. - // Find jobs to stop. 
let job_stubs = nomad_client::apis::jobs_api::get_jobs(&NOMAD_CONFIG, None, None, None, None, Some("job-")) @@ -62,31 +70,30 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<() // // We use stale reads without locks since job-run-stop is idempotent. let crdb = ctx.crdb().await?; - let mut runs_iter = sql_fetch!( - [ctx, (Uuid, Option, Option), &crdb] + let runs = sql_fetch_all!( + [ctx, (Uuid, Option), &crdb] " - SELECT runs.run_id, runs.stop_ts, run_meta_nomad.dispatched_job_id + SELECT runs.run_id, run_meta_nomad.dispatched_job_id FROM db_job_state.runs INNER JOIN db_job_state.run_meta_nomad ON run_meta_nomad.run_id = runs.run_id AS OF SYSTEM TIME '-5s' WHERE stop_ts IS NULL AND create_ts < $1 ", check_orphaned_ts, - ); - let mut total_potentially_orphaned = 0; - let mut orphaned = 0; - let mut has_stop_ts = 0; - let mut no_dispatched_job_id = 0; - let mut still_running = 0; - while let Some(res) = runs_iter.next().await { - let (run_id, stop_ts, dispatched_job_id) = res?; - total_potentially_orphaned += 1; + ) + .await?; - if stop_ts.is_some() { - has_stop_ts += 1; - continue; - } + // List of all run IDs that are still running by the time the gc finishes + let mut running_run_ids = runs.iter().map(|x| x.0).collect::>(); + + let total_potentially_orphaned = runs.len(); + let mut orphaned_nomad_job = 0; + let mut orphaned_mm_lobby = 0; + let mut oprhaned_mm_lobby_not_found = 0; + let mut no_dispatched_job_id = 0; + // Check for orphaned Nomad jobs + for (run_id, dispatched_job_id) in runs { let dispatched_job_id = if let Some(x) = dispatched_job_id { x } else { @@ -96,24 +103,67 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<() }; if !job_ids_from_nomad.contains(&dispatched_job_id) { - orphaned += 1; - tracing::warn!(%run_id, "stopping orphaned run"); + orphaned_nomad_job += 1; + running_run_ids.remove(&run_id); + + tracing::warn!(%run_id, "stopping orphaned run from nomad job"); msg!([ctx] @wait 
job_run::msg::stop(run_id) { run_id: Some(run_id.into()), ..Default::default() }) .await?; + } + } + + // Check for matchmaker orphans + let run_lobbies = op!([ctx] mm_lobby_for_run_id { + run_ids: running_run_ids.iter().cloned().map(Into::into).collect(), + }) + .await?; + let lobbies = op!([ctx] mm_lobby_get { + lobby_ids: run_lobbies.lobbies.iter().flat_map(|x| x.lobby_id).collect(), + include_stopped: true, + }) + .await?; + for run_id in running_run_ids.clone() { + let lobby = lobbies + .lobbies + .iter() + .find(|x| x.run_id == Some(run_id.into())); + + if let Some(lobby) = lobby { + if lobby.stop_ts.is_some() { + orphaned_mm_lobby += 1; + running_run_ids.remove(&run_id); + + tracing::warn!(%run_id, "stopping orphaned run from matchmaker lobby stopped"); + msg!([ctx] @wait job_run::msg::stop(run_id) { + run_id: Some(run_id.into()), + ..Default::default() + }) + .await?; + } } else { - still_running += 1; + oprhaned_mm_lobby_not_found += 1; + running_run_ids.remove(&run_id); + + // HACK: This is only true in production. This will have false positives with tests. 
+ tracing::warn!(%run_id, "stopping orphaned run from matchmaker lobby not found"); + msg!([ctx] @wait job_run::msg::stop(run_id) { + run_id: Some(run_id.into()), + ..Default::default() + }) + .await?; } } tracing::info!( ?total_potentially_orphaned, - ?orphaned, - ?has_stop_ts, + ?orphaned_nomad_job, + ?orphaned_mm_lobby, + ?oprhaned_mm_lobby_not_found, ?no_dispatched_job_id, - ?still_running, + still_running = ?running_run_ids.len(), "finished" ); diff --git a/svc/pkg/mm/ops/lobby-for-run-id/Cargo.toml b/svc/pkg/mm/ops/lobby-for-run-id/Cargo.toml new file mode 100644 index 0000000000..5cbc4355f6 --- /dev/null +++ b/svc/pkg/mm/ops/lobby-for-run-id/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "mm-lobby-for-run-id" +version = "0.0.1" +edition = "2021" +authors = ["Rivet Gaming, LLC "] +license = "Apache-2.0" + +[dependencies] +chirp-client = { path = "../../../../../lib/chirp/client" } +rivet-operation = { path = "../../../../../lib/operation/core" } + +[dependencies.sqlx] +version = "0.7" +default-features = false + +[dev-dependencies] +chirp-worker = { path = "../../../../../lib/chirp/worker" } +faker-mm-lobby = { path = "../../../faker/ops/mm-lobby" } + diff --git a/svc/pkg/mm/ops/lobby-for-run-id/Service.toml b/svc/pkg/mm/ops/lobby-for-run-id/Service.toml new file mode 100644 index 0000000000..f1b08f4503 --- /dev/null +++ b/svc/pkg/mm/ops/lobby-for-run-id/Service.toml @@ -0,0 +1,7 @@ +[service] +name = "mm-lobby-for-run-id" + +[runtime] +kind = "rust" + +[operation] diff --git a/svc/pkg/mm/ops/lobby-for-run-id/src/lib.rs b/svc/pkg/mm/ops/lobby-for-run-id/src/lib.rs new file mode 100644 index 0000000000..a993eb226c --- /dev/null +++ b/svc/pkg/mm/ops/lobby-for-run-id/src/lib.rs @@ -0,0 +1,44 @@ +use proto::backend::{self, pkg::*}; +use rivet_operation::prelude::*; + +#[derive(sqlx::FromRow)] +struct LobbyRow { + lobby_id: Uuid, + run_id: Option, +} + +impl From for mm::lobby_for_run_id::response::Lobby { + fn from(value: LobbyRow) -> Self { + Self { + lobby_id: 
Some(value.lobby_id.into()), + run_id: value.run_id.map(Into::into), + } + } +} + +#[operation(name = "mm-lobby-for-run-id")] +pub async fn handle( + ctx: OperationContext, +) -> GlobalResult { + let run_ids = ctx + .run_ids + .iter() + .map(common::Uuid::as_uuid) + .collect::>(); + + let lobbies = sql_fetch_all!( + [ctx, LobbyRow] + " + SELECT lobby_id, run_id + FROM db_mm_state.lobbies + WHERE run_id = ANY($1) + ", + run_ids, + ) + .await? + .into_iter() + .map(Into::::into) + .collect(); + + Ok(mm::lobby_for_run_id::Response { lobbies }) +} diff --git a/svc/pkg/mm/ops/lobby-for-run-id/tests/integration.rs b/svc/pkg/mm/ops/lobby-for-run-id/tests/integration.rs new file mode 100644 index 0000000000..bb4695a968 --- /dev/null +++ b/svc/pkg/mm/ops/lobby-for-run-id/tests/integration.rs @@ -0,0 +1,21 @@ +use chirp_worker::prelude::*; + +#[worker_test] +async fn basic(ctx: TestCtx) { + if !util::feature::job_run() { + return; + } + + let lobby_res = op!([ctx] faker_mm_lobby { + ..Default::default() + }) + .await + .unwrap(); + + let res = op!([ctx] mm_lobby_for_run_id { + run_ids: vec![lobby_res.lobby_id.unwrap(), Uuid::new_v4().into()], + }) + .await + .unwrap(); + assert_eq!(1, res.lobbies.len()); +} diff --git a/svc/pkg/mm/types/lobby-for-run-id.proto b/svc/pkg/mm/types/lobby-for-run-id.proto new file mode 100644 index 0000000000..f4c48d0fc1 --- /dev/null +++ b/svc/pkg/mm/types/lobby-for-run-id.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package rivet.backend.pkg.mm.lobby_for_run_id; + +import "proto/common.proto"; + +message Request { + repeated rivet.common.Uuid run_ids = 1; +} + +message Response { + message Lobby { + rivet.common.Uuid lobby_id = 1; + rivet.common.Uuid run_id = 2; + } + + repeated Lobby lobbies = 1; +}