Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions svc/pkg/job/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,37 +69,53 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
FROM db_job_state.runs
INNER JOIN db_job_state.run_meta_nomad ON run_meta_nomad.run_id = runs.run_id
AS OF SYSTEM TIME '-5s'
WHERE stop_ts IS NULL AND start_ts < $1
WHERE stop_ts IS NULL AND create_ts < $1
",
check_orphaned_ts,
);
let mut total_potentially_orphaned = 0;
let mut orphaned = 0;
let mut has_stop_ts = 0;
let mut no_dispatched_job_id = 0;
let mut still_running = 0;
while let Some(res) = runs_iter.next().await {
let (run_id, stop_ts, dispatched_job_id) = res?;
total_potentially_orphaned += 1;

if stop_ts.is_some() {
has_stop_ts += 1;
continue;
}

tracing::info!(%run_id, "checking for orphaned runs");

let dispatched_job_id = if let Some(x) = dispatched_job_id {
x
} else {
tracing::warn!(%run_id, "dispatched job id not found");
no_dispatched_job_id += 1;
continue;
};

if !job_ids_from_nomad.contains(&dispatched_job_id) {
orphaned += 1;
tracing::warn!(%run_id, "stopping orphaned run");
msg!([ctx] @wait job_run::msg::stop(run_id) {
run_id: Some(run_id.into()),
..Default::default()
})
.await?;
} else {
still_running += 1;
}
}

tracing::info!("finished");
tracing::info!(
?total_potentially_orphaned,
?orphaned,
?has_stop_ts,
?no_dispatched_job_id,
?still_running,
"finished"
);

Ok(())
}