Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions svc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions svc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ members = [
"pkg/mm/ops/lobby-find-fail",
"pkg/mm/ops/lobby-find-lobby-query-list",
"pkg/mm/ops/lobby-find-try-complete",
"pkg/mm/ops/lobby-for-run-id",
"pkg/mm/ops/lobby-get",
"pkg/mm/ops/lobby-history",
"pkg/mm/ops/lobby-idle-update",
Expand Down
3 changes: 3 additions & 0 deletions svc/pkg/job/standalone/gc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
] }
uuid = { version = "1", features = ["serde"] }

mm-lobby-for-run-id = { path = "../../../mm/ops/lobby-for-run-id" }
mm-lobby-get = { path = "../../../mm/ops/lobby-get" }

[dependencies.sqlx]
version = "0.7"
default-features = false
Expand Down
106 changes: 78 additions & 28 deletions svc/pkg/job/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
// This service catches two edge cases:
//
// # Case A: Nomad outage
//
// In the situation that nomad-monitor fails to receive a Nomad event (i.e.
// node migration, the job failed, or Nomad failed), there will be jobs
// where the Nomad job did not dispatch a stop event, causing the job to be
// orphaned.
//
// # Case B: Matchmaker inconsistency
//
// If the matchmaker lobbies are stopped but fail to stop the Nomad job, this will detect that and
// stop the stop automatically.

use std::collections::HashSet;

use futures_util::stream::StreamExt;
use indoc::indoc;
use proto::backend::pkg::*;
use rivet_operation::prelude::*;
Expand Down Expand Up @@ -32,11 +45,6 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
Vec::new(),
);

// In the situation that nomad-monitor fails to receive a Nomad event (i.e.
// node migration, the job failed, or Nomad failed), there will be jobs
// where the Nomad job did not dispatch a stop event, causing the job to be
// orphaned.

// Find jobs to stop.
let job_stubs =
nomad_client::apis::jobs_api::get_jobs(&NOMAD_CONFIG, None, None, None, None, Some("job-"))
Expand All @@ -62,31 +70,30 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
//
// We use stale reads without locks since job-run-stop is idempotent.
let crdb = ctx.crdb().await?;
let mut runs_iter = sql_fetch!(
[ctx, (Uuid, Option<i64>, Option<String>), &crdb]
let runs = sql_fetch_all!(
[ctx, (Uuid, Option<String>), &crdb]
"
SELECT runs.run_id, runs.stop_ts, run_meta_nomad.dispatched_job_id
SELECT runs.run_id, run_meta_nomad.dispatched_job_id
FROM db_job_state.runs
INNER JOIN db_job_state.run_meta_nomad ON run_meta_nomad.run_id = runs.run_id
AS OF SYSTEM TIME '-5s'
WHERE stop_ts IS NULL AND create_ts < $1
",
check_orphaned_ts,
);
let mut total_potentially_orphaned = 0;
let mut orphaned = 0;
let mut has_stop_ts = 0;
let mut no_dispatched_job_id = 0;
let mut still_running = 0;
while let Some(res) = runs_iter.next().await {
let (run_id, stop_ts, dispatched_job_id) = res?;
total_potentially_orphaned += 1;
)
.await?;

if stop_ts.is_some() {
has_stop_ts += 1;
continue;
}
// List of all run IDs that are still running by the time the gc finishes
let mut running_run_ids = runs.iter().map(|x| x.0).collect::<HashSet<Uuid>>();

let total_potentially_orphaned = runs.len();
let mut orphaned_nomad_job = 0;
let mut orphaned_mm_lobby = 0;
let mut oprhaned_mm_lobby_not_found = 0;
let mut no_dispatched_job_id = 0;

// Check for orphaned Nomad jobs
for (run_id, dispatched_job_id) in runs {
let dispatched_job_id = if let Some(x) = dispatched_job_id {
x
} else {
Expand All @@ -96,24 +103,67 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
};

if !job_ids_from_nomad.contains(&dispatched_job_id) {
orphaned += 1;
tracing::warn!(%run_id, "stopping orphaned run");
orphaned_nomad_job += 1;
running_run_ids.remove(&run_id);

tracing::warn!(%run_id, "stopping orphaned run from nomad job");
msg!([ctx] @wait job_run::msg::stop(run_id) {
run_id: Some(run_id.into()),
..Default::default()
})
.await?;
}
}

// Check for matchmaker orphans
let run_lobbies = op!([ctx] mm_lobby_for_run_id {
run_ids: running_run_ids.iter().cloned().map(Into::into).collect(),
})
.await?;
let lobbies = op!([ctx] mm_lobby_get {
lobby_ids: run_lobbies.lobbies.iter().flat_map(|x| x.lobby_id).collect(),
include_stopped: true,
})
.await?;
for run_id in running_run_ids.clone() {
let lobby = lobbies
.lobbies
.iter()
.find(|x| x.run_id == Some(run_id.into()));

if let Some(lobby) = lobby {
if lobby.stop_ts.is_some() {
orphaned_mm_lobby += 1;
running_run_ids.remove(&run_id);

tracing::warn!(%run_id, "stopping orphaned run from matchmaker lobby stopped");
msg!([ctx] @wait job_run::msg::stop(run_id) {
run_id: Some(run_id.into()),
..Default::default()
})
.await?;
}
} else {
still_running += 1;
oprhaned_mm_lobby_not_found += 1;
running_run_ids.remove(&run_id);

// HACK: This is only true in production. This will have false positives with tests.
tracing::warn!(%run_id, "stopping orphaned run from matchmaker lobby not found");
msg!([ctx] @wait job_run::msg::stop(run_id) {
run_id: Some(run_id.into()),
..Default::default()
})
.await?;
}
}

tracing::info!(
?total_potentially_orphaned,
?orphaned,
?has_stop_ts,
?orphaned_nomad_job,
?orphaned_mm_lobby,
?oprhaned_mm_lobby_not_found,
?no_dispatched_job_id,
?still_running,
still_running = ?running_run_ids.len(),
"finished"
);

Expand Down
19 changes: 19 additions & 0 deletions svc/pkg/mm/ops/lobby-for-run-id/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "mm-lobby-for-run-id"
version = "0.0.1"
edition = "2021"
authors = ["Rivet Gaming, LLC <developer@rivet.gg>"]
license = "Apache-2.0"

[dependencies]
chirp-client = { path = "../../../../../lib/chirp/client" }
rivet-operation = { path = "../../../../../lib/operation/core" }

[dependencies.sqlx]
version = "0.7"
default-features = false

[dev-dependencies]
chirp-worker = { path = "../../../../../lib/chirp/worker" }
faker-mm-lobby = { path = "../../../faker/ops/mm-lobby" }

7 changes: 7 additions & 0 deletions svc/pkg/mm/ops/lobby-for-run-id/Service.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[service]
name = "mm-lobby-for-run-id"

[runtime]
kind = "rust"

[operation]
44 changes: 44 additions & 0 deletions svc/pkg/mm/ops/lobby-for-run-id/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use proto::backend::{self, pkg::*};
use rivet_operation::prelude::*;

#[derive(sqlx::FromRow)]
struct LobbyRow {
lobby_id: Uuid,
run_id: Option<Uuid>,
}

impl From<LobbyRow> for mm::lobby_for_run_id::response::Lobby {
fn from(value: LobbyRow) -> Self {
Self {
lobby_id: Some(value.lobby_id.into()),
run_id: value.run_id.map(Into::into),
}
}
}

#[operation(name = "mm-lobby-for-run-id")]
pub async fn handle(
ctx: OperationContext<mm::lobby_for_run_id::Request>,
) -> GlobalResult<mm::lobby_for_run_id::Response> {
let run_ids = ctx
.run_ids
.iter()
.map(common::Uuid::as_uuid)
.collect::<Vec<_>>();

let lobbies = sql_fetch_all!(
[ctx, LobbyRow]
"
SELECT lobby_id, run_id
FROM db_mm_state.lobbies
WHERE run_id = ANY($1)
",
run_ids,
)
.await?
.into_iter()
.map(Into::<mm::lobby_for_run_id::response::Lobby>::into)
.collect();

Ok(mm::lobby_for_run_id::Response { lobbies })
}
21 changes: 21 additions & 0 deletions svc/pkg/mm/ops/lobby-for-run-id/tests/integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use chirp_worker::prelude::*;

#[worker_test]
async fn basic(ctx: TestCtx) {
if !util::feature::job_run() {
return;
}

let lobby_res = op!([ctx] faker_mm_lobby {
..Default::default()
})
.await
.unwrap();

let res = op!([ctx] mm_lobby_for_run_id {
run_ids: vec![lobby_res.lobby_id.unwrap(), Uuid::new_v4().into()],
})
.await
.unwrap();
assert_eq!(1, res.lobbies.len());
}
18 changes: 18 additions & 0 deletions svc/pkg/mm/types/lobby-for-run-id.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package rivet.backend.pkg.mm.lobby_for_run_id;

import "proto/common.proto";

message Request {
repeated rivet.common.Uuid run_ids = 1;
}

message Response {
message Lobby {
rivet.common.Uuid lobby_id = 1;
rivet.common.Uuid run_id = 2;
}

repeated Lobby lobbies = 1;
}