diff --git a/database/src/pool.rs b/database/src/pool.rs index 5269be392..daaf85117 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -15,6 +15,14 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore}; pub mod postgres; pub mod sqlite; +#[derive(Debug)] +pub enum JobEnqueueResult { + JobCreated(u32), + JobAlreadyExisted, + RequestShaNotFound { error: String }, + Other(anyhow::Error), +} + #[async_trait::async_trait] pub trait Connection: Send + Sync { async fn maybe_create_indices(&mut self); @@ -238,20 +246,7 @@ pub trait Connection: Send + Sync { profile: Profile, benchmark_set: u32, kind: BenchmarkJobKind, - ) -> anyhow::Result>; - - /// Add a benchmark job which is explicitly using a `parent_sha` we split - /// this out to improve our error handling. A `parent_sha` may not have - /// an associated request in the `benchmarek` - async fn enqueue_parent_benchmark_job( - &self, - parent_sha: &str, - target: Target, - backend: CodegenBackend, - profile: Profile, - benchmark_set: u32, - kind: BenchmarkJobKind, - ) -> (bool, anyhow::Result); + ) -> JobEnqueueResult; /// Returns a set of compile-time benchmark test cases that were already computed for the /// given artifact. @@ -460,6 +455,15 @@ mod tests { } } + impl JobEnqueueResult { + pub fn unwrap(self) -> u32 { + match self { + JobEnqueueResult::JobCreated(id) => id, + error => panic!("Unexpected job enqueue result: {error:?}"), + } + } + } + #[tokio::test] async fn pstat_returns_empty_vector_when_empty() { run_db_test(|ctx| async { @@ -715,7 +719,10 @@ mod tests { BenchmarkJobKind::Runtime, ) .await; - assert!(result.is_ok()); + match result { + JobEnqueueResult::JobCreated(_) => {} + error => panic!("Invalid result: {error:?}"), + } Ok(ctx) }) @@ -861,16 +868,20 @@ mod tests { .unwrap(); // Now we can insert the job - db.enqueue_benchmark_job( - benchmark_request.tag().unwrap(), - Target::X86_64UnknownLinuxGnu, - CodegenBackend::Llvm, - Profile::Opt, - 1u32, - BenchmarkJobKind::Runtime, - ) - .await - .unwrap(); + match db + .enqueue_benchmark_job( + benchmark_request.tag().unwrap(), + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + Profile::Opt, + 1u32, + BenchmarkJobKind::Runtime, + ) + .await + { + JobEnqueueResult::JobCreated(_) => {} + error => panic!("Invalid result: {error:?}"), + }; let (benchmark_job, artifact_id) = db .dequeue_benchmark_job( @@ -1205,29 +1216,6 @@ mod tests { .await; } - #[tokio::test] - async fn enqueue_parent_benchmark_job() { - run_postgres_test(|ctx| async { - let db = ctx.db(); - - let (violates_foreign_key, _) = db - .enqueue_parent_benchmark_job( - "sha-0", - Target::X86_64UnknownLinuxGnu, - CodegenBackend::Llvm, - Profile::Debug, - 0, - BenchmarkJobKind::Runtime, - ) - .await; - - assert!(violates_foreign_key); - - Ok(ctx) - }) - .await; - } - #[tokio::test] async fn purge_artifact() { run_postgres_test(|ctx| async { @@ -1244,7 +1232,6 @@ mod tests { BenchmarkJobKind::Compiletime, ) .await - .unwrap() .unwrap(); db.purge_artifact(&ArtifactId::Tag("foo".to_string())).await; diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index fd855c8d4..38c1c46cb 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,4 +1,6 @@ -use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; +use crate::pool::{ + Connection, ConnectionManager, JobEnqueueResult, ManagedConnection, Transaction, +}; use crate::selector::CompileTestCase; use crate::{ ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJob, @@ -1773,18 +1775,19 @@ where }) } - async fn enqueue_parent_benchmark_job( + async fn enqueue_benchmark_job( &self, - parent_sha: &str, + request_tag: &str, target: Target, backend: CodegenBackend, profile: Profile, benchmark_set: u32, kind: BenchmarkJobKind, - ) -> (bool, anyhow::Result) { - let row_result = self + ) -> JobEnqueueResult { + // This will return zero rows if the job already exists + let result = self .conn() - .query_one( + .query( r#" INSERT INTO job_queue( request_tag, @@ -1800,7 +1803,7 @@ where RETURNING job_queue.id "#, &[ - &parent_sha, + &request_tag, &target, &backend, &profile, @@ -1811,75 +1814,28 @@ where ) .await; - match row_result { - Ok(row) => (false, Ok(row.get::<_, i32>(0) as u32)), + match result { + Ok(rows) => { + let Some(row) = rows.into_iter().next() else { + return JobEnqueueResult::JobAlreadyExisted; + }; + JobEnqueueResult::JobCreated(row.get::<_, i32>(0) as u32) + } Err(e) => { if let Some(db_err) = e.as_db_error() { if db_err.code() == &SqlState::FOREIGN_KEY_VIOLATION { let constraint = db_err.constraint().unwrap_or("benchmark_tag constraint"); let detail = db_err.detail().unwrap_or(""); - return ( - true, - Err(anyhow::anyhow!( - "Foreign key violation on {} for request_tag='{}'. {}", - constraint, - parent_sha, - detail - )), - ); + return JobEnqueueResult::RequestShaNotFound { + error: format!("Foreign key violation on `{constraint}`: {detail}"), + }; } } - (false, Err(e.into())) + JobEnqueueResult::Other(e.into()) } } } - async fn enqueue_benchmark_job( - &self, - request_tag: &str, - target: Target, - backend: CodegenBackend, - profile: Profile, - benchmark_set: u32, - kind: BenchmarkJobKind, - ) -> anyhow::Result> { - // This will return zero rows if the job already exists - let rows = self - .conn() - .query( - r#" - INSERT INTO job_queue( - request_tag, - target, - backend, - profile, - benchmark_set, - status, - kind - ) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT DO NOTHING - RETURNING job_queue.id - "#, - &[ - &request_tag, - &target, - &backend, - &profile, - &(benchmark_set as i32), - &BENCHMARK_JOB_STATUS_QUEUED_STR, - &kind, - ], - ) - .await - .context("failed to insert benchmark_job")?; - if let Some(row) = rows.first() { - Ok(Some(row.get::<_, i32>(0) as u32)) - } else { - Ok(None) - } - } - async fn get_compile_test_cases_with_measurements( &self, artifact_row_id: &ArtifactIdNumber, diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index b192ddd38..0e9e511e0 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1,4 +1,6 @@ -use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; +use crate::pool::{ + Connection, ConnectionManager, JobEnqueueResult, ManagedConnection, Transaction, +}; use crate::selector::CompileTestCase; use crate::{ ArtifactCollection, ArtifactId, Benchmark, BenchmarkJob, BenchmarkJobConclusion, @@ -1356,19 +1358,7 @@ impl Connection for SqliteConnection { _profile: Profile, _benchmark_set: u32, _kind: BenchmarkJobKind, - ) -> anyhow::Result> { - no_queue_implementation_abort!() - } - - async fn enqueue_parent_benchmark_job( - &self, - _parent_sha: &str, - _target: Target, - _backend: CodegenBackend, - _profile: Profile, - _benchmark_set: u32, - _kind: BenchmarkJobKind, - ) -> (bool, anyhow::Result) { + ) -> JobEnqueueResult { no_queue_implementation_abort!() } diff --git a/database/src/tests/builder.rs b/database/src/tests/builder.rs index 11020a551..5087fee5e 100644 --- a/database/src/tests/builder.rs +++ b/database/src/tests/builder.rs @@ -1,3 +1,4 @@ +use crate::pool::JobEnqueueResult; use crate::{ BenchmarkJob, BenchmarkJobConclusion, BenchmarkJobKind, BenchmarkRequest, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectorConfig, Connection, Profile, @@ -40,7 +41,7 @@ impl RequestBuilder { pub async fn add_jobs(mut self, db: &dyn Connection, jobs: &[JobBuilder]) -> Self { for job in jobs { - let id = db + let id = match db .enqueue_benchmark_job( self.tag(), job.target, @@ -50,8 +51,11 @@ impl RequestBuilder { job.kind, ) .await - .unwrap(); - self.jobs.push((job.clone(), id.unwrap())); + { + JobEnqueueResult::JobCreated(id) => id, + error => panic!("Unexpected job enqueue result: {error:?}"), + }; + self.jobs.push((job.clone(), id)); } self } diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index badac72a2..f45eb677c 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -6,7 +6,7 @@ use crate::load::{partition_in_place, SiteCtxt}; use anyhow::Context; use chrono::Utc; use collector::benchmark_set::benchmark_set_count; -use database::pool::Transaction; +use database::pool::{JobEnqueueResult, Transaction}; use database::{ BenchmarkJobKind, BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend, Date, PendingBenchmarkRequests, Profile, QueuedCommit, @@ -221,23 +221,51 @@ pub async fn enqueue_benchmark_request( let backends = request.backends()?; let profiles = request.profiles()?; - let enqueue_job_required = async |tx: &mut Box, - request_tag, - target, - backend, - profile, - benchmark_set, - kind| { - let created_job = tx + #[derive(PartialEq, Debug)] + enum EnqueueMode { + Commit, + Parent, + } + + let enqueue_job = async |tx: &mut Box, + request_tag, + target, + backend, + profile, + benchmark_set, + kind, + mode: EnqueueMode| { + let result = tx .conn() .enqueue_benchmark_job(request_tag, target, backend, profile, benchmark_set, kind) - .await - .with_context(|| anyhow::anyhow!("Enqueuing job for request {request_tag} (target={target}, backend={backend}, profile={profile}, set={benchmark_set}, kind={kind})"))?; - match created_job { - Some(_) => Ok(()), - None => Err(anyhow::anyhow!( - "Cannot create job for tag {request_tag} (target={target}, backend={backend}, profile={profile}, set={benchmark_set}, kind={kind}): job already exists in the DB" - )), + .await; + + let make_error = |msg| { + anyhow::anyhow!( + "Cannot create job for tag {request_tag} (target={target}, backend={backend}, profile={profile}, set={benchmark_set}, kind={kind}, mode={mode:?}): {msg}" + ) + }; + + match result { + JobEnqueueResult::JobCreated(_) => Ok(()), + JobEnqueueResult::JobAlreadyExisted => { + match mode { + EnqueueMode::Commit => Err(make_error("job already exists in the DB")), + EnqueueMode::Parent => { + // For parents this is expected + Ok(()) + } + } + } + JobEnqueueResult::RequestShaNotFound { error } => match mode { + EnqueueMode::Commit => Err(make_error(&format!("request SHA not found: {error}"))), + EnqueueMode::Parent => { + // This should not happen often, but we do not want to block the queue on it + log::error!("{}", make_error(&format!("parent SHA not found: {error}"))); + Ok(()) + } + }, + JobEnqueueResult::Other(error) => Err(error), } }; @@ -246,7 +274,7 @@ pub async fn enqueue_benchmark_request( for benchmark_set in 0..benchmark_set_count(target.into()) { for &backend in backends.iter() { for &profile in profiles.iter() { - enqueue_job_required( + enqueue_job( &mut tx, request_tag, target, @@ -254,8 +282,10 @@ pub async fn enqueue_benchmark_request( profile, benchmark_set as u32, BenchmarkJobKind::Compiletime, + EnqueueMode::Commit, ) .await?; + // If there is a parent, we create a job for it too. The // database will ignore it if there is already a job there. // If the parent job has been deleted from the database @@ -267,30 +297,17 @@ pub async fn enqueue_benchmark_request( if tx.conn().parent_of(parent_sha).await.is_some() { continue; } - let (is_foreign_key_violation, result) = tx - .conn() - .enqueue_parent_benchmark_job( - parent_sha, - target, - backend, - profile, - benchmark_set as u32, - BenchmarkJobKind::Compiletime, - ) - .await; - - // At some point in time the parent_sha may not refer - // to a `benchmark_request` and we want to be able to - // see that error. - if let Err(e) = result { - if is_foreign_key_violation { - log::error!("Failed to create job for parent sha {e:?}"); - } else { - return Err(anyhow::anyhow!( - "Cannot enqueue parent benchmark job: {e:?}" - )); - } - } + enqueue_job( + &mut tx, + parent_sha, + target, + backend, + profile, + benchmark_set as u32, + BenchmarkJobKind::Compiletime, + EnqueueMode::Parent, + ) + .await?; } } } @@ -298,7 +315,7 @@ pub async fn enqueue_benchmark_request( // Enqueue Runtime job for all targets using LLVM as the backend for // runtime benchmarks - enqueue_job_required( + enqueue_job( &mut tx, request_tag, target, @@ -306,6 +323,7 @@ pub async fn enqueue_benchmark_request( Profile::Opt, 0u32, BenchmarkJobKind::Runtime, + EnqueueMode::Commit, ) .await?; } @@ -314,7 +332,7 @@ pub async fn enqueue_benchmark_request( // it takes to build the rust compiler. It takes a while to run and is // assumed that if the compilation of other rust project improve then this // too would improve. - enqueue_job_required( + enqueue_job( &mut tx, request_tag, Target::X86_64UnknownLinuxGnu, @@ -322,6 +340,7 @@ pub async fn enqueue_benchmark_request( Profile::Opt, 0u32, BenchmarkJobKind::Rustc, + EnqueueMode::Commit, ) .await?; @@ -407,12 +426,17 @@ async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> { } } // Enqueue waiting requests and try to complete in-progress ones - let completed_reqs = process_benchmark_requests(&mut *conn).await?; + let completed_reqs = process_benchmark_requests(&mut *conn) + .await + .context("Failed to process benchmark requests")?; // If some change happened, reload the benchmark request index if requests_inserted { - ctxt.known_benchmark_requests - .store(Arc::new(conn.load_benchmark_request_index().await?)); + ctxt.known_benchmark_requests.store(Arc::new( + conn.load_benchmark_request_index() + .await + .context("Failed to load benchmark request index")?, + )); } // Send a comment to GitHub for completed requests and reload the DB index @@ -463,6 +487,7 @@ async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> { .join(","), ), }; + log::debug!("Posting comparison comment to {pr}"); post_comparison_comment(ctxt, commit, is_master).await?; } } @@ -507,6 +532,7 @@ pub async fn create_job_queue_process( mod tests { use crate::job_queue::{build_queue, process_benchmark_requests}; use chrono::Utc; + use database::pool::JobEnqueueResult; use database::tests::run_postgres_test; use database::{ BenchmarkJobConclusion, BenchmarkJobKind, BenchmarkRequest, BenchmarkRequestStatus, @@ -542,16 +568,20 @@ mod tests { target: Target, ) { /* Create job for the request */ - db.enqueue_benchmark_job( - request_tag, - target, - CodegenBackend::Llvm, - Profile::Opt, - benchmark_set, - BenchmarkJobKind::Compiletime, - ) - .await - .unwrap(); + match db + .enqueue_benchmark_job( + request_tag, + target, + CodegenBackend::Llvm, + Profile::Opt, + benchmark_set, + BenchmarkJobKind::Compiletime, + ) + .await + { + JobEnqueueResult::JobCreated(_) => {} + error => panic!("Unexpected result: {error:?}"), + } let (job, _) = db .dequeue_benchmark_job(collector_name, target, BenchmarkSet::new(benchmark_set))