85 changes: 36 additions & 49 deletions database/src/pool.rs
@@ -15,6 +15,14 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
pub mod postgres;
pub mod sqlite;

#[derive(Debug)]
pub enum JobEnqueueResult {
JobCreated(u32),
JobAlreadyExisted,
RequestShaNotFound { error: String },
Other(anyhow::Error),
}

#[async_trait::async_trait]
pub trait Connection: Send + Sync {
async fn maybe_create_indices(&mut self);
@@ -238,20 +246,7 @@ pub trait Connection: Send + Sync {
profile: Profile,
benchmark_set: u32,
kind: BenchmarkJobKind,
) -> anyhow::Result<Option<u32>>;

/// Add a benchmark job which is explicitly using a `parent_sha` we split
/// this out to improve our error handling. A `parent_sha` may not have
/// an associated request in the `benchmarek`
async fn enqueue_parent_benchmark_job(
&self,
parent_sha: &str,
target: Target,
backend: CodegenBackend,
profile: Profile,
benchmark_set: u32,
kind: BenchmarkJobKind,
) -> (bool, anyhow::Result<u32>);
) -> JobEnqueueResult;

/// Returns a set of compile-time benchmark test cases that were already computed for the
/// given artifact.
@@ -460,6 +455,15 @@ mod tests {
}
}

impl JobEnqueueResult {
pub fn unwrap(self) -> u32 {
match self {
JobEnqueueResult::JobCreated(id) => id,
error => panic!("Unexpected job enqueue result: {error:?}"),
}
}
}

#[tokio::test]
async fn pstat_returns_empty_vector_when_empty() {
run_db_test(|ctx| async {
@@ -715,7 +719,10 @@ mod tests {
BenchmarkJobKind::Runtime,
)
.await;
assert!(result.is_ok());
match result {
JobEnqueueResult::JobCreated(_) => {}
error => panic!("Invalid result: {error:?}"),
}

Ok(ctx)
})
@@ -861,16 +868,20 @@ mod tests {
.unwrap();

// Now we can insert the job
db.enqueue_benchmark_job(
benchmark_request.tag().unwrap(),
Target::X86_64UnknownLinuxGnu,
CodegenBackend::Llvm,
Profile::Opt,
1u32,
BenchmarkJobKind::Runtime,
)
.await
.unwrap();
match db
.enqueue_benchmark_job(
benchmark_request.tag().unwrap(),
Target::X86_64UnknownLinuxGnu,
CodegenBackend::Llvm,
Profile::Opt,
1u32,
BenchmarkJobKind::Runtime,
)
.await
{
JobEnqueueResult::JobCreated(_) => {}
error => panic!("Invalid result: {error:?}"),
};

let (benchmark_job, artifact_id) = db
.dequeue_benchmark_job(
@@ -1205,29 +1216,6 @@ mod tests {
.await;
}

#[tokio::test]
async fn enqueue_parent_benchmark_job() {
run_postgres_test(|ctx| async {
let db = ctx.db();

let (violates_foreign_key, _) = db
.enqueue_parent_benchmark_job(
"sha-0",
Target::X86_64UnknownLinuxGnu,
CodegenBackend::Llvm,
Profile::Debug,
0,
BenchmarkJobKind::Runtime,
)
.await;

assert!(violates_foreign_key);

Ok(ctx)
})
.await;
}

#[tokio::test]
async fn purge_artifact() {
run_postgres_test(|ctx| async {
@@ -1244,7 +1232,6 @@
BenchmarkJobKind::Compiletime,
)
.await
.unwrap()
.unwrap();
db.purge_artifact(&ArtifactId::Tag("foo".to_string())).await;

86 changes: 21 additions & 65 deletions database/src/pool/postgres.rs
@@ -1,4 +1,6 @@
use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
use crate::pool::{
Connection, ConnectionManager, JobEnqueueResult, ManagedConnection, Transaction,
};
use crate::selector::CompileTestCase;
use crate::{
ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJob,
@@ -1773,18 +1775,19 @@ where
})
}

async fn enqueue_parent_benchmark_job(
async fn enqueue_benchmark_job(
&self,
parent_sha: &str,
request_tag: &str,
target: Target,
backend: CodegenBackend,
profile: Profile,
benchmark_set: u32,
kind: BenchmarkJobKind,
) -> (bool, anyhow::Result<u32>) {
let row_result = self
) -> JobEnqueueResult {
// This will return zero rows if the job already exists
let result = self
.conn()
.query_one(
.query(
r#"
INSERT INTO job_queue(
request_tag,
@@ -1800,7 +1803,7 @@
RETURNING job_queue.id
"#,
&[
&parent_sha,
&request_tag,
&target,
&backend,
&profile,
@@ -1811,75 +1814,28 @@
)
.await;

match row_result {
Ok(row) => (false, Ok(row.get::<_, i32>(0) as u32)),
match result {
Contributor

We don't want to be able to duplicate jobs in the database. I think we agree on this.

If we inserted a duplicate, somehow, it would trigger a SqlState::FOREIGN_KEY_VIOLATION which, while correct, does not pick up the nuance that it could be a parent that we want to ignore.

Essentially I think we should leave the two functions distinct. Tempting as it is to refactor it, it makes it more confusing as to what is happening.

For reference, these are the constraints:

CONSTRAINT job_queue_request_fk
    FOREIGN KEY (request_tag)
    REFERENCES benchmark_request(tag)
    ON DELETE CASCADE,

CONSTRAINT job_queue_collector
    FOREIGN KEY (collector_id)
    REFERENCES collector_config(id)
    ON DELETE CASCADE,

CONSTRAINT job_queue_unique
UNIQUE (
    request_tag,
    target,
    backend,
    profile,
    benchmark_set
)

Which we do want. I'm not sure how JobEnqueueResult::RequestShaNotFound being returned as an error makes sense when the error was a SqlState::FOREIGN_KEY_VIOLATION? That would imply that the request_sha already existed?

It doesn't seem clear anymore that we are ignoring parents who already had a job associated with them.

Member Author

Foreign key violation won't be raised because of duplicates, because the query uses ON CONFLICT DO NOTHING.

> It doesn't seem clear anymore that we are ignoring parents who already had a job associated with them.

The function now has an explicit result type that encodes this much more clearly than before (where it returned a bool and a result). The commit vs parent job insertion mode is handled in the job queue; we could also put it into this function, but I don't think it's better.

> I'm not sure how JobEnqueueResult::RequestShaNotFound being returned as an error makes sense when the error was a SqlState::FOREIGN_KEY_VIOLATION?

In theory we could also get another FK violation (at this point I think only the collector name), but right now RequestShaNotFound is the only case that actually matters. I included the error string, so we can find out in the log if something else ever happened, and rename in that case.

Contributor

Yes, the ON CONFLICT DO NOTHING was not there for a parent, as that allowed us to detect foreign key violations - i.e. a backfill did not need to take place. I stand corrected on that.

Tracing the logic: we would now fall through to JobEnqueueResult::Other(e.into()) if it was any error other than the request_tag not existing in the benchmark_request table. The logic for saying that is okay for a parent is now in job_queue/mod.rs.

This feels a lot more complicated than what was there previously; it wasn't slick or terribly clever, but it was dead simple to follow. I'm not sure there is a compelling enough reason to fragment the logic and generalise it into one consolidated function? It feels like the fix was just changing query_one to query for the parent insert?

Member Author

To me it's easier to follow now: there is a single function that explicitly enumerates which error conditions can happen, and the job queue then explicitly handles those error conditions. As a plus, we don't need to keep two functions in sync.
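
For context, a minimal sketch of how a caller such as the job queue could branch on JobEnqueueResult, tolerating a missing parent request while treating the same condition as fatal for a commit that was explicitly enqueued. This is not the code in job_queue/mod.rs from this PR; the enum is copied from pool.rs above, and handle_enqueue, is_parent, and the eprintln! logging are illustrative assumptions.

```rust
use anyhow::anyhow;

// Mirrors the enum added in database/src/pool.rs above.
#[derive(Debug)]
pub enum JobEnqueueResult {
    JobCreated(u32),
    JobAlreadyExisted,
    RequestShaNotFound { error: String },
    Other(anyhow::Error),
}

// Illustrative caller-side handling: `is_parent` selects the lenient path.
fn handle_enqueue(result: JobEnqueueResult, is_parent: bool) -> anyhow::Result<Option<u32>> {
    match result {
        JobEnqueueResult::JobCreated(id) => Ok(Some(id)),
        // ON CONFLICT DO NOTHING fired: the job row was already present.
        JobEnqueueResult::JobAlreadyExisted => Ok(None),
        // FK violation on request_tag: no benchmark_request exists for this SHA.
        // For a parent we only log it; a backfill has to happen first.
        JobEnqueueResult::RequestShaNotFound { error } if is_parent => {
            eprintln!("parent request missing, skipping job: {error}");
            Ok(None)
        }
        // For a commit we explicitly requested, the same condition is an error.
        JobEnqueueResult::RequestShaNotFound { error } => {
            Err(anyhow!("benchmark_request row missing for enqueued commit: {error}"))
        }
        JobEnqueueResult::Other(e) => Err(e),
    }
}
```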

Contributor

I'm not sure I agree. Passing modes to a function, and following a web of control flow and fall-through logic (which could potentially grow) to figure out what is going on, seems more complicated than having two distinct code paths.

If we want to centralise things, the query could be a good candidate. We could have one function that does the insert and two wrappers named as the functions are currently? That way we would have one central location that avoids drift, but two distinct functions that can handle things differently.
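
For comparison, a minimal sketch of the shape proposed here, under the assumption that the shared helper is a private function: one place owns the INSERT ... ON CONFLICT DO NOTHING query, and the two existing entry points stay distinct but delegate to it. The names insert_job_row and JobInsertError, the stubbed query, and the simplified signatures are illustrative, not code from this PR.

```rust
use anyhow::anyhow;

// Error type for the shared helper; illustrative only.
enum JobInsertError {
    RequestTagMissing(String),
    Other(anyhow::Error),
}

// Single owner of the INSERT ... ON CONFLICT DO NOTHING statement.
// Returns Ok(None) when the job already existed; stubbed for the sketch.
async fn insert_job_row(request_tag: &str) -> Result<Option<u32>, JobInsertError> {
    let _ = request_tag; // the real version would run the query here
    Ok(Some(1))
}

// Commit path: a missing benchmark_request row is a hard error.
async fn enqueue_benchmark_job(request_tag: &str) -> anyhow::Result<Option<u32>> {
    match insert_job_row(request_tag).await {
        Ok(id) => Ok(id),
        Err(JobInsertError::RequestTagMissing(detail)) => Err(anyhow!(
            "no benchmark_request for `{request_tag}`: {detail}"
        )),
        Err(JobInsertError::Other(e)) => Err(e.context("failed to insert benchmark_job")),
    }
}

// Parent path: the same condition only signals that a backfill is needed.
async fn enqueue_parent_benchmark_job(request_tag: &str) -> (bool, anyhow::Result<Option<u32>>) {
    match insert_job_row(request_tag).await {
        Ok(id) => (false, Ok(id)),
        Err(JobInsertError::RequestTagMissing(_)) => (true, Ok(None)),
        Err(JobInsertError::Other(e)) => (false, Err(e)),
    }
}
```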

Ok(rows) => {
let Some(row) = rows.into_iter().next() else {
return JobEnqueueResult::JobAlreadyExisted;
};
JobEnqueueResult::JobCreated(row.get::<_, i32>(0) as u32)
}
Err(e) => {
if let Some(db_err) = e.as_db_error() {
if db_err.code() == &SqlState::FOREIGN_KEY_VIOLATION {
let constraint = db_err.constraint().unwrap_or("benchmark_tag constraint");
let detail = db_err.detail().unwrap_or("");
return (
true,
Err(anyhow::anyhow!(
"Foreign key violation on {} for request_tag='{}'. {}",
constraint,
parent_sha,
detail
)),
);
return JobEnqueueResult::RequestShaNotFound {
error: format!("Foreign key violation on `{constraint}`: {detail}"),
};
}
}
(false, Err(e.into()))
JobEnqueueResult::Other(e.into())
}
}
}

async fn enqueue_benchmark_job(
&self,
request_tag: &str,
target: Target,
backend: CodegenBackend,
profile: Profile,
benchmark_set: u32,
kind: BenchmarkJobKind,
) -> anyhow::Result<Option<u32>> {
// This will return zero rows if the job already exists
let rows = self
.conn()
.query(
r#"
INSERT INTO job_queue(
request_tag,
target,
backend,
profile,
benchmark_set,
status,
kind
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT DO NOTHING
RETURNING job_queue.id
"#,
&[
&request_tag,
&target,
&backend,
&profile,
&(benchmark_set as i32),
&BENCHMARK_JOB_STATUS_QUEUED_STR,
&kind,
],
)
.await
.context("failed to insert benchmark_job")?;
if let Some(row) = rows.first() {
Ok(Some(row.get::<_, i32>(0) as u32))
} else {
Ok(None)
}
}

async fn get_compile_test_cases_with_measurements(
&self,
artifact_row_id: &ArtifactIdNumber,
18 changes: 4 additions & 14 deletions database/src/pool/sqlite.rs
@@ -1,4 +1,6 @@
use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
use crate::pool::{
Connection, ConnectionManager, JobEnqueueResult, ManagedConnection, Transaction,
};
use crate::selector::CompileTestCase;
use crate::{
ArtifactCollection, ArtifactId, Benchmark, BenchmarkJob, BenchmarkJobConclusion,
@@ -1356,19 +1358,7 @@ impl Connection for SqliteConnection {
_profile: Profile,
_benchmark_set: u32,
_kind: BenchmarkJobKind,
) -> anyhow::Result<Option<u32>> {
no_queue_implementation_abort!()
}

async fn enqueue_parent_benchmark_job(
&self,
_parent_sha: &str,
_target: Target,
_backend: CodegenBackend,
_profile: Profile,
_benchmark_set: u32,
_kind: BenchmarkJobKind,
) -> (bool, anyhow::Result<u32>) {
) -> JobEnqueueResult {
no_queue_implementation_abort!()
}

10 changes: 7 additions & 3 deletions database/src/tests/builder.rs
@@ -1,3 +1,4 @@
use crate::pool::JobEnqueueResult;
use crate::{
BenchmarkJob, BenchmarkJobConclusion, BenchmarkJobKind, BenchmarkRequest,
BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectorConfig, Connection, Profile,
@@ -40,7 +41,7 @@ impl RequestBuilder {

pub async fn add_jobs(mut self, db: &dyn Connection, jobs: &[JobBuilder]) -> Self {
for job in jobs {
let id = db
let id = match db
.enqueue_benchmark_job(
self.tag(),
job.target,
@@ -50,8 +51,11 @@
job.kind,
)
.await
.unwrap();
self.jobs.push((job.clone(), id.unwrap()));
{
JobEnqueueResult::JobCreated(id) => id,
error => panic!("Unexpected job enqueue result: {error:?}"),
};
self.jobs.push((job.clone(), id));
}
self
}