Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,29 +1041,23 @@ impl BenchmarkRequest {
}

/// Cached information about benchmark requests in the DB
/// FIXME: only store non-try requests here
pub struct BenchmarkRequestIndex {
/// Tags (SHA or release name) of all known benchmark requests
all: HashSet<String>,
/// Tags (SHA or release name) of all benchmark requests in the completed status
completed: HashSet<String>,
}

impl BenchmarkRequestIndex {
/// Do we already have a benchmark request for the passed `tag`?
pub fn contains_tag(&self, tag: &str) -> bool {
self.all.contains(tag)
}
}

/// Return tags of already completed benchmark requests.
pub fn completed_requests(&self) -> &HashSet<String> {
&self.completed
}

pub fn add_tag(&mut self, tag: &str) {
self.all.insert(tag.to_string());
self.completed.insert(tag.to_string());
}
/// Contains pending (ArtifactsReady or InProgress) benchmark requests, and a set of their parents
/// that are already completed.
pub struct PendingBenchmarkRequests {
pub requests: Vec<BenchmarkRequest>,
pub completed_parent_tags: HashSet<String>,
}

#[derive(Debug, Clone, PartialEq)]
Expand Down
133 changes: 47 additions & 86 deletions database/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::selector::CompileTestCase;
use crate::{
ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion,
BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestWithErrors,
BenchmarkSet, CodegenBackend, CollectorConfig, CompileBenchmark, Target,
BenchmarkSet, CodegenBackend, CollectorConfig, CompileBenchmark, PendingBenchmarkRequests,
Target,
};
use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -189,7 +190,8 @@ pub trait Connection: Send + Sync {
async fn purge_artifact(&self, aid: &ArtifactId);

/// Add an item to the `benchmark_requests`, if the `benchmark_request`
/// exists an Error will be returned
/// exists an Error will be returned.
/// We require the caller to pass an index, to ensure that it is always kept up-to-date.
async fn insert_benchmark_request(
&self,
benchmark_request: &BenchmarkRequest,
Expand All @@ -200,7 +202,9 @@ pub trait Connection: Send + Sync {

/// Load all pending benchmark requests, i.e. those that have artifacts ready, but haven't
/// been completed yet. Pending statuses are `ArtifactsReady` and `InProgress`.
async fn load_pending_benchmark_requests(&self) -> anyhow::Result<Vec<BenchmarkRequest>>;
/// Also returns their parents, so that we can quickly check which requests are ready for being
/// enqueued.
async fn load_pending_benchmark_requests(&self) -> anyhow::Result<PendingBenchmarkRequests>;

/// Update the status of a `benchmark_request` with the given `tag`.
/// If no such request exists in the DB, returns an error.
Expand Down Expand Up @@ -426,9 +430,9 @@ mod tests {
use crate::tests::run_postgres_test;
use crate::{tests::run_db_test, BenchmarkRequestType, Commit, CommitType, Date};
use chrono::Utc;
use std::collections::BTreeSet;
use std::str::FromStr;

/// Create a Commit
fn create_commit(commit_sha: &str, time: chrono::DateTime<Utc>, r#type: CommitType) -> Commit {
Commit {
sha: commit_sha.into(),
Expand All @@ -437,43 +441,6 @@ mod tests {
}
}

async fn complete_request(
db: &dyn Connection,
request_tag: &str,
collector_name: &str,
benchmark_set: u32,
target: Target,
) {
/* Create job for the request */
db.enqueue_benchmark_job(
request_tag,
target,
CodegenBackend::Llvm,
Profile::Opt,
benchmark_set,
)
.await
.unwrap();

let (job, _) = db
.dequeue_benchmark_job(collector_name, target, BenchmarkSet(benchmark_set))
.await
.unwrap()
.unwrap();

assert_eq!(job.request_tag(), request_tag);

/* Mark the job as complete */
db.mark_benchmark_job_as_completed(job.id(), BenchmarkJobConclusion::Success)
.await
.unwrap();

assert!(db
.maybe_mark_benchmark_request_as_completed(request_tag)
.await
.unwrap());
}

#[tokio::test]
async fn pstat_returns_empty_vector_when_empty() {
run_db_test(|ctx| async {
Expand Down Expand Up @@ -559,37 +526,25 @@ mod tests {
async fn multiple_non_completed_try_requests() {
run_postgres_test(|ctx| async {
let db = ctx.db();
let target = Target::X86_64UnknownLinuxGnu;
let collector_name = "collector-1";
let benchmark_set = 1;

db.add_collector_config(collector_name, target, benchmark_set, true)
.await
.unwrap();

// Complete parent
let parent = BenchmarkRequest::create_release("sha-parent-1", Utc::now());
// Complete
let req_a = BenchmarkRequest::create_try_without_artifacts(42, "", "");
// WaitingForArtifacts
let req_b = BenchmarkRequest::create_try_without_artifacts(42, "", "");
let req_c = BenchmarkRequest::create_try_without_artifacts(42, "", "");

db.insert_benchmark_request(&parent).await.unwrap();
db.insert_benchmark_request(&req_a).await.unwrap();
db.attach_shas_to_try_benchmark_request(42, "sha1", "sha-parent-1", Utc::now())
// Insert a try build
ctx.insert_try_request(42).await;
db.attach_shas_to_try_benchmark_request(42, "sha-1", "sha-parent-1", Utc::now())
.await
.unwrap();

complete_request(db, "sha-parent-1", collector_name, benchmark_set, target).await;
complete_request(db, "sha1", collector_name, benchmark_set, target).await;
// Then finish it
ctx.complete_request("sha-1").await;

// This should be fine, req_a was completed
db.insert_benchmark_request(&req_b).await.unwrap();
// This should fail, we can't have two queued requests at once
db.insert_benchmark_request(&req_c).await.expect_err(
"It was possible to record two try benchmark requests without artifacts",
);
// Insert a try build for the same PR again
// This should be fine, because the previous request was already completed
ctx.insert_try_request(42).await;
// But this should fail, as we can't have two queued requests at once
db.insert_benchmark_request(&BenchmarkRequest::create_try_without_artifacts(
42, "", "",
))
.await
.expect_err("It was possible to record two try benchmark requests without artifacts");

Ok(ctx)
})
Expand Down Expand Up @@ -629,43 +584,48 @@ mod tests {
async fn load_pending_benchmark_requests() {
run_postgres_test(|ctx| async {
let db = ctx.db();
let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
let target = Target::X86_64UnknownLinuxGnu;
let collector_name = "collector-1";
let benchmark_set = 1;

db.add_collector_config(collector_name, target, benchmark_set, true)
.await
.unwrap();

// ArtifactsReady
let req_a = BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time);
let req_a = ctx.insert_master_request("sha-1", "parent-sha-1", 42).await;
// ArtifactsReady
let req_b = BenchmarkRequest::create_release("1.80.0", time);
let req_b = ctx.insert_release_request("1.80.0").await;
// WaitingForArtifacts
let req_c = BenchmarkRequest::create_try_without_artifacts(50, "", "");
ctx.insert_try_request(50).await;
// InProgress
let req_d = BenchmarkRequest::create_master("sha-2", "parent-sha-2", 51, time);
let req_d = ctx.insert_master_request("sha-2", "parent-sha-2", 51).await;
// Completed
let req_e = BenchmarkRequest::create_release("1.79.0", time);
ctx.insert_release_request("1.79.0").await;

for &req in &[&req_a, &req_b, &req_c, &req_d, &req_e] {
db.insert_benchmark_request(req).await.unwrap();
}

complete_request(db, "1.79.0", collector_name, benchmark_set, target).await;
ctx.complete_request("1.79.0").await;
ctx.insert_master_request("parent-sha-1", "grandparent-sha-0", 100)
.await;
ctx.complete_request("parent-sha-1").await;
ctx.insert_master_request("parent-sha-2", "grandparent-sha-1", 101)
.await;
ctx.complete_request("parent-sha-2").await;

db.update_benchmark_request_status("sha-2", BenchmarkRequestStatus::InProgress)
.await
.unwrap();

let requests = db.load_pending_benchmark_requests().await.unwrap();
let pending = db.load_pending_benchmark_requests().await.unwrap();
let requests = pending.requests;

assert_eq!(requests.len(), 3);
for req in &[req_a, req_b, req_d] {
assert!(requests.iter().any(|r| r.tag() == req.tag()));
}

assert_eq!(
pending
.completed_parent_tags
.into_iter()
.collect::<BTreeSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
vec!["parent-sha-1".to_string(), "parent-sha-2".to_string()]
);

Ok(ctx)
})
.await;
Expand All @@ -687,6 +647,7 @@ mod tests {
.load_pending_benchmark_requests()
.await
.unwrap()
.requests
.into_iter()
.next()
.unwrap();
Expand Down
88 changes: 50 additions & 38 deletions database/src/pool/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use crate::{
BenchmarkJobConclusion, BenchmarkJobStatus, BenchmarkRequest, BenchmarkRequestIndex,
BenchmarkRequestStatus, BenchmarkRequestType, BenchmarkRequestWithErrors, BenchmarkSet,
CodegenBackend, CollectionId, CollectorConfig, Commit, CommitType, CompileBenchmark, Date,
Index, Profile, QueuedCommit, Scenario, Target, BENCHMARK_JOB_STATUS_FAILURE_STR,
BENCHMARK_JOB_STATUS_IN_PROGRESS_STR, BENCHMARK_JOB_STATUS_QUEUED_STR,
BENCHMARK_JOB_STATUS_SUCCESS_STR, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
Index, PendingBenchmarkRequests, Profile, QueuedCommit, Scenario, Target,
BENCHMARK_JOB_STATUS_FAILURE_STR, BENCHMARK_JOB_STATUS_IN_PROGRESS_STR,
BENCHMARK_JOB_STATUS_QUEUED_STR, BENCHMARK_JOB_STATUS_SUCCESS_STR,
BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, BENCHMARK_REQUEST_STATUS_COMPLETED_STR,
BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_STATUS_WAITING_FOR_ARTIFACTS_STR,
BENCHMARK_REQUEST_TRY_STR,
Expand Down Expand Up @@ -500,6 +501,7 @@ pub struct CachedStatements {
get_compile_test_cases_with_measurements: Statement,
get_last_n_completed_requests_with_errors: Statement,
get_jobs_of_in_progress_benchmark_requests: Statement,
load_pending_benchmark_requests: Statement,
}

pub struct PostgresTransaction<'a> {
Expand Down Expand Up @@ -683,7 +685,7 @@ impl PostgresConnection {
where aid = $1
").await.unwrap(),
load_benchmark_request_index: conn.prepare("
SELECT tag, status
SELECT tag
FROM benchmark_request
WHERE tag IS NOT NULL
").await.unwrap(),
Expand Down Expand Up @@ -751,6 +753,18 @@ impl PostgresConnection {
-- Only get the jobs of in_progress requests
SELECT * FROM job_queue INNER JOIN requests ON job_queue.request_tag = requests.tag
")).await.unwrap(),
// Load pending benchmark requests, along with information whether their parent is
// completed or not
load_pending_benchmark_requests: conn.prepare(&format!("
WITH pending AS (
SELECT {BENCHMARK_REQUEST_COLUMNS}
FROM benchmark_request AS req
WHERE status IN ('{BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR}', '{BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR}')
)
SELECT (parent.status = '{BENCHMARK_REQUEST_STATUS_COMPLETED_STR}') AS parent_done, pending.*
FROM pending
LEFT JOIN benchmark_request as parent ON parent.tag = pending.parent_sha
")).await.unwrap(),
}),
conn,
}
Expand Down Expand Up @@ -809,7 +823,7 @@ where
None => Date(Utc.with_ymd_and_hms(2001, 1, 1, 0, 0, 0).unwrap()),
}
},
r#type: CommitType::from_str(&row.get::<_, String>(3)).unwrap()
r#type: CommitType::from_str(&row.get::<_, String>(3)).unwrap(),
},
)
})
Expand Down Expand Up @@ -1188,13 +1202,13 @@ where
Some(aid) => aid.get::<_, i32>(0) as u32,
None => {
self.conn()
.query_opt("insert into artifact (name, date, type) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING RETURNING id", &[
&info.name,
&info.date,
&info.kind,
])
.await
.unwrap();
.query_opt("insert into artifact (name, date, type) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING RETURNING id", &[
&info.name,
&info.date,
&info.kind,
])
.await
.unwrap();
self.conn()
.query_one("select id from artifact where name = $1", &[&info.name])
.await
Expand Down Expand Up @@ -1623,18 +1637,11 @@ where
.await
.context("Cannot load benchmark request index")?;

let mut all = HashSet::with_capacity(requests.len());
let mut completed = HashSet::with_capacity(requests.len());
for request in requests {
let tag = request.get::<_, String>(0);
let status = request.get::<_, &str>(1);

if status == BENCHMARK_REQUEST_STATUS_COMPLETED_STR {
completed.insert(tag.clone());
}
all.insert(tag);
}
Ok(BenchmarkRequestIndex { all, completed })
let all = requests
.into_iter()
.map(|row| row.get::<_, String>(0))
.collect();
Ok(BenchmarkRequestIndex { all })
}

async fn update_benchmark_request_status(
Expand Down Expand Up @@ -1705,25 +1712,30 @@ where
Ok(())
}

async fn load_pending_benchmark_requests(&self) -> anyhow::Result<Vec<BenchmarkRequest>> {
let query = format!(
r#"
SELECT {BENCHMARK_REQUEST_COLUMNS}
FROM benchmark_request
WHERE status IN('{BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR}', '{BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR}')"#
);

async fn load_pending_benchmark_requests(&self) -> anyhow::Result<PendingBenchmarkRequests> {
let rows = self
.conn()
.query(&query, &[])
.query(&self.statements().load_pending_benchmark_requests, &[])
.await
.context("Failed to get pending benchmark requests")?;

let requests = rows
.into_iter()
.map(|it| row_to_benchmark_request(&it, None))
.collect();
Ok(requests)
let mut completed_parent_tags = HashSet::new();
let mut requests = Vec::with_capacity(rows.len());
for row in rows {
let parent_done = row.get::<_, Option<bool>>(0);
let request = row_to_benchmark_request(&row, Some(1));
if let Some(true) = parent_done {
if let Some(parent) = request.parent_sha() {
completed_parent_tags.insert(parent.to_string());
}
}
requests.push(request);
}

Ok(PendingBenchmarkRequests {
requests,
completed_parent_tags,
})
}

async fn enqueue_benchmark_job(
Expand Down
Loading
Loading