Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn worker(

// Insert metrics
if let Some(install_complete_ts) = install_complete_ts {
insert_metrics(ctx, datacenter_id, nomad_join_ts, install_complete_ts).await?;
insert_metrics(ctx, datacenter_id, nomad_join_ts, install_complete_ts).await?;
} else {
tracing::warn!("missing install_complete_ts for nomad-node-registered");
}
Expand Down
16 changes: 10 additions & 6 deletions svc/pkg/job-run/worker/src/workers/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ async fn worker(ctx: &OperationContext<job_run::msg::cleanup::Message>) -> Globa
})
.await?
else {
if ctx.req_dt() > util::duration::minutes(5) {
tracing::error!("discarding stale message");
return Ok(());
} else {
retry_bail!("run not found, may be race condition with insertion");
}
// if ctx.req_dt() > util::duration::minutes(5) {
// tracing::error!("discarding stale message");
// return Ok(());
// } else {
// retry_bail!("run not found, may be race condition with insertion");
// }

// TODO: Retrying here causes amplifying failures, so we log the error once and drop the message instead of retrying
tracing::error!("job run not found, may have leaked");
return Ok(());
};

tracing::info!("removing from cache");
Expand Down
6 changes: 5 additions & 1 deletion svc/pkg/job-run/worker/src/workers/stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ async fn worker(ctx: &OperationContext<job_run::msg::stop::Message>) -> GlobalRe
tracing::error!("discarding stale message");
return Ok(());
} else {
retry_bail!("run not found, may be race condition with insertion");
// retry_bail!("run not found, may be race condition with insertion");

// TODO: Retrying here causes amplifying failures, so we log the error once and drop the message instead of retrying
tracing::error!("job run not found, may have leaked");
return Ok(());
}
};

Expand Down
21 changes: 13 additions & 8 deletions svc/pkg/mm/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use proto::backend::pkg::*;
use redis::AsyncCommands;
use rivet_operation::prelude::*;
use rivet_pools::prelude::redis::Commands;

#[tracing::instrument(skip_all)]
pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()> {
Expand Down Expand Up @@ -86,11 +87,13 @@ async fn cull_unregistered_players(
mut redis_mm: RedisPool,
client: chirp_client::Client,
) -> GlobalResult<()> {
// We don't remove from the set here since this will be removed in the mm-player-remove
// service.
let remove_player_ids = redis_mm
.zrangebyscore::<_, _, _, Vec<String>>(util_mm::key::player_unregistered(), 0, ts as isize)
let remove_player_ids = redis::pipe()
.zrangebyscore(util_mm::key::player_unregistered(), 0, ts as isize)
.zrembyscore(util_mm::key::player_unregistered(), 0, ts as isize)
.ignore()
.query_async::<_, (Vec<String>,)>(&mut redis_mm)
.await?
.0
.into_iter()
.filter_map(|x| util::uuid::parse(&x).ok())
.collect::<Vec<_>>();
Expand All @@ -115,11 +118,13 @@ async fn cull_auto_remove_players(
mut redis_mm: RedisPool,
client: chirp_client::Client,
) -> GlobalResult<()> {
// We don't remove from the set here since this will be removed in the mm-player-remove
// service.
let remove_player_ids = redis_mm
.zrangebyscore::<_, _, _, Vec<String>>(util_mm::key::player_auto_remove(), 0, ts as isize)
let remove_player_ids = redis::pipe()
.zrangebyscore(util_mm::key::player_auto_remove(), 0, ts as isize)
.zrembyscore(util_mm::key::player_auto_remove(), 0, ts as isize)
.ignore()
.query_async::<_, (Vec<String>,)>(&mut redis_mm)
.await?
.0
.into_iter()
.filter_map(|x| util::uuid::parse(&x).ok())
.collect::<Vec<_>>();
Expand Down
6 changes: 5 additions & 1 deletion svc/pkg/mm/worker/src/workers/lobby_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ async fn worker(ctx: &OperationContext<mm::msg::lobby_cleanup::Message>) -> Glob
tracing::error!("discarding stale message");
return Ok(());
} else {
retry_bail!("lobby not found, may be race condition with insertion");
// retry_bail!("lobby not found, may be race condition with insertion");

// TODO: Retrying here causes amplifying failures, so we log the error once and drop the message instead of retrying
tracing::error!("lobby not found, may have leaked");
return Ok(());
}
};

Expand Down
6 changes: 5 additions & 1 deletion svc/pkg/mm/worker/src/workers/lobby_stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ async fn worker(ctx: &OperationContext<mm::msg::lobby_stop::Message>) -> GlobalR
tracing::error!("discarding stale message");
return Ok(());
} else {
retry_bail!("lobby not found, may be race condition with insertion");
// retry_bail!("lobby not found, may be race condition with insertion");

// TODO: Retrying here causes amplifying failures, so we log the error once and drop the message instead of retrying
tracing::error!("lobby not found, may have leaked");
return Ok(());
}
};

Expand Down
6 changes: 5 additions & 1 deletion svc/pkg/mm/worker/src/workers/player_remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ async fn worker(ctx: &OperationContext<mm::msg::player_remove::Message>) -> Glob
tracing::error!("discarding stale message");
return Ok(());
} else {
retry_bail!("player not found, may be race condition with insertion");
// retry_bail!("player not found, may be race condition with insertion");

// TODO: Retrying here causes amplifying failures, so we log the error once and drop the message instead of retrying
tracing::error!("player not found, may have leaked");
return Ok(());
};

// Validate lobby
Expand Down