From b3f38b1e324cc2d889c439ec65bcba3f4bdb9558 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?=
Date: Mon, 20 Oct 2025 15:21:33 +0200
Subject: [PATCH 1/3] Keep looping when a job is not found in the job queue

---
 collector/src/bin/collector.rs | 171 ++++++++++++++++++---------------
 1 file changed, 95 insertions(+), 76 deletions(-)

diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs
index c908d5468..53747b86e 100644
--- a/collector/src/bin/collector.rs
+++ b/collector/src/bin/collector.rs
@@ -1493,6 +1493,9 @@ fn get_host_tuple_from_rustc(rustc: &str) -> anyhow::Result<String> {
 /// Maximum number of failures before a job will be marked as failed.
 const MAX_JOB_FAILS: u32 = 3;
 
+/// How long the collector should sleep if it does not find any job in the job queue.
+const JOB_WAIT_SLEEP_TIME: Duration = Duration::from_secs(30);
+
 async fn run_job_queue_benchmarks(
     pool: Pool,
     mut conn: Box<dyn Connection>,
@@ -1504,96 +1507,112 @@ async fn run_job_queue_benchmarks(
 
     let mut last_request_tag = None;
 
-    while let Some((benchmark_job, artifact_id)) = conn
-        .dequeue_benchmark_job(
-            collector.name(),
-            collector.target(),
-            collector.benchmark_set(),
-        )
-        .await?
-    {
-        // Are we benchmarking a different benchmark request than in the previous iteration of the
-        // loop?
-        let is_new_request = last_request_tag.is_some()
-            && last_request_tag.as_deref() != Some(benchmark_job.request_tag());
-        if is_new_request {
-            let _ = tidy_toolchain_cache_dir();
-        }
-
-        // Here we check if we should update our commit SHA, if rustc-perf has been updated.
-        // We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
-        // to avoid changing code in the middle of benchmarking the same request.
-        // Note that if an update happens, the job that we have just dequeued will have its deque
-        // counter increased. But since updates are relatively rare, that shouldn't be a big deal,
-        // it will be dequeued again when the collector starts again.
-        if check_git_sha && is_new_request && needs_git_update(collector) {
+    // Outer loop - wait for some jobs to appear
+    loop {
+        if check_git_sha && needs_git_update(collector) {
             log::warn!("Exiting collector to update itself from git.");
-            return Ok(());
+            break;
         }
 
-        last_request_tag = Some(benchmark_job.request_tag().to_string());
+        while let Some((benchmark_job, artifact_id)) = conn
+            .dequeue_benchmark_job(
+                collector.name(),
+                collector.target(),
+                collector.benchmark_set(),
+            )
+            .await?
+        {
+            // Are we benchmarking a different benchmark request than in the previous iteration of the
+            // loop?
+            let is_new_request = last_request_tag.is_some()
+                && last_request_tag.as_deref() != Some(benchmark_job.request_tag());
+            if is_new_request {
+                let _ = tidy_toolchain_cache_dir();
+            }
 
-        log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
-        let result = run_benchmark_job(
-            conn.as_mut(),
-            &benchmark_job,
-            artifact_id.clone(),
-            &all_compile_benchmarks,
-        )
-        .await;
-        match result {
-            Ok(_) => {
-                log::info!("Job finished sucessfully");
-                conn.mark_benchmark_job_as_completed(
-                    benchmark_job.id(),
-                    BenchmarkJobConclusion::Success,
-                )
-                .await?;
+            // Here we check if we should update our commit SHA, if rustc-perf has been updated.
+            // We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
+            // to avoid changing code in the middle of benchmarking the same request.
+            // Note that if an update happens, the job that we have just dequeued will have its deque
+            // counter increased. But since updates are relatively rare, that shouldn't be a big deal,
+            // it will be dequeued again when the collector starts again.
+            if check_git_sha && is_new_request && needs_git_update(collector) {
+                log::warn!("Exiting collector to update itself from git.");
+                return Ok(());
             }
-            Err(error) => {
-                match error {
-                    BenchmarkJobError::Permanent(error) => {
-                        log::error!("Job finished with permanent error: {error:?}");
-
-                        // Store the error to the database
-                        let artifact_row_id = conn.artifact_id(&artifact_id).await;
-                        // Use a placeholder to say that the error is associated with a job,
-                        // not with a benchmark.
-                        conn.record_error(
-                            artifact_row_id,
-                            "Job failure",
-                            &format!("Error while benchmarking job {benchmark_job:?}: {error:?}"),
-                            Some(benchmark_job.id()),
-                        )
-                        .await;
-
-                        // Something bad that probably cannot be retried has happened.
-                        // Immediately mark the job as failed and continue with other jobs
-                        log::info!("Marking the job as failed");
-                        conn.mark_benchmark_job_as_completed(
-                            benchmark_job.id(),
-                            BenchmarkJobConclusion::Failure,
-                        )
-                        .await?;
-                    }
-                    BenchmarkJobError::Transient(error) => {
-                        log::error!("Job finished with transient error: {error:?}");
-                        // There was some transient (i.e. I/O, network or database) error.
-                        // Let's retry the job later, with some sleep
-                        log::info!("Retrying after 30s...");
-                        tokio::time::sleep(Duration::from_secs(30)).await;
+            last_request_tag = Some(benchmark_job.request_tag().to_string());
 
-                        // Maybe there was a DB issue. Try to reconnect to the database.
-                        conn = pool.connection().await;
+            log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
+            let result = run_benchmark_job(
+                conn.as_mut(),
+                &benchmark_job,
+                artifact_id.clone(),
+                &all_compile_benchmarks,
+            )
+            .await;
+            match result {
+                Ok(_) => {
+                    log::info!("Job finished successfully");
+                    conn.mark_benchmark_job_as_completed(
+                        benchmark_job.id(),
+                        BenchmarkJobConclusion::Success,
+                    )
+                    .await?;
+                }
+                Err(error) => {
+                    match error {
+                        BenchmarkJobError::Permanent(error) => {
+                            log::error!("Job finished with permanent error: {error:?}");
+
+                            // Store the error to the database
+                            let artifact_row_id = conn.artifact_id(&artifact_id).await;
+                            // Use a placeholder to say that the error is associated with a job,
+                            // not with a benchmark.
+                            conn.record_error(
+                                artifact_row_id,
+                                "Job failure",
+                                &format!(
+                                    "Error while benchmarking job {benchmark_job:?}: {error:?}"
+                                ),
+                                Some(benchmark_job.id()),
+                            )
+                            .await;
+
+                            // Something bad that probably cannot be retried has happened.
+                            // Immediately mark the job as failed and continue with other jobs
+                            log::info!("Marking the job as failed");
+                            conn.mark_benchmark_job_as_completed(
+                                benchmark_job.id(),
+                                BenchmarkJobConclusion::Failure,
+                            )
+                            .await?;
+                        }
+                        BenchmarkJobError::Transient(error) => {
+                            log::error!("Job finished with transient error: {error:?}");
+
+                            // There was some transient (i.e. I/O, network or database) error.
+                            // Let's retry the job later, with some sleep
+                            log::info!("Retrying after 30s...");
+                            tokio::time::sleep(Duration::from_secs(30)).await;
+
+                            // Maybe there was a DB issue. Try to reconnect to the database.
+                            conn = pool.connection().await;
+                        }
+                    }
+                }
+            }
+
+            conn.update_collector_heartbeat(collector.name()).await?;
         }
+        log::info!(
+            "No job found, sleeping for {}s",
+            JOB_WAIT_SLEEP_TIME.as_secs()
+        );
+        tokio::time::sleep(JOB_WAIT_SLEEP_TIME).await;
         conn.update_collector_heartbeat(collector.name()).await?;
     }
-    log::info!("No job found, exiting");
     Ok(())
 }

From eab312735e641fb993a35bd3fc66e84db470cb62 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?=
Date: Mon, 20 Oct 2025 15:24:24 +0200
Subject: [PATCH 2/3] Log when a new request is being started

---
 collector/src/bin/collector.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs
index 53747b86e..055bb6d72 100644
--- a/collector/src/bin/collector.rs
+++ b/collector/src/bin/collector.rs
@@ -1524,9 +1524,10 @@ async fn run_job_queue_benchmarks(
         {
             // Are we benchmarking a different benchmark request than in the previous iteration of the
             // loop?
-            let is_new_request = last_request_tag.is_some()
-                && last_request_tag.as_deref() != Some(benchmark_job.request_tag());
+            let is_new_request = last_request_tag.is_none()
+                || (last_request_tag.as_deref() != Some(benchmark_job.request_tag()));
             if is_new_request {
+                log::info!("Starting new request {}", benchmark_job.request_tag());
                 let _ = tidy_toolchain_cache_dir();
             }
 

From ae6095861f8bac16b7a7f017b3d89b90f60e1c83 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?=
Date: Mon, 20 Oct 2025 15:25:51 +0200
Subject: [PATCH 3/3] Add bash script to run the new job queue collector

---
 collector/collect-job-queue.sh | 37 ++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)
 create mode 100755 collector/collect-job-queue.sh

diff --git a/collector/collect-job-queue.sh b/collector/collect-job-queue.sh
new file mode 100755
index 000000000..70514966d
--- /dev/null
+++ b/collector/collect-job-queue.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+
+# This script expects DATABASE and COLLECTOR_NAME to be defined in the environment
+
+set -u -o pipefail
+
+echo "Running job queue collector"
+
+export RUST_LOG=collector=trace,collector::sysroot=debug
+export PATH="/home/collector/.cargo/bin:$PATH"
+
+while : ; do
+    # Update and rebuild the collector.
+    git pull
+    git reset --hard @{upstream}
+
+    # Make sure we have a recent stable toolchain, so that we can successfully build
+    # the collector.
+    rustup update stable
+    cargo +stable build --release -p collector
+
+    CURRENT_SHA=`git rev-parse HEAD`
+
+    target/release/collector benchmark_job_queue \
+        --db "${DATABASE}" \
+        --check_git_sha \
+        --git_sha "${CURRENT_SHA}" \
+        --collector_name "${COLLECTOR_NAME}"
+
+    STATUS=$?
+    echo finished run at `date` with exit code $STATUS
+
+    # Wait a bit if the command has failed.
+    if [ $STATUS -ne 0 ]; then
+        sleep 60
+    fi
+done