From 669e7814ee4e5f6ed00586c4f4931eb53452ef68 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Tue, 28 May 2024 20:57:07 +0000 Subject: [PATCH] fix(job-run): remove unneded retry_bail! --- .../src/workers/nomad_node_registered.rs | 2 +- svc/pkg/job-run/worker/src/workers/cleanup.rs | 16 ++++++++------ svc/pkg/job-run/worker/src/workers/stop.rs | 6 +++++- svc/pkg/mm/standalone/gc/src/lib.rs | 21 ++++++++++++------- .../mm/worker/src/workers/lobby_cleanup.rs | 6 +++++- svc/pkg/mm/worker/src/workers/lobby_stop.rs | 6 +++++- .../mm/worker/src/workers/player_remove.rs | 6 +++++- 7 files changed, 44 insertions(+), 19 deletions(-) diff --git a/svc/pkg/cluster/worker/src/workers/nomad_node_registered.rs b/svc/pkg/cluster/worker/src/workers/nomad_node_registered.rs index 9372320de2..bd09e974bd 100644 --- a/svc/pkg/cluster/worker/src/workers/nomad_node_registered.rs +++ b/svc/pkg/cluster/worker/src/workers/nomad_node_registered.rs @@ -38,7 +38,7 @@ async fn worker( // Insert metrics if let Some(install_complete_ts) = install_complete_ts { - insert_metrics(ctx, datacenter_id, nomad_join_ts, install_complete_ts).await?; + insert_metrics(ctx, datacenter_id, nomad_join_ts, install_complete_ts).await?; } else { tracing::warn!("missing install_complete_ts for nomad-node-registered"); } diff --git a/svc/pkg/job-run/worker/src/workers/cleanup.rs b/svc/pkg/job-run/worker/src/workers/cleanup.rs index 0124568567..ffa5ddcbd4 100644 --- a/svc/pkg/job-run/worker/src/workers/cleanup.rs +++ b/svc/pkg/job-run/worker/src/workers/cleanup.rs @@ -27,12 +27,16 @@ async fn worker(ctx: &OperationContext) -> Globa }) .await? else { - if ctx.req_dt() > util::duration::minutes(5) { - tracing::error!("discarding stale message"); - return Ok(()); - } else { - retry_bail!("run not found, may be race condition with insertion"); - } + // if ctx.req_dt() > util::duration::minutes(5) { + // tracing::error!("discarding stale message"); + // return Ok(()); + // } else { + // retry_bail!("run not found, may be race condition with insertion"); + // } + + // TODO: This has amplifying failures, so we just fail once here + tracing::error!("job run not found, may have leaked"); + return Ok(()); }; tracing::info!("removing from cache"); diff --git a/svc/pkg/job-run/worker/src/workers/stop.rs b/svc/pkg/job-run/worker/src/workers/stop.rs index 49bce08590..7d4cca5726 100644 --- a/svc/pkg/job-run/worker/src/workers/stop.rs +++ b/svc/pkg/job-run/worker/src/workers/stop.rs @@ -48,7 +48,11 @@ async fn worker(ctx: &OperationContext) -> GlobalRe tracing::error!("discarding stale message"); return Ok(()); } else { - retry_bail!("run not found, may be race condition with insertion"); + // retry_bail!("run not found, may be race condition with insertion"); + + // TODO: This has amplifying failures, so we just fail once here + tracing::error!("job run not found, may have leaked"); + return Ok(()); } }; diff --git a/svc/pkg/mm/standalone/gc/src/lib.rs b/svc/pkg/mm/standalone/gc/src/lib.rs index cd728135a4..b9b8539d7f 100644 --- a/svc/pkg/mm/standalone/gc/src/lib.rs +++ b/svc/pkg/mm/standalone/gc/src/lib.rs @@ -1,6 +1,7 @@ use proto::backend::pkg::*; use redis::AsyncCommands; use rivet_operation::prelude::*; +use rivet_pools::prelude::redis::Commands; #[tracing::instrument(skip_all)] pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()> { @@ -86,11 +87,13 @@ async fn cull_unregistered_players( mut redis_mm: RedisPool, client: chirp_client::Client, ) -> GlobalResult<()> { - // We don't remove from the set here since this will be removed in the mm-player-remove - // service. - let remove_player_ids = redis_mm - .zrangebyscore::<_, _, _, Vec>(util_mm::key::player_unregistered(), 0, ts as isize) + let remove_player_ids = redis::pipe() + .zrangebyscore(util_mm::key::player_unregistered(), 0, ts as isize) + .zrembyscore(util_mm::key::player_unregistered(), 0, ts as isize) + .ignore() + .query_async::<_, (Vec,)>(&mut redis_mm) .await? + .0 .into_iter() .filter_map(|x| util::uuid::parse(&x).ok()) .collect::>(); @@ -115,11 +118,13 @@ async fn cull_auto_remove_players( mut redis_mm: RedisPool, client: chirp_client::Client, ) -> GlobalResult<()> { - // We don't remove from the set here since this will be removed in the mm-player-remove - // service. - let remove_player_ids = redis_mm - .zrangebyscore::<_, _, _, Vec>(util_mm::key::player_auto_remove(), 0, ts as isize) + let remove_player_ids = redis::pipe() + .zrangebyscore(util_mm::key::player_auto_remove(), 0, ts as isize) + .zrembyscore(util_mm::key::player_auto_remove(), 0, ts as isize) + .ignore() + .query_async::<_, (Vec,)>(&mut redis_mm) .await? + .0 .into_iter() .filter_map(|x| util::uuid::parse(&x).ok()) .collect::>(); diff --git a/svc/pkg/mm/worker/src/workers/lobby_cleanup.rs b/svc/pkg/mm/worker/src/workers/lobby_cleanup.rs index 480ccae902..a623472dda 100644 --- a/svc/pkg/mm/worker/src/workers/lobby_cleanup.rs +++ b/svc/pkg/mm/worker/src/workers/lobby_cleanup.rs @@ -95,7 +95,11 @@ async fn worker(ctx: &OperationContext) -> Glob tracing::error!("discarding stale message"); return Ok(()); } else { - retry_bail!("lobby not found, may be race condition with insertion"); + // retry_bail!("lobby not found, may be race condition with insertion"); + + // TODO: This has amplifying failures, so we just fail once here + tracing::error!("lobby not found, may have leaked"); + return Ok(()); } }; diff --git a/svc/pkg/mm/worker/src/workers/lobby_stop.rs b/svc/pkg/mm/worker/src/workers/lobby_stop.rs index a45e5cf6c3..ebd1a0f913 100644 --- a/svc/pkg/mm/worker/src/workers/lobby_stop.rs +++ b/svc/pkg/mm/worker/src/workers/lobby_stop.rs @@ -43,7 +43,11 @@ async fn worker(ctx: &OperationContext) -> GlobalR tracing::error!("discarding stale message"); return Ok(()); } else { - retry_bail!("lobby not found, may be race condition with insertion"); + // retry_bail!("lobby not found, may be race condition with insertion"); + + // TODO: This has amplifying failures, so we just fail once here + tracing::error!("lobby not found, may have leaked"); + return Ok(()); } }; diff --git a/svc/pkg/mm/worker/src/workers/player_remove.rs b/svc/pkg/mm/worker/src/workers/player_remove.rs index 1fa4dc4081..cc8d16a7b3 100644 --- a/svc/pkg/mm/worker/src/workers/player_remove.rs +++ b/svc/pkg/mm/worker/src/workers/player_remove.rs @@ -83,7 +83,11 @@ async fn worker(ctx: &OperationContext) -> Glob tracing::error!("discarding stale message"); return Ok(()); } else { - retry_bail!("player not found, may be race condition with insertion"); + // retry_bail!("player not found, may be race condition with insertion"); + + // TODO: This has amplifying failures, so we just fail once here + tracing::error!("player not found, may have leaked"); + return Ok(()); }; // Validate lobby