diff --git a/database/src/lib.rs b/database/src/lib.rs index 06ec0f5b7..5d0c142e1 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -1041,12 +1041,9 @@ impl BenchmarkRequest { } /// Cached information about benchmark requests in the DB -/// FIXME: only store non-try requests here pub struct BenchmarkRequestIndex { /// Tags (SHA or release name) of all known benchmark requests all: HashSet, - /// Tags (SHA or release name) of all benchmark requests in the completed status - completed: HashSet, } impl BenchmarkRequestIndex { @@ -1054,16 +1051,13 @@ impl BenchmarkRequestIndex { pub fn contains_tag(&self, tag: &str) -> bool { self.all.contains(tag) } +} - /// Return tags of already completed benchmark requests. - pub fn completed_requests(&self) -> &HashSet { - &self.completed - } - - pub fn add_tag(&mut self, tag: &str) { - self.all.insert(tag.to_string()); - self.completed.insert(tag.to_string()); - } +/// Contains pending (ArtifactsReady or InProgress) benchmark requests, and a set of their parents +/// that are already completed. +pub struct PendingBenchmarkRequests { + pub requests: Vec, + pub completed_parent_tags: HashSet, } #[derive(Debug, Clone, PartialEq)] diff --git a/database/src/pool.rs b/database/src/pool.rs index 9ffc3b89a..7a6cac627 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -2,7 +2,8 @@ use crate::selector::CompileTestCase; use crate::{ ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion, BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestWithErrors, - BenchmarkSet, CodegenBackend, CollectorConfig, CompileBenchmark, Target, + BenchmarkSet, CodegenBackend, CollectorConfig, CompileBenchmark, PendingBenchmarkRequests, + Target, }; use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step}; use chrono::{DateTime, Utc}; @@ -189,7 +190,8 @@ pub trait Connection: Send + Sync { async fn purge_artifact(&self, aid: &ArtifactId); /// Add an item to the `benchmark_requests`, if the `benchmark_request` - /// exists an Error will be returned + /// exists an Error will be returned. + /// We require the caller to pass an index, to ensure that it is always kept up-to-date. async fn insert_benchmark_request( &self, benchmark_request: &BenchmarkRequest, @@ -200,7 +202,9 @@ pub trait Connection: Send + Sync { /// Load all pending benchmark requests, i.e. those that have artifacts ready, but haven't /// been completed yet. Pending statuses are `ArtifactsReady` and `InProgress`. - async fn load_pending_benchmark_requests(&self) -> anyhow::Result>; + /// Also returns their parents, so that we can quickly check which requests are ready for being + /// enqueued. + async fn load_pending_benchmark_requests(&self) -> anyhow::Result; /// Update the status of a `benchmark_request` with the given `tag`. /// If no such request exists in the DB, returns an error. @@ -426,9 +430,9 @@ mod tests { use crate::tests::run_postgres_test; use crate::{tests::run_db_test, BenchmarkRequestType, Commit, CommitType, Date}; use chrono::Utc; + use std::collections::BTreeSet; use std::str::FromStr; - /// Create a Commit fn create_commit(commit_sha: &str, time: chrono::DateTime, r#type: CommitType) -> Commit { Commit { sha: commit_sha.into(), @@ -437,43 +441,6 @@ mod tests { } } - async fn complete_request( - db: &dyn Connection, - request_tag: &str, - collector_name: &str, - benchmark_set: u32, - target: Target, - ) { - /* Create job for the request */ - db.enqueue_benchmark_job( - request_tag, - target, - CodegenBackend::Llvm, - Profile::Opt, - benchmark_set, - ) - .await - .unwrap(); - - let (job, _) = db - .dequeue_benchmark_job(collector_name, target, BenchmarkSet(benchmark_set)) - .await - .unwrap() - .unwrap(); - - assert_eq!(job.request_tag(), request_tag); - - /* Mark the job as complete */ - db.mark_benchmark_job_as_completed(job.id(), BenchmarkJobConclusion::Success) - .await - .unwrap(); - - assert!(db - .maybe_mark_benchmark_request_as_completed(request_tag) - .await - .unwrap()); - } - #[tokio::test] async fn pstat_returns_empty_vector_when_empty() { run_db_test(|ctx| async { @@ -559,37 +526,25 @@ mod tests { async fn multiple_non_completed_try_requests() { run_postgres_test(|ctx| async { let db = ctx.db(); - let target = Target::X86_64UnknownLinuxGnu; - let collector_name = "collector-1"; - let benchmark_set = 1; - - db.add_collector_config(collector_name, target, benchmark_set, true) - .await - .unwrap(); - // Complete parent - let parent = BenchmarkRequest::create_release("sha-parent-1", Utc::now()); - // Complete - let req_a = BenchmarkRequest::create_try_without_artifacts(42, "", ""); - // WaitingForArtifacts - let req_b = BenchmarkRequest::create_try_without_artifacts(42, "", ""); - let req_c = BenchmarkRequest::create_try_without_artifacts(42, "", ""); - - db.insert_benchmark_request(&parent).await.unwrap(); - db.insert_benchmark_request(&req_a).await.unwrap(); - db.attach_shas_to_try_benchmark_request(42, "sha1", "sha-parent-1", Utc::now()) + // Insert a try build + ctx.insert_try_request(42).await; + db.attach_shas_to_try_benchmark_request(42, "sha-1", "sha-parent-1", Utc::now()) .await .unwrap(); - complete_request(db, "sha-parent-1", collector_name, benchmark_set, target).await; - complete_request(db, "sha1", collector_name, benchmark_set, target).await; + // Then finish it + ctx.complete_request("sha-1").await; - // This should be fine, req_a was completed - db.insert_benchmark_request(&req_b).await.unwrap(); - // This should fail, we can't have two queued requests at once - db.insert_benchmark_request(&req_c).await.expect_err( - "It was possible to record two try benchmark requests without artifacts", - ); + // Insert a try build for the same PR again + // This should be fine, because the previous request was already completed + ctx.insert_try_request(42).await; + // But this should fail, as we can't have two queued requests at once + db.insert_benchmark_request(&BenchmarkRequest::create_try_without_artifacts( + 42, "", "", + )) + .await + .expect_err("It was possible to record two try benchmark requests without artifacts"); Ok(ctx) }) @@ -629,43 +584,48 @@ mod tests { async fn load_pending_benchmark_requests() { run_postgres_test(|ctx| async { let db = ctx.db(); - let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); - let target = Target::X86_64UnknownLinuxGnu; - let collector_name = "collector-1"; - let benchmark_set = 1; - - db.add_collector_config(collector_name, target, benchmark_set, true) - .await - .unwrap(); // ArtifactsReady - let req_a = BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time); + let req_a = ctx.insert_master_request("sha-1", "parent-sha-1", 42).await; // ArtifactsReady - let req_b = BenchmarkRequest::create_release("1.80.0", time); + let req_b = ctx.insert_release_request("1.80.0").await; // WaitingForArtifacts - let req_c = BenchmarkRequest::create_try_without_artifacts(50, "", ""); + ctx.insert_try_request(50).await; // InProgress - let req_d = BenchmarkRequest::create_master("sha-2", "parent-sha-2", 51, time); + let req_d = ctx.insert_master_request("sha-2", "parent-sha-2", 51).await; // Completed - let req_e = BenchmarkRequest::create_release("1.79.0", time); + ctx.insert_release_request("1.79.0").await; - for &req in &[&req_a, &req_b, &req_c, &req_d, &req_e] { - db.insert_benchmark_request(req).await.unwrap(); - } - - complete_request(db, "1.79.0", collector_name, benchmark_set, target).await; + ctx.complete_request("1.79.0").await; + ctx.insert_master_request("parent-sha-1", "grandparent-sha-0", 100) + .await; + ctx.complete_request("parent-sha-1").await; + ctx.insert_master_request("parent-sha-2", "grandparent-sha-1", 101) + .await; + ctx.complete_request("parent-sha-2").await; db.update_benchmark_request_status("sha-2", BenchmarkRequestStatus::InProgress) .await .unwrap(); - let requests = db.load_pending_benchmark_requests().await.unwrap(); + let pending = db.load_pending_benchmark_requests().await.unwrap(); + let requests = pending.requests; assert_eq!(requests.len(), 3); for req in &[req_a, req_b, req_d] { assert!(requests.iter().any(|r| r.tag() == req.tag())); } + assert_eq!( + pending + .completed_parent_tags + .into_iter() + .collect::>() + .into_iter() + .collect::>(), + vec!["parent-sha-1".to_string(), "parent-sha-2".to_string()] + ); + Ok(ctx) }) .await; @@ -687,6 +647,7 @@ mod tests { .load_pending_benchmark_requests() .await .unwrap() + .requests .into_iter() .next() .unwrap(); diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 89eb640f3..8fa0988eb 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -5,9 +5,10 @@ use crate::{ BenchmarkJobConclusion, BenchmarkJobStatus, BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, BenchmarkRequestWithErrors, BenchmarkSet, CodegenBackend, CollectionId, CollectorConfig, Commit, CommitType, CompileBenchmark, Date, - Index, Profile, QueuedCommit, Scenario, Target, BENCHMARK_JOB_STATUS_FAILURE_STR, - BENCHMARK_JOB_STATUS_IN_PROGRESS_STR, BENCHMARK_JOB_STATUS_QUEUED_STR, - BENCHMARK_JOB_STATUS_SUCCESS_STR, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR, + Index, PendingBenchmarkRequests, Profile, QueuedCommit, Scenario, Target, + BENCHMARK_JOB_STATUS_FAILURE_STR, BENCHMARK_JOB_STATUS_IN_PROGRESS_STR, + BENCHMARK_JOB_STATUS_QUEUED_STR, BENCHMARK_JOB_STATUS_SUCCESS_STR, + BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR, BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, BENCHMARK_REQUEST_STATUS_COMPLETED_STR, BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_STATUS_WAITING_FOR_ARTIFACTS_STR, BENCHMARK_REQUEST_TRY_STR, @@ -500,6 +501,7 @@ pub struct CachedStatements { get_compile_test_cases_with_measurements: Statement, get_last_n_completed_requests_with_errors: Statement, get_jobs_of_in_progress_benchmark_requests: Statement, + load_pending_benchmark_requests: Statement, } pub struct PostgresTransaction<'a> { @@ -683,7 +685,7 @@ impl PostgresConnection { where aid = $1 ").await.unwrap(), load_benchmark_request_index: conn.prepare(" - SELECT tag, status + SELECT tag FROM benchmark_request WHERE tag IS NOT NULL ").await.unwrap(), @@ -751,6 +753,18 @@ impl PostgresConnection { -- Only get the jobs of in_progress requests SELECT * FROM job_queue INNER JOIN requests ON job_queue.request_tag = requests.tag ")).await.unwrap(), + // Load pending benchmark requests, along with information whether their parent is + // completed or not + load_pending_benchmark_requests: conn.prepare(&format!(" + WITH pending AS ( + SELECT {BENCHMARK_REQUEST_COLUMNS} + FROM benchmark_request AS req + WHERE status IN ('{BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR}', '{BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR}') + ) + SELECT (parent.status = '{BENCHMARK_REQUEST_STATUS_COMPLETED_STR}') AS parent_done, pending.* + FROM pending + LEFT JOIN benchmark_request as parent ON parent.tag = pending.parent_sha + ")).await.unwrap(), }), conn, } @@ -809,7 +823,7 @@ where None => Date(Utc.with_ymd_and_hms(2001, 1, 1, 0, 0, 0).unwrap()), } }, - r#type: CommitType::from_str(&row.get::<_, String>(3)).unwrap() + r#type: CommitType::from_str(&row.get::<_, String>(3)).unwrap(), }, ) }) @@ -1188,13 +1202,13 @@ where Some(aid) => aid.get::<_, i32>(0) as u32, None => { self.conn() - .query_opt("insert into artifact (name, date, type) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING RETURNING id", &[ - &info.name, - &info.date, - &info.kind, - ]) - .await - .unwrap(); + .query_opt("insert into artifact (name, date, type) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING RETURNING id", &[ + &info.name, + &info.date, + &info.kind, + ]) + .await + .unwrap(); self.conn() .query_one("select id from artifact where name = $1", &[&info.name]) .await @@ -1623,18 +1637,11 @@ where .await .context("Cannot load benchmark request index")?; - let mut all = HashSet::with_capacity(requests.len()); - let mut completed = HashSet::with_capacity(requests.len()); - for request in requests { - let tag = request.get::<_, String>(0); - let status = request.get::<_, &str>(1); - - if status == BENCHMARK_REQUEST_STATUS_COMPLETED_STR { - completed.insert(tag.clone()); - } - all.insert(tag); - } - Ok(BenchmarkRequestIndex { all, completed }) + let all = requests + .into_iter() + .map(|row| row.get::<_, String>(0)) + .collect(); + Ok(BenchmarkRequestIndex { all }) } async fn update_benchmark_request_status( @@ -1705,25 +1712,30 @@ where Ok(()) } - async fn load_pending_benchmark_requests(&self) -> anyhow::Result> { - let query = format!( - r#" - SELECT {BENCHMARK_REQUEST_COLUMNS} - FROM benchmark_request - WHERE status IN('{BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR}', '{BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR}')"# - ); - + async fn load_pending_benchmark_requests(&self) -> anyhow::Result { let rows = self .conn() - .query(&query, &[]) + .query(&self.statements().load_pending_benchmark_requests, &[]) .await .context("Failed to get pending benchmark requests")?; - let requests = rows - .into_iter() - .map(|it| row_to_benchmark_request(&it, None)) - .collect(); - Ok(requests) + let mut completed_parent_tags = HashSet::new(); + let mut requests = Vec::with_capacity(rows.len()); + for row in rows { + let parent_done = row.get::<_, Option>(0); + let request = row_to_benchmark_request(&row, Some(1)); + if let Some(true) = parent_done { + if let Some(parent) = request.parent_sha() { + completed_parent_tags.insert(parent.to_string()); + } + } + requests.push(request); + } + + Ok(PendingBenchmarkRequests { + requests, + completed_parent_tags, + }) } async fn enqueue_benchmark_job( diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index f36c77f52..74cb63111 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -4,7 +4,7 @@ use crate::{ ArtifactCollection, ArtifactId, Benchmark, BenchmarkJob, BenchmarkJobConclusion, BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestWithErrors, BenchmarkSet, CodegenBackend, CollectionId, CollectorConfig, Commit, CommitType, - CompileBenchmark, Date, Profile, Target, + CompileBenchmark, Date, PendingBenchmarkRequests, Profile, Target, }; use crate::{ArtifactIdNumber, Index, QueuedCommit}; use chrono::{DateTime, TimeZone, Utc}; @@ -1306,7 +1306,7 @@ impl Connection for SqliteConnection { no_queue_implementation_abort!() } - async fn load_pending_benchmark_requests(&self) -> anyhow::Result> { + async fn load_pending_benchmark_requests(&self) -> anyhow::Result { no_queue_implementation_abort!() } diff --git a/database/src/tests/mod.rs b/database/src/tests/mod.rs index 6beacd32b..9d88a60ce 100644 --- a/database/src/tests/mod.rs +++ b/database/src/tests/mod.rs @@ -139,6 +139,20 @@ impl TestContext { req } + /// Create a new release benchmark request and add it to the DB. + pub async fn insert_release_request(&self, tag: &str) -> BenchmarkRequest { + let req = BenchmarkRequest::create_release(tag, Utc::now()); + self.db().insert_benchmark_request(&req).await.unwrap(); + req + } + + /// Create a new try benchmark request without artifacts and add it to the DB. + pub async fn insert_try_request(&self, pr: u32) -> BenchmarkRequest { + let req = BenchmarkRequest::create_try_without_artifacts(pr, "", ""); + self.db().insert_benchmark_request(&req).await.unwrap(); + req + } + pub async fn complete_request(&self, tag: &str) { // Note: this assumes that there are not non-completed jobs in the DB for the request self.db() diff --git a/site/src/github.rs b/site/src/github.rs index b7c807b21..b17eebf52 100644 --- a/site/src/github.rs +++ b/site/src/github.rs @@ -250,7 +250,7 @@ async fn attach_shas_to_try_benchmark_request( ) .await { - log::error!("Failed to add shas to try commit {}", e); + log::error!("Failed to add shas to try commit: {e:?}"); } } } diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index 3969db96d..c33baa436 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -1,14 +1,15 @@ mod utils; -use std::{str::FromStr, sync::Arc}; - use crate::job_queue::utils::{parse_release_string, ExtractIf}; use crate::load::{partition_in_place, SiteCtxt}; use chrono::Utc; use collector::benchmark_set::benchmark_set_count; -use database::{BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, Target}; -use hashbrown::HashSet; +use database::{ + BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, PendingBenchmarkRequests, + Target, +}; use parking_lot::RwLock; +use std::{str::FromStr, sync::Arc}; use tokio::time::{self, Duration}; pub fn run_new_queue() -> bool { @@ -21,11 +22,19 @@ pub fn run_new_queue() -> bool { /// Store the latest master commits or do nothing if all of them are /// already in the database async fn create_benchmark_request_master_commits( - ctxt: &Arc, + ctxt: &SiteCtxt, conn: &dyn database::pool::Connection, index: &BenchmarkRequestIndex, ) -> anyhow::Result<()> { - let master_commits = &ctxt.get_master_commits().commits; + let now = Utc::now(); + + let master_commits = ctxt.get_master_commits(); + // Only consider the last ~month of master commits + let master_commits = master_commits + .commits + .iter() + .filter(|c| now.signed_duration_since(c.time) < chrono::Duration::days(29)); + // TODO; delete at some point in the future let cutoff: chrono::DateTime = chrono::DateTime::from_str("2025-08-27T00:00:00.000Z")?; @@ -61,12 +70,11 @@ async fn create_benchmark_request_releases( // TODO; delete at some point in the future let cutoff: chrono::DateTime = chrono::DateTime::from_str("2025-08-27T00:00:00.000Z")?; - let releases: Vec<_> = releases + let releases = releases .lines() .rev() .filter_map(parse_release_string) - .take(20) - .collect(); + .take(20); for (name, commit_date) in releases { if commit_date >= cutoff && !index.contains_tag(&name) { @@ -80,23 +88,26 @@ async fn create_benchmark_request_releases( Ok(()) } -/// Sorts try and master requests that are in the `ArtifactsReady` status. +/// Sorts try and master requests that are in the `ArtifactsReady` status and return them in the +/// correct queue order, where the first returned request will be the first to be benchmarked next. /// Doesn't consider in-progress requests or release artifacts. -fn sort_benchmark_requests(index: &BenchmarkRequestIndex, request_queue: &mut [BenchmarkRequest]) { - let mut done: HashSet = index.completed_requests().clone(); - - // Ensure all the items are ready to be sorted, if they are not this is - // undefined behaviour - assert!(request_queue.iter().all(|bmr| { +fn sort_benchmark_requests(pending: PendingBenchmarkRequests) -> Vec { + let PendingBenchmarkRequests { + requests: mut pending, + completed_parent_tags: mut done, + } = pending; + + // Ensure all the items are ready to be sorted + assert!(pending.iter().all(|bmr| { bmr.status() == BenchmarkRequestStatus::ArtifactsReady && (bmr.is_master() || bmr.is_try()) })); let mut finished = 0; - while finished < request_queue.len() { + while finished < pending.len() { // The next level is those elements in the unordered queue which // are ready to be benchmarked (i.e., those with parent in done or no // parent). - let level_len = partition_in_place(request_queue[finished..].iter_mut(), |bmr| { + let level_len = partition_in_place(pending[finished..].iter_mut(), |bmr| { bmr.parent_sha().is_none_or(|parent| done.contains(parent)) }); @@ -109,12 +120,12 @@ fn sort_benchmark_requests(index: &BenchmarkRequestIndex, request_queue: &mut [B panic!("No master/try commit is ready for benchmarking"); } else { log::warn!("No master/try commit is ready for benchmarking"); - return; + return pending; } } // Everything in level has the same topological order, then we sort based on heuristics - let level = &mut request_queue[finished..][..level_len]; + let level = &mut pending[finished..][..level_len]; level.sort_unstable_by_key(|bmr| { ( // PR number takes priority @@ -130,14 +141,15 @@ fn sort_benchmark_requests(index: &BenchmarkRequestIndex, request_queue: &mut [B // that all of the statuses of the benchmark requests are // `ArtifactsReady` it is implausable for this `expect(...)` to be // hit. - done.insert( - c.tag() - .expect("Tag should exist on a benchmark request being sorted") - .to_string(), - ); + let tag = c + .tag() + .expect("Tag should exist on a benchmark request being sorted") + .to_string(); + done.insert(tag); } finished += level_len; } + pending } /// Creates a benchmark request queue that determines in what order will @@ -148,13 +160,12 @@ fn sort_benchmark_requests(index: &BenchmarkRequestIndex, request_queue: &mut [B /// Does not consider requests that are waiting for artifacts or that are alredy completed. pub async fn build_queue( conn: &dyn database::pool::Connection, - index: &BenchmarkRequestIndex, ) -> anyhow::Result> { // Load ArtifactsReady and InProgress benchmark requests let mut pending = conn.load_pending_benchmark_requests().await?; // The queue starts with in progress - let mut queue: Vec = pending.extract_if_stable(|request| { + let mut queue: Vec = pending.requests.extract_if_stable(|request| { matches!(request.status(), BenchmarkRequestStatus::InProgress) }); @@ -162,8 +173,9 @@ pub async fn build_queue( queue.sort_unstable_by_key(|req| req.created_at()); // Add release artifacts ordered by the release tag (1.87.0 before 1.88.0) and `created_at`. - let mut release_artifacts: Vec = - pending.extract_if_stable(|request| request.is_release()); + let mut release_artifacts: Vec = pending + .requests + .extract_if_stable(|request| request.is_release()); release_artifacts.sort_unstable_by(|a, b| { a.tag() @@ -172,7 +184,8 @@ pub async fn build_queue( }); queue.append(&mut release_artifacts); - sort_benchmark_requests(index, &mut pending); + + let mut pending = sort_benchmark_requests(pending); queue.append(&mut pending); Ok(queue) } @@ -237,31 +250,28 @@ pub async fn enqueue_benchmark_request( Ok(()) } -/// Try to find a benchmark request that should be enqueue next, and if such request is found, -/// enqueue it. -async fn try_enqueue_next_benchmark_request( +/// Update the state of benchmark requests. +/// If there is a request that has artifacts ready, and nothing is currently in-progress, +/// it will be enqueued. +/// If there is a request whose jobs have all completed, it will be marked as completed. +async fn process_benchmark_requests( conn: &mut dyn database::pool::Connection, - index: &mut BenchmarkRequestIndex, ) -> anyhow::Result<()> { - let queue = build_queue(conn, index).await?; + let queue = build_queue(conn).await?; - #[allow(clippy::never_loop)] for request in queue { match request.status() { - BenchmarkRequestStatus::ArtifactsReady => { - enqueue_benchmark_request(conn, &request).await?; - break; - } BenchmarkRequestStatus::InProgress => { - if conn - .maybe_mark_benchmark_request_as_completed(request.tag().unwrap()) - .await? - { - index.add_tag(request.tag().unwrap()); + let tag = request.tag().expect("In progress request without a tag"); + if conn.maybe_mark_benchmark_request_as_completed(tag).await? { continue; } break; } + BenchmarkRequestStatus::ArtifactsReady => { + enqueue_benchmark_request(conn, &request).await?; + break; + } BenchmarkRequestStatus::WaitingForArtifacts | BenchmarkRequestStatus::Completed { .. } => { unreachable!("Unexpected request {request:?} found in request queue"); @@ -272,20 +282,24 @@ async fn try_enqueue_next_benchmark_request( } /// For queueing jobs, add the jobs you want to queue to this function -async fn cron_enqueue_jobs(site_ctxt: &Arc) -> anyhow::Result<()> { +async fn cron_enqueue_jobs(site_ctxt: &SiteCtxt) -> anyhow::Result<()> { let mut conn = site_ctxt.conn().await; - let mut index = conn.load_benchmark_request_index().await?; + + let index = conn.load_benchmark_request_index().await?; + // Put the master commits into the `benchmark_requests` queue create_benchmark_request_master_commits(site_ctxt, &*conn, &index).await?; // Put the releases into the `benchmark_requests` queue create_benchmark_request_releases(&*conn, &index).await?; - try_enqueue_next_benchmark_request(&mut *conn, &mut index).await?; + // Enqueue waiting requests and try to complete in-progress ones + process_benchmark_requests(&mut *conn).await?; + Ok(()) } -/// Entry point for the cron -pub async fn cron_main(site_ctxt: Arc>>>, seconds: u64) { - let mut interval = time::interval(Duration::from_secs(seconds)); +/// Entry point for the cron job that manages the benchmark request and job queue. +pub async fn cron_main(site_ctxt: Arc>>>, run_interval: Duration) { + let mut interval = time::interval(run_interval); let ctxt = site_ctxt.clone(); loop { @@ -470,7 +484,7 @@ mod tests { create_master("mmm", "aaa", 18), ]; - db_insert_requests(&*db, &requests).await; + db_insert_requests(db, &requests).await; db.attach_shas_to_try_benchmark_request(16, "try1", "rrr", Utc::now()) .await .unwrap(); @@ -482,7 +496,7 @@ mod tests { .unwrap(); mark_as_completed( - &*db, + db, &["bar", "345", "aaa", "rrr"], collector_name, benchmark_set, @@ -490,9 +504,7 @@ mod tests { ) .await; - let index = db.load_benchmark_request_index().await.unwrap(); - - let sorted: Vec = build_queue(&*db, &index).await.unwrap(); + let sorted: Vec = build_queue(db).await.unwrap(); queue_order_matches(&sorted, &["try1", "v1.2.3", "123", "foo", "mmm", "baz"]); Ok(ctx) diff --git a/site/src/main.rs b/site/src/main.rs index afb63c645..c74792242 100644 --- a/site/src/main.rs +++ b/site/src/main.rs @@ -4,6 +4,7 @@ use site::job_queue::{cron_main, run_new_queue}; use site::load; use std::env; use std::sync::Arc; +use std::time::Duration; use tokio::task; #[cfg(unix)] @@ -60,7 +61,11 @@ async fn main() { if run_new_queue() { task::spawn(async move { - cron_main(ctxt.clone(), queue_update_interval_seconds).await; + cron_main( + ctxt.clone(), + Duration::from_secs(queue_update_interval_seconds), + ) + .await; }); } diff --git a/site/src/request_handlers/status_page_new.rs b/site/src/request_handlers/status_page_new.rs index afba2d1d3..cd58ae122 100644 --- a/site/src/request_handlers/status_page_new.rs +++ b/site/src/request_handlers/status_page_new.rs @@ -13,10 +13,8 @@ use std::time::Duration; pub async fn handle_status_page_new(ctxt: Arc) -> anyhow::Result { let conn = ctxt.conn().await; - let index = conn.load_benchmark_request_index().await?; - // The queue contains any in-progress request(s) and then the following requests in queue order - let queue = build_queue(&*conn, &index).await?; + let queue = build_queue(&*conn).await?; let completed = conn.get_last_n_completed_benchmark_requests(10).await?; // Figure out approximately how long was the most recent master benchmark request