Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion site/src/github/comparison_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn post_finished(ctxt: &SiteCtxt) {
/// Posts a comment to GitHub summarizing the comparison of the queued commit with its parent
///
/// `is_master_commit` is used to differentiate messages for try runs and post-merge runs.
async fn post_comparison_comment(
pub async fn post_comparison_comment(
ctxt: &SiteCtxt,
commit: QueuedCommit,
is_master_commit: bool,
Expand Down
106 changes: 90 additions & 16 deletions site/src/job_queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
mod utils;

use crate::github::comparison_summary::post_comparison_comment;
use crate::job_queue::utils::{parse_release_string, ExtractIf};
use crate::load::{partition_in_place, SiteCtxt};
use chrono::Utc;
use collector::benchmark_set::benchmark_set_count;
use database::{
BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, PendingBenchmarkRequests,
Target,
BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, Date,
PendingBenchmarkRequests, QueuedCommit, Target,
};
use parking_lot::RwLock;
use std::{str::FromStr, sync::Arc};
use tokio::time::{self, Duration};
use tokio::time::{self, Duration, MissedTickBehavior};

pub fn run_new_queue() -> bool {
std::env::var("RUN_CRON")
Expand All @@ -20,12 +21,13 @@ pub fn run_new_queue() -> bool {
}

/// Store the latest master commits or do nothing if all of them are
/// already in the database
/// already in the database.
/// Returns `true` if at least one benchmark request was inserted.
async fn create_benchmark_request_master_commits(
ctxt: &SiteCtxt,
conn: &dyn database::pool::Connection,
index: &BenchmarkRequestIndex,
) -> anyhow::Result<()> {
) -> anyhow::Result<bool> {
let now = Utc::now();

let master_commits = ctxt.get_master_commits();
Expand All @@ -38,6 +40,7 @@ async fn create_benchmark_request_master_commits(
// TODO; delete at some point in the future
let cutoff: chrono::DateTime<Utc> = chrono::DateTime::from_str("2025-08-27T00:00:00.000Z")?;

let mut inserted = false;
for master_commit in master_commits {
// We don't want to add masses of obsolete data
if master_commit.time >= cutoff && !index.contains_tag(&master_commit.sha) {
Expand All @@ -51,18 +54,21 @@ async fn create_benchmark_request_master_commits(
log::info!("Inserting master benchmark request {benchmark:?}");
if let Err(error) = conn.insert_benchmark_request(&benchmark).await {
log::error!("Failed to insert master benchmark request: {error:?}");
} else {
inserted = true;
}
}
}
Ok(())
Ok(inserted)
}

/// Store the latest release commits or do nothing if all of them are
/// already in the database
/// Returns `true` if at least one benchmark request was inserted.
async fn create_benchmark_request_releases(
conn: &dyn database::pool::Connection,
index: &BenchmarkRequestIndex,
) -> anyhow::Result<()> {
) -> anyhow::Result<bool> {
let releases: String = reqwest::get("https://static.rust-lang.org/manifests.txt")
.await?
.text()
Expand All @@ -76,16 +82,19 @@ async fn create_benchmark_request_releases(
.filter_map(parse_release_string)
.take(20);

let mut inserted = false;
for (name, commit_date) in releases {
if commit_date >= cutoff && !index.contains_tag(&name) {
let release_request = BenchmarkRequest::create_release(&name, commit_date);
log::info!("Inserting release benchmark request {release_request:?}");
if let Err(error) = conn.insert_benchmark_request(&release_request).await {
log::error!("Failed to insert release benchmark request: {error}");
} else {
inserted = true;
}
}
}
Ok(())
Ok(inserted)
}

/// Sorts try and master requests that are in the `ArtifactsReady` status and return them in the
Expand Down Expand Up @@ -254,16 +263,20 @@ pub async fn enqueue_benchmark_request(
/// If there is a request that has artifacts ready, and nothing is currently in-progress,
/// it will be enqueued.
/// If there is a request whose jobs have all completed, it will be marked as completed.
///
/// Returns benchmark requests that were completed.
async fn process_benchmark_requests(
conn: &mut dyn database::pool::Connection,
) -> anyhow::Result<()> {
) -> anyhow::Result<Vec<BenchmarkRequest>> {
let queue = build_queue(conn).await?;

let mut completed = vec![];
for request in queue {
match request.status() {
BenchmarkRequestStatus::InProgress => {
let tag = request.tag().expect("In progress request without a tag");
if conn.maybe_mark_benchmark_request_as_completed(tag).await? {
completed.push(request);
continue;
}
break;
Expand All @@ -278,28 +291,89 @@ async fn process_benchmark_requests(
}
}
}
Ok(())
Ok(completed)
}

/// For queueing jobs, add the jobs you want to queue to this function
async fn cron_enqueue_jobs(site_ctxt: &SiteCtxt) -> anyhow::Result<()> {
let mut conn = site_ctxt.conn().await;
async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> {
let mut conn = ctxt.conn().await;

let index = conn.load_benchmark_request_index().await?;
let index = ctxt.known_benchmark_requests.load();

let mut requests_inserted = false;
// Put the master commits into the `benchmark_requests` queue
create_benchmark_request_master_commits(site_ctxt, &*conn, &index).await?;
requests_inserted |= create_benchmark_request_master_commits(ctxt, &*conn, &index).await?;
// Put the releases into the `benchmark_requests` queue
create_benchmark_request_releases(&*conn, &index).await?;
requests_inserted |= create_benchmark_request_releases(&*conn, &index).await?;
// Enqueue waiting requests and try to complete in-progress ones
process_benchmark_requests(&mut *conn).await?;
let completed_reqs = process_benchmark_requests(&mut *conn).await?;

// If some change happened, reload the benchmark request index
if requests_inserted {
ctxt.known_benchmark_requests
.store(Arc::new(conn.load_benchmark_request_index().await?));
}

// Send a comment to GitHub for completed requests and reload the DB index
if !completed_reqs.is_empty() {
let index = database::Index::load(&mut *conn).await;
log::info!("index has {} commits", index.commits().len());
ctxt.index.store(Arc::new(index));

// Refresh the landing page
ctxt.landing_page.store(Arc::new(None));

// Send comments to GitHub
for request in completed_reqs {
let (is_master, pr, sha, parent_sha) = match request.commit_type() {
BenchmarkRequestType::Try {
pr,
parent_sha,
sha,
} => (
false,
*pr,
sha.clone().expect("Completed try commit without a SHA"),
parent_sha
.clone()
.expect("Completed try commit without a parent SHA"),
),
BenchmarkRequestType::Master {
pr,
sha,
parent_sha,
} => (true, *pr, sha.clone(), parent_sha.clone()),
BenchmarkRequestType::Release { .. } => continue,
};
let commit = QueuedCommit {
pr,
sha,
parent_sha,
include: None,
exclude: None,
runs: None,
commit_date: request.commit_date().map(Date),
backends: Some(
request
.backends()?
.into_iter()
.map(|b| b.as_str())
.collect::<Vec<_>>()
.join(","),
),
};
post_comparison_comment(ctxt, commit, is_master).await?;
}
}

Ok(())
}

/// Entry point for the cron job that manages the benchmark request and job queue.
pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, run_interval: Duration) {
let mut interval = time::interval(run_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

let ctxt = site_ctxt.clone();

loop {
Expand Down
6 changes: 5 additions & 1 deletion site/src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use serde::{Deserialize, Serialize};
use crate::self_profile::SelfProfileCache;
use collector::compile::benchmark::category::Category;
use collector::{Bound, MasterCommit};
use database::Pool;
pub use database::{ArtifactId, Benchmark, Commit};
use database::{BenchmarkRequestIndex, Pool};
use database::{CommitType, Date};

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -128,6 +128,8 @@ pub struct SiteCtxt {
pub landing_page: ArcSwap<Option<Arc<crate::api::graphs::Response>>>,
/// Index of various common queries
pub index: ArcSwap<database::Index>,
/// Index of all benchmark requests that we have in the DB
pub known_benchmark_requests: ArcSwap<BenchmarkRequestIndex>,
/// Cached master-branch Rust commits
pub master_commits: Arc<ArcSwap<MasterCommitCache>>, // outer Arc enables mutation in background task
/// Cache for self profile data
Expand Down Expand Up @@ -160,6 +162,7 @@ impl SiteCtxt {

let mut conn = pool.connection().await;
let index = database::Index::load(&mut *conn).await;
let benchmark_request_index = conn.load_benchmark_request_index().await?;

let config = if let Ok(s) = fs::read_to_string("site-config.toml") {
toml::from_str(&s)?
Expand All @@ -177,6 +180,7 @@ impl SiteCtxt {
Ok(Self {
config,
index: ArcSwap::new(Arc::new(index)),
known_benchmark_requests: ArcSwap::new(Arc::new(benchmark_request_index)),
master_commits: Arc::new(ArcSwap::new(Arc::new(master_commits))),
pool,
landing_page: ArcSwap::new(Arc::new(None)),
Expand Down
Loading