Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion svc/pkg/mm/ops/lobby-find-fail/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ async fn handle(
futs.push(fail_query(ctx.clone(), redis, query_id, ctx.error_code).boxed());
}
}
futures_util::future::try_join_all(futs).await?;
if !futs.is_empty() {
futures_util::future::try_join_all(futs).await?;
}

Ok(mm::lobby_find_fail::Response {})
}
Expand Down
9 changes: 8 additions & 1 deletion svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use proto::backend::pkg::*;
use redis::AsyncCommands;
use rivet_operation::prelude::*;

const MAX_COUNT: isize = 16;

#[operation(name = "mm-lobby-find-lobby-query-list")]
async fn handle(
ctx: OperationContext<mm::lobby_find_lobby_query_list::Request>,
Expand All @@ -11,7 +13,7 @@ async fn handle(
let query_ids = ctx
.redis_mm()
.await?
.zrange::<_, Vec<String>>(util_mm::key::lobby_find_queries(lobby_id), 0, -1)
.zrange::<_, Vec<String>>(util_mm::key::lobby_find_queries(lobby_id), 0, MAX_COUNT - 1)
.await?
.iter()
.map(String::as_str)
Expand All @@ -20,5 +22,10 @@ async fn handle(
.map(common::Uuid::from)
.collect::<Vec<common::Uuid>>();

if query_ids.len() as isize == MAX_COUNT {
tracing::warn!("too many find queries, short circuiting to prevent bad things from happening");
return Ok(mm::lobby_find_lobby_query_list::Response { query_ids: Vec::new() })
}

Ok(mm::lobby_find_lobby_query_list::Response { query_ids })
}
17 changes: 17 additions & 0 deletions svc/pkg/mm/worker/src/workers/lobby_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ async fn worker(ctx: &OperationContext<mm::msg::lobby_cleanup::Message>) -> Glob
// This is idempotent, don't raise error
tracing::info!("lobby not present in redis");

remove_from_redis_without_config(&mut redis_mm, lobby_id).await?;

false
};

Expand Down Expand Up @@ -256,3 +258,18 @@ async fn remove_from_redis(

Ok(())
}

/// Clears a lobby's leftover Redis entries using only its ID.
///
/// Queues three commands on a single atomic pipeline and runs them against `redis_mm`:
/// an `UNLINK` of the lobby config key, an `UNLINK` of the lobby tags key, and a `ZREM` of
/// the lobby ID from the sorted set stored at the `lobby_unready` key.
///
/// # Errors
///
/// Propagates any error produced while executing the pipeline.
async fn remove_from_redis_without_config(
    redis_mm: &mut RedisPool,
    lobby_id: Uuid,
) -> GlobalResult<()> {
    // The sorted set stores lobby IDs in their string form.
    let unready_member = lobby_id.to_string();

    let mut cleanup = redis::pipe();
    cleanup.atomic();
    cleanup.unlink(util_mm::key::lobby_config(lobby_id));
    cleanup.unlink(util_mm::key::lobby_tags(lobby_id));
    cleanup.zrem(util_mm::key::lobby_unready(), unready_member);
    cleanup.query_async(redis_mm).await?;

    Ok(())
}
25 changes: 15 additions & 10 deletions svc/pkg/mm/worker/src/workers/lobby_stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ struct LobbyRow {
async fn worker(ctx: &OperationContext<mm::msg::lobby_stop::Message>) -> GlobalResult<()> {
let lobby_id = unwrap_ref!(ctx.lobby_id).as_uuid();

if ctx.req_dt() > util::duration::minutes(5) {
tracing::error!("discarding stale message");
return Ok(());
}

// Fetch the lobby.
//
// This also ensures that mm-lobby-find or mm-lobby-create
Expand Down Expand Up @@ -38,24 +43,24 @@ async fn worker(ctx: &OperationContext<mm::msg::lobby_stop::Message>) -> GlobalR
.await?;
tracing::info!(?lobby_row, "lobby row");

let Some(lobby_row) = lobby_row else {
if ctx.req_dt() > util::duration::minutes(5) {
tracing::error!("discarding stale message");
return Ok(());
} else {
retry_bail!("lobby not found, may be race condition with insertion");
}
};

// conflicting locks on the lobby row
// Cleanup the lobby ASAP.
//
// Conflicting locks on the lobby row, so do the cleanup after the SQL query but before bailing in
// case the lobby does not exist in the db. lobby_cleanup will remove it from Redis
// appropriately.
//
// This will also be called in `job-run-cleanup`, but this is idempotent.
msg!([ctx] mm::msg::lobby_cleanup(lobby_id) {
lobby_id: Some(lobby_id.into()),
})
.await?;

let Some(lobby_row) = lobby_row else {
// Don't use `retry_bail` because this will retry frequently, and we need to call
// `mm::msg::lobby_cleanup` first
bail!("lobby not found, may be race condition with insertion");
};

// Stop the job. This will call cleanup and delete the lobby row.
if let Some(run_id) = lobby_row.run_id {
msg!([ctx] job_run::msg::stop(run_id) {
Expand Down