diff --git a/svc/pkg/mm/ops/lobby-find-fail/src/lib.rs b/svc/pkg/mm/ops/lobby-find-fail/src/lib.rs index 882903f245..4c657f0b09 100644 --- a/svc/pkg/mm/ops/lobby-find-fail/src/lib.rs +++ b/svc/pkg/mm/ops/lobby-find-fail/src/lib.rs @@ -30,7 +30,9 @@ async fn handle( futs.push(fail_query(ctx.clone(), redis, query_id, ctx.error_code).boxed()); } } - futures_util::future::try_join_all(futs).await?; + if !futs.is_empty() { + futures_util::future::try_join_all(futs).await?; + } Ok(mm::lobby_find_fail::Response {}) } diff --git a/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs b/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs index 3819ba3fa6..429f76a3ce 100644 --- a/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs +++ b/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs @@ -2,6 +2,8 @@ use proto::backend::pkg::*; use redis::AsyncCommands; use rivet_operation::prelude::*; +const MAX_COUNT: isize = 16; + #[operation(name = "mm-lobby-find-lobby-query-list")] async fn handle( ctx: OperationContext, @@ -11,7 +13,7 @@ async fn handle( let query_ids = ctx .redis_mm() .await? - .zrange::<_, Vec>(util_mm::key::lobby_find_queries(lobby_id), 0, -1) + .zrange::<_, Vec>(util_mm::key::lobby_find_queries(lobby_id), 0, MAX_COUNT - 1) .await? .iter() .map(String::as_str) @@ -20,5 +22,10 @@ async fn handle( .map(common::Uuid::from) .collect::>(); + if query_ids.len() as isize == MAX_COUNT { + tracing::warn!("too many find queries, short circuiting to prevent bad things from happening"); + return Ok(mm::lobby_find_lobby_query_list::Response { query_ids: Vec::new() }) + } + Ok(mm::lobby_find_lobby_query_list::Response { query_ids }) } diff --git a/svc/pkg/mm/worker/src/workers/lobby_cleanup.rs b/svc/pkg/mm/worker/src/workers/lobby_cleanup.rs index 480ccae902..1a3e9e0e6c 100644 --- a/svc/pkg/mm/worker/src/workers/lobby_cleanup.rs +++ b/svc/pkg/mm/worker/src/workers/lobby_cleanup.rs @@ -57,6 +57,8 @@ async fn worker(ctx: &OperationContext) -> Glob // This is idempotent, don't raise error tracing::info!("lobby not present in redis"); + remove_from_redis_without_config(&mut redis_mm, lobby_id).await?; + false }; @@ -256,3 +258,18 @@ async fn remove_from_redis( Ok(()) } + +async fn remove_from_redis_without_config( + redis_mm: &mut RedisPool, + lobby_id: Uuid, +) -> GlobalResult<()> { + let mut pipe = redis::pipe(); + pipe.atomic() + .unlink(util_mm::key::lobby_config(lobby_id)) + .unlink(util_mm::key::lobby_tags(lobby_id)) + .zrem(util_mm::key::lobby_unready(), lobby_id.to_string()) + .query_async(redis_mm) + .await?; + + Ok(()) +} diff --git a/svc/pkg/mm/worker/src/workers/lobby_stop.rs b/svc/pkg/mm/worker/src/workers/lobby_stop.rs index a45e5cf6c3..845e20c19e 100644 --- a/svc/pkg/mm/worker/src/workers/lobby_stop.rs +++ b/svc/pkg/mm/worker/src/workers/lobby_stop.rs @@ -11,6 +11,11 @@ struct LobbyRow { async fn worker(ctx: &OperationContext) -> GlobalResult<()> { let lobby_id = unwrap_ref!(ctx.lobby_id).as_uuid(); + if ctx.req_dt() > util::duration::minutes(5) { + tracing::error!("discarding stale message"); + return Ok(()); + } + // Fetch the lobby. // // This also ensures that mm-lobby-find or mm-lobby-create @@ -38,24 +43,24 @@ async fn worker(ctx: &OperationContext) -> GlobalR .await?; tracing::info!(?lobby_row, "lobby row"); - let Some(lobby_row) = lobby_row else { - if ctx.req_dt() > util::duration::minutes(5) { - tracing::error!("discarding stale message"); - return Ok(()); - } else { - retry_bail!("lobby not found, may be race condition with insertion"); - } - }; - - // conflicting locks on the lobby row // Cleanup the lobby ASAP. // + // Conflicting locks on the lobby row, so dont cleanup after the SQL query but before the retry_bail in + // case the lobby does not exist in the db. lobby_cleanup will remove it from Redis + // appropriately. + // // This will also be called in `job-run-cleanup`, but this is idempotent. msg!([ctx] mm::msg::lobby_cleanup(lobby_id) { lobby_id: Some(lobby_id.into()), }) .await?; + let Some(lobby_row) = lobby_row else { + // Don't use `retry_bail` because this will retry frequently, and we need to call + // `mm::msg::lobby_cleanup` first + bail!("lobby not found, may be race condition with insertion"); + }; + // Stop the job. This will call cleanup and delete the lobby row. if let Some(run_id) = lobby_row.run_id { msg!([ctx] job_run::msg::stop(run_id) {