diff --git a/site/src/github.rs b/site/src/github.rs index b17eebf52..a2a6e2dad 100644 --- a/site/src/github.rs +++ b/site/src/github.rs @@ -2,7 +2,7 @@ pub mod client; pub mod comparison_summary; use crate::api::github::Commit; -use crate::job_queue::run_new_queue; +use crate::job_queue::should_use_job_queue; use crate::load::{MissingReason, SiteCtxt, TryCommit}; use chrono::{DateTime, Utc}; use serde::Deserialize; @@ -240,18 +240,16 @@ async fn attach_shas_to_try_benchmark_request( commit: &TryCommit, commit_date: DateTime, ) { - if run_new_queue() { - if let Err(e) = conn - .attach_shas_to_try_benchmark_request( - pr_number, - &commit.sha, - &commit.parent_sha, - commit_date, - ) - .await - { - log::error!("Failed to add shas to try commit: {e:?}"); - } + if let Err(e) = conn + .attach_shas_to_try_benchmark_request( + pr_number, + &commit.sha, + &commit.parent_sha, + commit_date, + ) + .await + { + log::error!("Failed to add shas to try commit: {e:?}"); } } @@ -281,22 +279,24 @@ pub async fn enqueue_shas( }; let conn = ctxt.conn().await; - attach_shas_to_try_benchmark_request( - &*conn, - pr_number, - &try_commit, - commit_response.commit.committer.date, - ) - .await; - - let queued = conn - .pr_attach_commit( + let queued = if should_use_job_queue(pr_number) { + attach_shas_to_try_benchmark_request( + &*conn, + pr_number, + &try_commit, + commit_response.commit.committer.date, + ) + .await; + true + } else { + conn.pr_attach_commit( pr_number, &try_commit.sha, &try_commit.parent_sha, Some(commit_response.commit.committer.date), ) - .await; + .await + }; if queued { if !msg.is_empty() { msg.push('\n'); diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index c63bfbe79..29e12e631 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -13,11 +13,18 @@ use parking_lot::RwLock; use std::{str::FromStr, sync::Arc}; use tokio::time::{self, Duration, MissedTickBehavior}; -pub fn run_new_queue() -> bool { - std::env::var("RUN_CRON") +pub fn is_job_queue_enabled() -> bool { + std::env::var("USE_JOB_QUEUE") .ok() .and_then(|x| x.parse().ok()) - .unwrap_or(false) + .unwrap_or(true) +} + +/// rust-lang/rust PR that will be used for testing the job queue. +const TEST_PR_FOR_JOB_QUEUE: u32 = 147039; + +pub fn should_use_job_queue(pr: u32) -> bool { + is_job_queue_enabled() && pr == TEST_PR_FOR_JOB_QUEUE } /// Store the latest master commits or do nothing if all of them are @@ -25,7 +32,7 @@ pub fn run_new_queue() -> bool { /// Returns `true` if at least one benchmark request was inserted. async fn create_benchmark_request_master_commits( ctxt: &SiteCtxt, - conn: &dyn database::pool::Connection, + _conn: &dyn database::pool::Connection, index: &BenchmarkRequestIndex, ) -> anyhow::Result { let now = Utc::now(); @@ -40,23 +47,26 @@ async fn create_benchmark_request_master_commits( // TODO; delete at some point in the future let cutoff: chrono::DateTime = chrono::DateTime::from_str("2025-08-27T00:00:00.000Z")?; - let mut inserted = false; + let inserted = false; for master_commit in master_commits { // We don't want to add masses of obsolete data if master_commit.time >= cutoff && !index.contains_tag(&master_commit.sha) { - let pr = master_commit.pr.unwrap_or(0); - let benchmark = BenchmarkRequest::create_master( - &master_commit.sha, - &master_commit.parent_sha, - pr, - master_commit.time, - ); - log::info!("Inserting master benchmark request {benchmark:?}"); - if let Err(error) = conn.insert_benchmark_request(&benchmark).await { - log::error!("Failed to insert master benchmark request: {error:?}"); - } else { - inserted = true; - } + // let pr = master_commit.pr.unwrap_or(0); + // let benchmark = BenchmarkRequest::create_master( + // &master_commit.sha, + // &master_commit.parent_sha, + // pr, + // master_commit.time, + // ); + // log::info!("Inserting master benchmark request {benchmark:?}"); + + // Do not create benchmark requests on production, to allow running in parallel with + // the old system. + // if let Err(error) = conn.insert_benchmark_request(&benchmark).await { + // log::error!("Failed to insert master benchmark request: {error:?}"); + // } else { + // inserted = true; + // } } } Ok(inserted) @@ -66,7 +76,7 @@ async fn create_benchmark_request_master_commits( /// already in the database /// Returns `true` if at least one benchmark request was inserted. async fn create_benchmark_request_releases( - conn: &dyn database::pool::Connection, + _conn: &dyn database::pool::Connection, index: &BenchmarkRequestIndex, ) -> anyhow::Result { let releases: String = reqwest::get("https://static.rust-lang.org/manifests.txt") @@ -82,16 +92,19 @@ async fn create_benchmark_request_releases( .filter_map(parse_release_string) .take(20); - let mut inserted = false; + let inserted = false; for (name, commit_date) in releases { if commit_date >= cutoff && !index.contains_tag(&name) { - let release_request = BenchmarkRequest::create_release(&name, commit_date); - log::info!("Inserting release benchmark request {release_request:?}"); - if let Err(error) = conn.insert_benchmark_request(&release_request).await { - log::error!("Failed to insert release benchmark request: {error}"); - } else { - inserted = true; - } + // let release_request = BenchmarkRequest::create_release(&name, commit_date); + // log::info!("Inserting release benchmark request {release_request:?}"); + + // Do not create benchmark requests on production, to allow running in parallel with + // the old system. + // if let Err(error) = conn.insert_benchmark_request(&release_request).await { + // log::error!("Failed to insert release benchmark request: {error}"); + // } else { + // inserted = true; + // } } } Ok(inserted) @@ -204,20 +217,20 @@ pub async fn build_queue( /// This is performed atomically, in a transaction. pub async fn enqueue_benchmark_request( conn: &mut dyn database::pool::Connection, - benchmark_request: &BenchmarkRequest, + request: &BenchmarkRequest, ) -> anyhow::Result<()> { let mut tx = conn.transaction().await; - let Some(request_tag) = benchmark_request.tag() else { - panic!("Benchmark request {benchmark_request:?} has no tag"); + let Some(request_tag) = request.tag() else { + panic!("Benchmark request {request:?} has no tag"); }; - log::info!("Enqueuing jobs for request {benchmark_request:?}"); + log::info!("Enqueuing jobs for request {request:?}"); - let backends = benchmark_request.backends()?; - let profiles = benchmark_request.profiles()?; + let backends = request.backends()?; + let profiles = request.profiles()?; // Prevent the error from spamming the logs - let mut has_emitted_parent_sha_error = false; + // let mut has_emitted_parent_sha_error = false; // Target x benchmark_set x backend x profile -> BenchmarkJob for target in Target::all() { @@ -238,32 +251,36 @@ pub async fn enqueue_benchmark_request( // If the parent job has been deleted from the database // but was already benchmarked then the collector will ignore // it as it will see it already has results. - if let Some(parent_sha) = benchmark_request.parent_sha() { - let (is_foreign_key_violation, result) = tx - .conn() - .enqueue_parent_benchmark_job( - parent_sha, - target, - backend, - profile, - benchmark_set as u32, - ) - .await; - - // At some point in time the parent_sha may not refer - // to a `benchmark_request` and we want to be able to - // see that error. - if let Err(e) = result { - if is_foreign_key_violation && !has_emitted_parent_sha_error { - log::error!("Failed to create job for parent sha {e:?}"); - has_emitted_parent_sha_error = true; - } else if has_emitted_parent_sha_error && is_foreign_key_violation { - continue; - } else { - return Err(e); - } - } - } + + // Do not enqueue parent jobs to allow parallel execution with the old system + // If the parent artifact wouldn't be benchmarked yet, we would benchmark the + // parent with the new system. + // if let Some(parent_sha) = request.parent_sha() { + // let (is_foreign_key_violation, result) = tx + // .conn() + // .enqueue_parent_benchmark_job( + // parent_sha, + // target, + // backend, + // profile, + // benchmark_set as u32, + // ) + // .await; + // + // // At some point in time the parent_sha may not refer + // // to a `benchmark_request` and we want to be able to + // // see that error. + // if let Err(e) = result { + // if is_foreign_key_violation && !has_emitted_parent_sha_error { + // log::error!("Failed to create job for parent sha {e:?}"); + // has_emitted_parent_sha_error = true; + // } else if has_emitted_parent_sha_error && is_foreign_key_violation { + // continue; + // } else { + // return Err(e); + // } + // } + // } } } } @@ -287,12 +304,15 @@ async fn process_benchmark_requests( ) -> anyhow::Result> { let queue = build_queue(conn).await?; + log::debug!("Current queue: {queue:?}"); + let mut completed = vec![]; for request in queue { match request.status() { BenchmarkRequestStatus::InProgress => { let tag = request.tag().expect("In progress request without a tag"); if conn.maybe_mark_benchmark_request_as_completed(tag).await? { + log::info!("Request {tag} marked as completed"); completed.push(request); continue; } @@ -311,8 +331,9 @@ async fn process_benchmark_requests( Ok(completed) } -/// For queueing jobs, add the jobs you want to queue to this function -async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> { +/// Creates new benchmark requests, enqueues jobs for ready benchmark requests and +/// finishes completed benchmark requests. +async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> { let mut conn = ctxt.conn().await; let index = ctxt.known_benchmark_requests.load(); @@ -387,7 +408,10 @@ async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> { } /// Entry point for the cron job that manages the benchmark request and job queue. -pub async fn cron_main(site_ctxt: Arc>>>, run_interval: Duration) { +pub async fn create_queue_process( + site_ctxt: Arc>>>, + run_interval: Duration, +) { let mut interval = time::interval(run_interval); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -398,7 +422,7 @@ pub async fn cron_main(site_ctxt: Arc>>>, run_interv let guard = ctxt.read(); guard.as_ref().cloned() } { - match cron_enqueue_jobs(&ctxt_clone).await { + match perform_queue_tick(&ctxt_clone).await { Ok(_) => log::info!("Cron job finished"), Err(e) => log::error!("Cron job failed to execute: {e:?}"), } diff --git a/site/src/main.rs b/site/src/main.rs index c74792242..7affbd7de 100644 --- a/site/src/main.rs +++ b/site/src/main.rs @@ -1,6 +1,6 @@ use futures::future::FutureExt; use parking_lot::RwLock; -use site::job_queue::{cron_main, run_new_queue}; +use site::job_queue::{create_queue_process, is_job_queue_enabled}; use site::load; use std::env; use std::sync::Arc; @@ -59,9 +59,9 @@ async fn main() { let server = site::server::start(ctxt.clone(), port).fuse(); - if run_new_queue() { + if is_job_queue_enabled() { task::spawn(async move { - cron_main( + create_queue_process( ctxt.clone(), Duration::from_secs(queue_update_interval_seconds), ) diff --git a/site/src/request_handlers/github.rs b/site/src/request_handlers/github.rs index 9fc313a13..1275136de 100644 --- a/site/src/request_handlers/github.rs +++ b/site/src/request_handlers/github.rs @@ -3,7 +3,7 @@ use crate::github::{ client, enqueue_shas, parse_homu_comment, rollup_pr_number, unroll_rollup, COMMENT_MARK_TEMPORARY, RUST_REPO_GITHUB_API_URL, }; -use crate::job_queue::run_new_queue; +use crate::job_queue::should_use_job_queue; use crate::load::SiteCtxt; use database::BenchmarkRequest; @@ -84,13 +84,10 @@ async fn record_try_benchmark_request_without_artifacts( pr: u32, backends: &str, ) { - // We only want to run this if the new system is running - if run_new_queue() { - let try_request = BenchmarkRequest::create_try_without_artifacts(pr, backends, ""); - log::info!("Inserting try benchmark request {try_request:?}"); - if let Err(e) = conn.insert_benchmark_request(&try_request).await { - log::error!("Failed to insert try benchmark request: {}", e); - } + let try_request = BenchmarkRequest::create_try_without_artifacts(pr, backends, ""); + log::info!("Inserting try benchmark request {try_request:?}"); + if let Err(e) = conn.insert_benchmark_request(&try_request).await { + log::error!("Failed to insert try benchmark request: {}", e); } } @@ -120,20 +117,23 @@ async fn handle_rust_timer( Ok(cmd) => { let conn = ctxt.conn().await; - record_try_benchmark_request_without_artifacts( - &*conn, - issue.number, - cmd.params.backends.unwrap_or(""), - ) - .await; - conn.queue_pr( - issue.number, - cmd.params.include, - cmd.params.exclude, - cmd.params.runs, - cmd.params.backends, - ) - .await; + if should_use_job_queue(issue.number) { + record_try_benchmark_request_without_artifacts( + &*conn, + issue.number, + cmd.params.backends.unwrap_or(""), + ) + .await; + } else { + conn.queue_pr( + issue.number, + cmd.params.include, + cmd.params.exclude, + cmd.params.runs, + cmd.params.backends, + ) + .await; + } format!( "Awaiting bors try build completion. @@ -166,20 +166,23 @@ async fn handle_rust_timer( { let conn = ctxt.conn().await; for command in &valid_build_cmds { - record_try_benchmark_request_without_artifacts( - &*conn, - issue.number, - command.params.backends.unwrap_or(""), - ) - .await; - conn.queue_pr( - issue.number, - command.params.include, - command.params.exclude, - command.params.runs, - command.params.backends, - ) - .await; + if should_use_job_queue(issue.number) { + record_try_benchmark_request_without_artifacts( + &*conn, + issue.number, + command.params.backends.unwrap_or(""), + ) + .await; + } else { + conn.queue_pr( + issue.number, + command.params.include, + command.params.exclude, + command.params.runs, + command.params.backends, + ) + .await; + } } }