diff --git a/src/admin/default_versions.rs b/src/admin/default_versions.rs index ccbc018d7a3..2b78b0e3554 100644 --- a/src/admin/default_versions.rs +++ b/src/admin/default_versions.rs @@ -1,4 +1,5 @@ use crate::models::{update_default_version, verify_default_version}; +use crate::tasks::spawn_blocking; use crate::{db, schema::crates}; use anyhow::Context; use diesel::prelude::*; @@ -14,29 +15,32 @@ pub enum Command { Verify, } -pub fn run(command: Command) -> anyhow::Result<()> { - let mut conn = db::oneoff_connection().context("Failed to connect to the database")?; +pub async fn run(command: Command) -> anyhow::Result<()> { + spawn_blocking(move || { + let mut conn = db::oneoff_connection().context("Failed to connect to the database")?; - let crate_ids: Vec<i32> = crates::table - .select(crates::id) - .load(&mut conn) - .context("Failed to load crates")?; + let crate_ids: Vec<i32> = crates::table + .select(crates::id) + .load(&mut conn) + .context("Failed to load crates")?; - let pb = ProgressBar::new(crate_ids.len() as u64); - pb.set_style(ProgressStyle::with_template( - "{bar:60} ({pos}/{len}, ETA {eta})", - )?); + let pb = ProgressBar::new(crate_ids.len() as u64); + pb.set_style(ProgressStyle::with_template( + "{bar:60} ({pos}/{len}, ETA {eta})", + )?); - for crate_id in crate_ids.into_iter().progress_with(pb.clone()) { - let func = match command { - Command::Update => update_default_version, - Command::Verify => verify_default_version, - }; + for crate_id in crate_ids.into_iter().progress_with(pb.clone()) { + let func = match command { + Command::Update => update_default_version, + Command::Verify => verify_default_version, + }; - if let Err(error) = func(crate_id, &mut conn) { - pb.suspend(|| warn!(%crate_id, %error, "Failed to update the default version")); + if let Err(error) = func(crate_id, &mut conn) { + pb.suspend(|| warn!(%crate_id, %error, "Failed to update the default version")); + } } - } - Ok(()) + Ok(()) + }) + .await } diff --git 
a/src/admin/delete_crate.rs b/src/admin/delete_crate.rs index e9d66dc62e5..617a291d8b1 100644 --- a/src/admin/delete_crate.rs +++ b/src/admin/delete_crate.rs @@ -1,4 +1,5 @@ use crate::schema::{crate_owners, teams, users}; +use crate::tasks::spawn_blocking; use crate::worker::jobs; use crate::{admin::dialoguer, db, schema::crates}; use anyhow::Context; @@ -24,13 +25,15 @@ pub struct Opts { yes: bool, } -pub fn run(opts: Opts) -> anyhow::Result<()> { - let conn = &mut db::oneoff_connection().context("Failed to establish database connection")?; +pub async fn run(opts: Opts) -> anyhow::Result<()> { + spawn_blocking(move || { + let conn = + &mut db::oneoff_connection().context("Failed to establish database connection")?; - let mut crate_names = opts.crate_names; - crate_names.sort(); + let mut crate_names = opts.crate_names; + crate_names.sort(); - let query_result = crates::table + let query_result = crates::table .select(( crates::name, crates::id, @@ -45,53 +48,55 @@ pub fn run(opts: Opts) -> anyhow::Result<()> { .load::<(String, i32, String)>(conn) .context("Failed to look up crate name from the database")?; - let mut existing_crates: HashMap<String, (i32, Vec<String>)> = HashMap::new(); - for (name, id, login) in query_result { - let entry = existing_crates - .entry(name) - .or_insert_with(|| (id, Vec::new())); + let mut existing_crates: HashMap<String, (i32, Vec<String>)> = HashMap::new(); + for (name, id, login) in query_result { + let entry = existing_crates + .entry(name) + .or_insert_with(|| (id, Vec::new())); - entry.1.push(login); - } + entry.1.push(login); + } - println!("Deleting the following crates:"); - println!(); - for name in &crate_names { - match existing_crates.get(name) { - Some((id, owners)) => { - let owners = owners.join(", "); - println!(" - {name} (id={id}, owners={owners})"); + println!("Deleting the following crates:"); + println!(); + for name in &crate_names { + match existing_crates.get(name) { + Some((id, owners)) => { + let owners = owners.join(", "); + println!(" - {name} (id={id}, 
owners={owners})"); + } + None => println!(" - {name} (⚠️ crate not found)"), } - None => println!(" - {name} (⚠️ crate not found)"), } - } - println!(); + println!(); - if !opts.yes && !dialoguer::confirm("Do you want to permanently delete these crates?") { - return Ok(()); - } + if !opts.yes && !dialoguer::confirm("Do you want to permanently delete these crates?") { + return Ok(()); + } - for name in &crate_names { - if let Some((id, _)) = existing_crates.get(name) { - info!("{name}: Deleting crate from the database…"); - if let Err(error) = diesel::delete(crates::table.find(id)).execute(conn) { - warn!(%id, "{name}: Failed to delete crate from the database: {error}"); - } - } else { - info!("{name}: Skipped missing crate"); - }; + for name in &crate_names { + if let Some((id, _)) = existing_crates.get(name) { + info!("{name}: Deleting crate from the database…"); + if let Err(error) = diesel::delete(crates::table.find(id)).execute(conn) { + warn!(%id, "{name}: Failed to delete crate from the database: {error}"); + } + } else { + info!("{name}: Skipped missing crate"); + }; - info!("{name}: Enqueuing index sync jobs…"); - if let Err(error) = jobs::enqueue_sync_to_index(name, conn) { - warn!("{name}: Failed to enqueue index sync jobs: {error}"); - } + info!("{name}: Enqueuing index sync jobs…"); + if let Err(error) = jobs::enqueue_sync_to_index(name, conn) { + warn!("{name}: Failed to enqueue index sync jobs: {error}"); + } - info!("{name}: Enqueuing DeleteCrateFromStorage job…"); - let job = jobs::DeleteCrateFromStorage::new(name.into()); - if let Err(error) = job.enqueue(conn) { - warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}"); + info!("{name}: Enqueuing DeleteCrateFromStorage job…"); + let job = jobs::DeleteCrateFromStorage::new(name.into()); + if let Err(error) = job.enqueue(conn) { + warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}"); + } } - } - Ok(()) + Ok(()) + }) + .await } diff --git 
a/src/admin/delete_version.rs b/src/admin/delete_version.rs index 38530d963d3..8c3f2f26479 100644 --- a/src/admin/delete_version.rs +++ b/src/admin/delete_version.rs @@ -1,6 +1,7 @@ use crate::models::update_default_version; use crate::schema::crates; use crate::storage::Storage; +use crate::tasks::spawn_blocking; use crate::worker::jobs; use crate::{admin::dialoguer, db, schema::versions}; use anyhow::Context; @@ -25,86 +26,88 @@ pub struct Opts { yes: bool, } -pub fn run(opts: Opts) -> anyhow::Result<()> { - let crate_name = &opts.crate_name; - - let conn = &mut db::oneoff_connection().context("Failed to establish database connection")?; - - let store = Storage::from_environment(); - - let crate_id: i32 = crates::table - .select(crates::id) - .filter(crates::name.eq(crate_name)) - .first(conn) - .context("Failed to look up crate id from the database")?; - - println!("Deleting the following versions of the `{crate_name}` crate:"); - println!(); - for version in &opts.versions { - println!(" - {version}"); - } - println!(); - - if !opts.yes && !dialoguer::confirm("Do you want to permanently delete these versions?") { - return Ok(()); - } - - conn.transaction(|conn| { - info!(%crate_name, %crate_id, versions = ?opts.versions, "Deleting versions from the database"); - let result = diesel::delete( - versions::table - .filter(versions::crate_id.eq(crate_id)) - .filter(versions::num.eq_any(&opts.versions)), - ) - .execute(conn); - - match result { - Ok(num_deleted) if num_deleted == opts.versions.len() => {} - Ok(num_deleted) => { - warn!( - %crate_name, - "Deleted only {num_deleted} of {num_expected} versions from the database", - num_expected = opts.versions.len() - ); - } - Err(error) => { - warn!(%crate_name, ?error, "Failed to delete versions from the database") - } +pub async fn run(opts: Opts) -> anyhow::Result<()> { + spawn_blocking(move || { + let crate_name = &opts.crate_name; + + let conn = &mut db::oneoff_connection().context("Failed to establish database 
connection")?; + + let store = Storage::from_environment(); + + let crate_id: i32 = crates::table + .select(crates::id) + .filter(crates::name.eq(crate_name)) + .first(conn) + .context("Failed to look up crate id from the database")?; + + println!("Deleting the following versions of the `{crate_name}` crate:"); + println!(); + for version in &opts.versions { + println!(" - {version}"); } + println!(); - info!(%crate_name, %crate_id, "Updating default version in the database"); - if let Err(error) = update_default_version(crate_id, conn) { - warn!(%crate_name, %crate_id, ?error, "Failed to update default version"); + if !opts.yes && !dialoguer::confirm("Do you want to permanently delete these versions?") { + return Ok(()); } - Ok::<_, anyhow::Error>(()) - })?; + conn.transaction(|conn| { + info!(%crate_name, %crate_id, versions = ?opts.versions, "Deleting versions from the database"); + let result = diesel::delete( + versions::table + .filter(versions::crate_id.eq(crate_id)) + .filter(versions::num.eq_any(&opts.versions)), + ) + .execute(conn); + + match result { + Ok(num_deleted) if num_deleted == opts.versions.len() => {} + Ok(num_deleted) => { + warn!( + %crate_name, + "Deleted only {num_deleted} of {num_expected} versions from the database", + num_expected = opts.versions.len() + ); + } + Err(error) => { + warn!(%crate_name, ?error, "Failed to delete versions from the database") + } + } - info!(%crate_name, "Enqueuing index sync jobs"); - if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) { - warn!(%crate_name, ?error, "Failed to enqueue index sync jobs"); - } + info!(%crate_name, %crate_id, "Updating default version in the database"); + if let Err(error) = update_default_version(crate_id, conn) { + warn!(%crate_name, %crate_id, ?error, "Failed to update default version"); + } - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .context("Failed to initialize tokio runtime")?; + Ok::<_, anyhow::Error>(()) + })?; - 
for version in &opts.versions { - debug!(%crate_name, %version, "Deleting crate file from S3"); - if let Err(error) = rt.block_on(store.delete_crate_file(crate_name, version)) { - warn!(%crate_name, %version, ?error, "Failed to delete crate file from S3"); + info!(%crate_name, "Enqueuing index sync jobs"); + if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) { + warn!(%crate_name, ?error, "Failed to enqueue index sync jobs"); } - debug!(%crate_name, %version, "Deleting readme file from S3"); - match rt.block_on(store.delete_readme(crate_name, version)) { - Err(object_store::Error::NotFound { .. }) => {} - Err(error) => { - warn!(%crate_name, %version, ?error, "Failed to delete readme file from S3") + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("Failed to initialize tokio runtime")?; + + for version in &opts.versions { + debug!(%crate_name, %version, "Deleting crate file from S3"); + if let Err(error) = rt.block_on(store.delete_crate_file(crate_name, version)) { + warn!(%crate_name, %version, ?error, "Failed to delete crate file from S3"); + } + + debug!(%crate_name, %version, "Deleting readme file from S3"); + match rt.block_on(store.delete_readme(crate_name, version)) { + Err(object_store::Error::NotFound { .. 
}) => {} + Err(error) => { + warn!(%crate_name, %version, ?error, "Failed to delete readme file from S3") + } + Ok(_) => {} } - Ok(_) => {} } - } - Ok(()) + Ok(()) + }).await } diff --git a/src/admin/enqueue_job.rs b/src/admin/enqueue_job.rs index 4765166442e..1e9482e36ce 100644 --- a/src/admin/enqueue_job.rs +++ b/src/admin/enqueue_job.rs @@ -1,5 +1,6 @@ use crate::db; use crate::schema::{background_jobs, crates}; +use crate::tasks::spawn_blocking; use crate::worker::jobs; use anyhow::Result; use chrono::NaiveDate; @@ -50,106 +51,109 @@ pub enum Command { SyncUpdatesFeed, } -pub fn run(command: Command) -> Result<()> { - let conn = &mut db::oneoff_connection()?; - println!("Enqueueing background job: {command:?}"); +pub async fn run(command: Command) -> Result<()> { + spawn_blocking(move || { + let conn = &mut db::oneoff_connection()?; + println!("Enqueueing background job: {command:?}"); - match command { - Command::ArchiveVersionDownloads { before } => { - before - .map(jobs::ArchiveVersionDownloads::before) - .unwrap_or_default() - .enqueue(conn)?; - } - Command::IndexVersionDownloadsArchive => { - jobs::IndexVersionDownloadsArchive.enqueue(conn)?; - } - Command::UpdateDownloads => { - let count: i64 = background_jobs::table - .filter(background_jobs::job_type.eq(jobs::UpdateDownloads::JOB_NAME)) - .count() - .get_result(conn)?; - - if count > 0 { - println!( - "Did not enqueue {}, existing job already in progress", - jobs::UpdateDownloads::JOB_NAME - ); - } else { - jobs::UpdateDownloads.enqueue(conn)?; - } - } - Command::CleanProcessedLogFiles => { - jobs::CleanProcessedLogFiles.enqueue(conn)?; - } - Command::DumpDb => { - jobs::DumpDb.enqueue(conn)?; - } - Command::SyncAdmins { force } => { - if !force { - // By default, we don't want to enqueue a sync if one is already - // in progress. If a sync fails due to e.g. 
an expired pinned - // certificate we don't want to keep adding new jobs to the - // queue, since the existing job will be retried until it - // succeeds. - - let query = background_jobs::table - .filter(background_jobs::job_type.eq(jobs::SyncAdmins::JOB_NAME)); + match command { + Command::ArchiveVersionDownloads { before } => { + before + .map(jobs::ArchiveVersionDownloads::before) + .unwrap_or_default() + .enqueue(conn)?; + } + Command::IndexVersionDownloadsArchive => { + jobs::IndexVersionDownloadsArchive.enqueue(conn)?; + } + Command::UpdateDownloads => { + let count: i64 = background_jobs::table + .filter(background_jobs::job_type.eq(jobs::UpdateDownloads::JOB_NAME)) + .count() + .get_result(conn)?; - if diesel::select(exists(query)).get_result(conn)? { - info!( + if count > 0 { + println!( "Did not enqueue {}, existing job already in progress", - jobs::SyncAdmins::JOB_NAME + jobs::UpdateDownloads::JOB_NAME ); - return Ok(()); + } else { + jobs::UpdateDownloads.enqueue(conn)?; } } + Command::CleanProcessedLogFiles => { + jobs::CleanProcessedLogFiles.enqueue(conn)?; + } + Command::DumpDb => { + jobs::DumpDb.enqueue(conn)?; + } + Command::SyncAdmins { force } => { + if !force { + // By default, we don't want to enqueue a sync if one is already + // in progress. If a sync fails due to e.g. an expired pinned + // certificate we don't want to keep adding new jobs to the + // queue, since the existing job will be retried until it + // succeeds. - jobs::SyncAdmins.enqueue(conn)?; - } - Command::DailyDbMaintenance => { - jobs::DailyDbMaintenance.enqueue(conn)?; - } - Command::ProcessCdnLogQueue(job) => { - job.enqueue(conn)?; - } - Command::SquashIndex => { - jobs::SquashIndex.enqueue(conn)?; - } - Command::NormalizeIndex { dry_run } => { - jobs::NormalizeIndex::new(dry_run).enqueue(conn)?; - } - Command::CheckTyposquat { name } => { - // The job will fail if the crate doesn't actually exist, so let's check that up front. 
- if crates::table - .filter(crates::name.eq(&name)) - .count() - .get_result::<i64>(conn)? - == 0 - { - anyhow::bail!( - "cannot enqueue a typosquat check for a crate that doesn't exist: {name}" - ); + let query = background_jobs::table + .filter(background_jobs::job_type.eq(jobs::SyncAdmins::JOB_NAME)); + + if diesel::select(exists(query)).get_result(conn)? { + info!( + "Did not enqueue {}, existing job already in progress", + jobs::SyncAdmins::JOB_NAME + ); + return Ok(()); + } + } + + jobs::SyncAdmins.enqueue(conn)?; + } + Command::DailyDbMaintenance => { + jobs::DailyDbMaintenance.enqueue(conn)?; + } + Command::ProcessCdnLogQueue(job) => { + job.enqueue(conn)?; } + Command::SquashIndex => { + jobs::SquashIndex.enqueue(conn)?; + } + Command::NormalizeIndex { dry_run } => { + jobs::NormalizeIndex::new(dry_run).enqueue(conn)?; + } + Command::CheckTyposquat { name } => { + // The job will fail if the crate doesn't actually exist, so let's check that up front. + if crates::table + .filter(crates::name.eq(&name)) + .count() + .get_result::<i64>(conn)? 
+ == 0 + { + anyhow::bail!( + "cannot enqueue a typosquat check for a crate that doesn't exist: {name}" + ); + } - jobs::CheckTyposquat::new(&name).enqueue(conn)?; - } - Command::SendTokenExpiryNotifications => { - jobs::SendTokenExpiryNotifications.enqueue(conn)?; - } - Command::SyncCratesFeed => { - jobs::rss::SyncCratesFeed.enqueue(conn)?; - } - Command::SyncToGitIndex { name } => { - jobs::SyncToGitIndex::new(name).enqueue(conn)?; - } - Command::SyncToSparseIndex { name } => { - jobs::SyncToSparseIndex::new(name).enqueue(conn)?; - } - Command::SyncUpdatesFeed => { - jobs::rss::SyncUpdatesFeed.enqueue(conn)?; - } - }; + jobs::CheckTyposquat::new(&name).enqueue(conn)?; + } + Command::SendTokenExpiryNotifications => { + jobs::SendTokenExpiryNotifications.enqueue(conn)?; + } + Command::SyncCratesFeed => { + jobs::rss::SyncCratesFeed.enqueue(conn)?; + } + Command::SyncToGitIndex { name } => { + jobs::SyncToGitIndex::new(name).enqueue(conn)?; + } + Command::SyncToSparseIndex { name } => { + jobs::SyncToSparseIndex::new(name).enqueue(conn)?; + } + Command::SyncUpdatesFeed => { + jobs::rss::SyncUpdatesFeed.enqueue(conn)?; + } + }; - Ok(()) + Ok(()) + }) + .await } diff --git a/src/admin/migrate.rs b/src/admin/migrate.rs index 9124ec81bbe..5551f0d9706 100644 --- a/src/admin/migrate.rs +++ b/src/admin/migrate.rs @@ -1,3 +1,4 @@ +use crate::tasks::spawn_blocking; use anyhow::{anyhow, Error}; use diesel_migrations::{ embed_migrations, EmbeddedMigrations, HarnessWithOutput, MigrationHarness, @@ -14,39 +15,42 @@ pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); )] pub struct Opts; -pub fn run(_opts: Opts) -> Result<(), Error> { - let config = crate::config::DatabasePools::full_from_environment( - &crate::config::Base::from_environment()?, - )?; +pub async fn run(_opts: Opts) -> Result<(), Error> { + spawn_blocking(move || { + let config = crate::config::DatabasePools::full_from_environment( + &crate::config::Base::from_environment()?, + )?; - // 
TODO: Refactor logic so that we can also check things from App::new() here. - // If the app will panic due to bad configuration, it is better to error in the release phase - // to avoid launching dynos that will fail. + // TODO: Refactor logic so that we can also check things from App::new() here. + // If the app will panic due to bad configuration, it is better to error in the release phase + // to avoid launching dynos that will fail. - if config.are_all_read_only() { - // TODO: Check `any_pending_migrations()` with a read-only connection and error if true. - // It looks like this requires changes upstream to make this pub in `migration_macros`. + if config.are_all_read_only() { + // TODO: Check `any_pending_migrations()` with a read-only connection and error if true. + // It looks like this requires changes upstream to make this pub in `migration_macros`. - warn!("Skipping migrations and category sync (read-only mode)"); + warn!("Skipping migrations and category sync (read-only mode)"); - // The service is undergoing maintenance or mitigating an outage. - // Exit with success to ensure configuration changes can be made. - // Heroku will not launch new dynos if the release phase fails. - return Ok(()); - } + // The service is undergoing maintenance or mitigating an outage. + // Exit with success to ensure configuration changes can be made. + // Heroku will not launch new dynos if the release phase fails. + return Ok(()); + } - // The primary is online, access directly via `DATABASE_URL`. - let conn = &mut crate::db::oneoff_connection_with_config(&config)?; + // The primary is online, access directly via `DATABASE_URL`. 
+ let conn = &mut crate::db::oneoff_connection_with_config(&config)?; - info!("Migrating the database"); - let mut stdout = std::io::stdout(); - let mut harness = HarnessWithOutput::new(conn, &mut stdout); - harness - .run_pending_migrations(MIGRATIONS) - .map_err(|err| anyhow!("Failed to run migrations: {err}"))?; + info!("Migrating the database"); + let mut stdout = std::io::stdout(); + let mut harness = HarnessWithOutput::new(conn, &mut stdout); + harness + .run_pending_migrations(MIGRATIONS) + .map_err(|err| anyhow!("Failed to run migrations: {err}"))?; - info!("Synchronizing crate categories"); - crate::boot::categories::sync_with_connection(CATEGORIES_TOML, conn)?; + info!("Synchronizing crate categories"); + crate::boot::categories::sync_with_connection(CATEGORIES_TOML, conn)?; - Ok(()) + Ok(()) + }) + .await } diff --git a/src/admin/populate.rs b/src/admin/populate.rs index c6c213fe405..fa81b75bd37 100644 --- a/src/admin/populate.rs +++ b/src/admin/populate.rs @@ -1,5 +1,6 @@ use crate::{db, schema::version_downloads}; +use crate::tasks::spawn_blocking; use diesel::prelude::*; use rand::{thread_rng, Rng}; @@ -13,10 +14,13 @@ pub struct Opts { version_ids: Vec<i32>, } -pub fn run(opts: Opts) -> anyhow::Result<()> { - let mut conn = db::oneoff_connection()?; - conn.transaction(|conn| update(opts, conn))?; - Ok(()) +pub async fn run(opts: Opts) -> anyhow::Result<()> { + spawn_blocking(move || { + let mut conn = db::oneoff_connection()?; + conn.transaction(|conn| update(opts, conn))?; + Ok(()) + }) + .await } fn update(opts: Opts, conn: &mut PgConnection) -> QueryResult<()> { diff --git a/src/admin/render_readmes.rs b/src/admin/render_readmes.rs index 2af22f7823f..72f2023a930 100644 --- a/src/admin/render_readmes.rs +++ b/src/admin/render_readmes.rs @@ -8,6 +8,7 @@ use std::path::PathBuf; use std::{io::Read, path::Path, sync::Arc, thread}; use crate::storage::Storage; +use crate::tasks::spawn_blocking; use chrono::{NaiveDateTime, Utc}; use 
crates_io_markdown::text_to_html; use crates_io_tarball::{Manifest, StringOrBool}; @@ -40,102 +41,109 @@ pub struct Opts { crate_name: Option<String>, } -pub fn run(opts: Opts) -> anyhow::Result<()> { - let storage = Arc::new(Storage::from_environment()); - let conn = &mut db::oneoff_connection()?; +pub async fn run(opts: Opts) -> anyhow::Result<()> { + spawn_blocking(move || { + let storage = Arc::new(Storage::from_environment()); + let conn = &mut db::oneoff_connection()?; - let start_time = Utc::now(); + let start_time = Utc::now(); - let older_than = if let Some(ref time) = opts.older_than { - NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S") - .context("Could not parse --older-than argument as a time")? - } else { - start_time.naive_utc() - }; + let older_than = if let Some(ref time) = opts.older_than { + NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S") + .context("Could not parse --older-than argument as a time")? + } else { + start_time.naive_utc() + }; - println!("Start time: {start_time}"); - println!("Rendering readmes older than: {older_than}"); - - let mut query = versions::table - .inner_join(crates::table) - .left_outer_join(readme_renderings::table) - .filter( - readme_renderings::rendered_at - .lt(older_than) - .or(readme_renderings::version_id.is_null()), - ) - .select(versions::id) - .into_boxed(); - - if let Some(crate_name) = opts.crate_name { - println!("Rendering readmes for {crate_name}"); - query = query.filter(crates::name.eq(crate_name)); - } + println!("Start time: {start_time}"); + println!("Rendering readmes older than: {older_than}"); - let version_ids: Vec<i32> = query.load(conn).context("error loading version ids")?; + let mut query = versions::table + .inner_join(crates::table) + .left_outer_join(readme_renderings::table) + .filter( + readme_renderings::rendered_at + .lt(older_than) + .or(readme_renderings::version_id.is_null()), + ) + .select(versions::id) + .into_boxed(); - let total_versions = version_ids.len(); - 
println!("Rendering {total_versions} versions"); + if let Some(crate_name) = opts.crate_name { + println!("Rendering readmes for {crate_name}"); + query = query.filter(crates::name.eq(crate_name)); + } - let page_size = opts.page_size; + let version_ids: Vec<i32> = query.load(conn).context("error loading version ids")?; - let total_pages = total_versions / page_size; - let total_pages = if total_versions % page_size == 0 { - total_pages - } else { - total_pages + 1 - }; + let total_versions = version_ids.len(); + println!("Rendering {total_versions} versions"); - let client = Client::new(); + let page_size = opts.page_size; - for (page_num, version_ids_chunk) in version_ids.chunks(page_size).enumerate() { - println!( - "= Page {} of {} ==================================", - page_num + 1, + let total_pages = total_versions / page_size; + let total_pages = if total_versions % page_size == 0 { total_pages - ); + } else { + total_pages + 1 + }; - let versions: Vec<(Version, String)> = versions::table - .inner_join(crates::table) - .filter(versions::id.eq_any(version_ids_chunk)) - .select((versions::all_columns, crates::name)) - .load(conn) - .context("error loading versions")?; - - let mut tasks = Vec::with_capacity(page_size); - for (version, krate_name) in versions { - Version::record_readme_rendering(version.id, conn) - .context("Couldn't record rendering time")?; - - let client = client.clone(); - let storage = storage.clone(); - let handle = thread::spawn::<_, anyhow::Result<()>>(move || { - println!("[{}-{}] Rendering README...", krate_name, version.num); - let readme = get_readme(&storage, &client, &version, &krate_name)?; - if !readme.is_empty() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .context("Failed to initialize tokio runtime")?; - - rt.block_on(storage.upload_readme(&krate_name, &version.num, readme.into())) + let client = Client::new(); + + for (page_num, version_ids_chunk) in 
version_ids.chunks(page_size).enumerate() { + println!( + "= Page {} of {} ==================================", + page_num + 1, + total_pages + ); + + let versions: Vec<(Version, String)> = versions::table + .inner_join(crates::table) + .filter(versions::id.eq_any(version_ids_chunk)) + .select((versions::all_columns, crates::name)) + .load(conn) + .context("error loading versions")?; + + let mut tasks = Vec::with_capacity(page_size); + for (version, krate_name) in versions { + Version::record_readme_rendering(version.id, conn) + .context("Couldn't record rendering time")?; + + let client = client.clone(); + let storage = storage.clone(); + let handle = thread::spawn::<_, anyhow::Result<()>>(move || { + println!("[{}-{}] Rendering README...", krate_name, version.num); + let readme = get_readme(&storage, &client, &version, &krate_name)?; + if !readme.is_empty() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("Failed to initialize tokio runtime")?; + + rt.block_on(storage.upload_readme( + &krate_name, + &version.num, + readme.into(), + )) .context("Failed to upload rendered README file to S3")?; - } + } - Ok(()) - }); - tasks.push(handle); - } - for handle in tasks { - match handle.join() { - Err(err) => println!("Thread panicked: {err:?}"), - Ok(Err(err)) => println!("Thread failed: {err:?}"), - _ => {} + Ok(()) + }); + tasks.push(handle); + } + for handle in tasks { + match handle.join() { + Err(err) => println!("Thread panicked: {err:?}"), + Ok(Err(err)) => println!("Thread failed: {err:?}"), + _ => {} + } } } - } - Ok(()) + Ok(()) + }) + .await } /// Renders the readme of an uploaded crate version. 
diff --git a/src/admin/test_pagerduty.rs b/src/admin/test_pagerduty.rs index 9d764f1235b..ce3d5a098d8 100644 --- a/src/admin/test_pagerduty.rs +++ b/src/admin/test_pagerduty.rs @@ -2,6 +2,7 @@ use anyhow::Result; use std::str::FromStr; use crate::admin::on_call; +use crate::tasks::spawn_blocking; #[derive(Debug, Copy, Clone, clap::ValueEnum)] pub enum EventType { @@ -31,20 +32,23 @@ pub struct Opts { description: Option<String>, } -pub fn run(opts: Opts) -> Result<()> { - let event = match opts.event_type { - EventType::Trigger => on_call::Event::Trigger { - incident_key: Some("test".into()), - description: opts.description.unwrap_or_else(|| "Test event".into()), - }, - EventType::Acknowledge => on_call::Event::Acknowledge { - incident_key: "test".into(), - description: opts.description, - }, - EventType::Resolve => on_call::Event::Resolve { - incident_key: "test".into(), - description: opts.description, - }, - }; - event.send() +pub async fn run(opts: Opts) -> Result<()> { + spawn_blocking(move || { + let event = match opts.event_type { + EventType::Trigger => on_call::Event::Trigger { + incident_key: Some("test".into()), + description: opts.description.unwrap_or_else(|| "Test event".into()), + }, + EventType::Acknowledge => on_call::Event::Acknowledge { + incident_key: "test".into(), + description: opts.description, + }, + EventType::Resolve => on_call::Event::Resolve { + incident_key: "test".into(), + description: opts.description, + }, + }; + event.send() + }) + .await } diff --git a/src/admin/transfer_crates.rs b/src/admin/transfer_crates.rs index 2abb94f8d48..3f6ff12d32b 100644 --- a/src/admin/transfer_crates.rs +++ b/src/admin/transfer_crates.rs @@ -6,6 +6,7 @@ use crate::{ }; use std::process::exit; +use crate::tasks::spawn_blocking; use diesel::prelude::*; #[derive(clap::Parser, Debug)] @@ -20,10 +21,13 @@ pub struct Opts { to_user: String, } -pub fn run(opts: Opts) -> anyhow::Result<()> { - let conn = &mut db::oneoff_connection()?; - conn.transaction(|conn| 
transfer(opts, conn))?; - Ok(()) +pub async fn run(opts: Opts) -> anyhow::Result<()> { + spawn_blocking(move || { + let conn = &mut db::oneoff_connection()?; + conn.transaction(|conn| transfer(opts, conn))?; + Ok(()) + }) + .await } fn transfer(opts: Opts, conn: &mut PgConnection) -> anyhow::Result<()> { diff --git a/src/admin/upload_index.rs b/src/admin/upload_index.rs index d29dd62ce92..6a8afe61f9a 100644 --- a/src/admin/upload_index.rs +++ b/src/admin/upload_index.rs @@ -1,5 +1,6 @@ use crate::admin::dialoguer; use crate::storage::Storage; +use crate::tasks::spawn_blocking; use anyhow::{anyhow, Context}; use crates_io_index::{Repository, RepositoryConfig}; use indicatif::{ProgressBar, ProgressIterator, ProgressStyle}; @@ -14,55 +15,58 @@ pub struct Opts { incremental_commit: Option<String>, } -pub fn run(opts: Opts) -> anyhow::Result<()> { - let storage = Storage::from_environment(); +pub async fn run(opts: Opts) -> anyhow::Result<()> { + spawn_blocking(move || { + let storage = Storage::from_environment(); - println!("fetching git repo"); - let config = RepositoryConfig::from_environment()?; - let repo = Repository::open(&config)?; - repo.reset_head()?; - println!("HEAD is at {}", repo.head_oid()?); + println!("fetching git repo"); + let config = RepositoryConfig::from_environment()?; + let repo = Repository::open(&config)?; + repo.reset_head()?; + println!("HEAD is at {}", repo.head_oid()?); - let files = repo.get_files_modified_since(opts.incremental_commit.as_deref())?; - println!("found {} files to upload", files.len()); - if !dialoguer::confirm("continue with upload?") { - return Ok(()); - } + let files = repo.get_files_modified_since(opts.incremental_commit.as_deref())?; + println!("found {} files to upload", files.len()); + if !dialoguer::confirm("continue with upload?") { + return Ok(()); + } - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .context("Failed to initialize tokio runtime")?; + let rt = 
tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("Failed to initialize tokio runtime")?; - let pb = ProgressBar::new(files.len() as u64); - pb.set_style(ProgressStyle::with_template( - "{bar:60} ({pos}/{len}, ETA {eta})", - )?); + let pb = ProgressBar::new(files.len() as u64); + pb.set_style(ProgressStyle::with_template( + "{bar:60} ({pos}/{len}, ETA {eta})", + )?); - for file in files.iter().progress_with(pb.clone()) { - let file_name = file.file_name().ok_or_else(|| { - let file = file.display(); - anyhow!("Failed to get file name from path: {file}") - })?; + for file in files.iter().progress_with(pb.clone()) { + let file_name = file.file_name().ok_or_else(|| { + let file = file.display(); + anyhow!("Failed to get file name from path: {file}") + })?; - let crate_name = file_name.to_str().ok_or_else(|| { - let file_name = file_name.to_string_lossy(); - anyhow!("Failed to convert file name to utf8: {file_name}",) - })?; + let crate_name = file_name.to_str().ok_or_else(|| { + let file_name = file_name.to_string_lossy(); + anyhow!("Failed to convert file name to utf8: {file_name}",) + })?; - let path = repo.index_file(crate_name); - if !path.exists() { - pb.suspend(|| println!("skipping file `{crate_name}`")); - continue; - } + let path = repo.index_file(crate_name); + if !path.exists() { + pb.suspend(|| println!("skipping file `{crate_name}`")); + continue; + } - let contents = std::fs::read_to_string(&path)?; - rt.block_on(storage.sync_index(crate_name, Some(contents)))?; - } + let contents = std::fs::read_to_string(&path)?; + rt.block_on(storage.sync_index(crate_name, Some(contents)))?; + } - println!( - "uploading completed; use `upload-index {}` for an incremental run", - repo.head_oid()? - ); - Ok(()) + println!( + "uploading completed; use `upload-index {}` for an incremental run", + repo.head_oid()? 
+ ); + Ok(()) + }) + .await } diff --git a/src/admin/verify_token.rs b/src/admin/verify_token.rs index 329ab496f56..7ff237b8070 100644 --- a/src/admin/verify_token.rs +++ b/src/admin/verify_token.rs @@ -1,4 +1,5 @@ use crate::models::ApiToken; +use crate::tasks::spawn_blocking; use crate::util::token::HashedToken; use crate::{db, models::User}; @@ -14,11 +15,14 @@ pub struct Opts { api_token: String, } -pub fn run(opts: Opts) -> anyhow::Result<()> { - let conn = &mut db::oneoff_connection()?; - let token = HashedToken::parse(&opts.api_token)?; - let token = ApiToken::find_by_api_token(conn, &token)?; - let user = User::find(conn, token.user_id)?; - println!("The token belongs to user {}", user.gh_login); - Ok(()) +pub async fn run(opts: Opts) -> anyhow::Result<()> { + spawn_blocking(move || { + let conn = &mut db::oneoff_connection()?; + let token = HashedToken::parse(&opts.api_token)?; + let token = ApiToken::find_by_api_token(conn, &token)?; + let user = User::find(conn, token.user_id)?; + println!("The token belongs to user {}", user.gh_login); + Ok(()) + }) + .await } diff --git a/src/admin/yank_version.rs b/src/admin/yank_version.rs index 4b050633550..b9c726213cf 100644 --- a/src/admin/yank_version.rs +++ b/src/admin/yank_version.rs @@ -2,6 +2,7 @@ use crate::admin::dialoguer; use crate::db; use crate::models::{Crate, Version}; use crate::schema::versions; +use crate::tasks::spawn_blocking; use crate::worker::jobs; use crate::worker::jobs::UpdateDefaultVersion; use crates_io_worker::BackgroundJob; @@ -22,10 +23,13 @@ pub struct Opts { yes: bool, } -pub fn run(opts: Opts) -> anyhow::Result<()> { - let mut conn = db::oneoff_connection()?; - conn.transaction(|conn| yank(opts, conn))?; - Ok(()) +pub async fn run(opts: Opts) -> anyhow::Result<()> { + spawn_blocking(move || { + let mut conn = db::oneoff_connection()?; + conn.transaction(|conn| yank(opts, conn))?; + Ok(()) + }) + .await } fn yank(opts: Opts, conn: &mut PgConnection) -> anyhow::Result<()> { diff --git 
a/src/bin/crates-admin.rs b/src/bin/crates-admin.rs index d158149601f..06eb46aa88c 100644 --- a/src/bin/crates-admin.rs +++ b/src/bin/crates-admin.rs @@ -25,7 +25,8 @@ enum Command { DefaultVersions(default_versions::Command), } -fn main() -> anyhow::Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { let _sentry = crates_io::sentry::init(); // Initialize logging @@ -38,18 +39,18 @@ fn main() -> anyhow::Result<()> { span.record("command", tracing::field::debug(&command)); match command { - Command::DeleteCrate(opts) => delete_crate::run(opts), - Command::DeleteVersion(opts) => delete_version::run(opts), - Command::Populate(opts) => populate::run(opts), - Command::RenderReadmes(opts) => render_readmes::run(opts), - Command::TestPagerduty(opts) => test_pagerduty::run(opts), - Command::TransferCrates(opts) => transfer_crates::run(opts), - Command::VerifyToken(opts) => verify_token::run(opts), - Command::Migrate(opts) => migrate::run(opts), - Command::UploadIndex(opts) => upload_index::run(opts), - Command::YankVersion(opts) => yank_version::run(opts), - Command::EnqueueJob(command) => enqueue_job::run(command), - Command::DefaultVersions(opts) => default_versions::run(opts), + Command::DeleteCrate(opts) => delete_crate::run(opts).await, + Command::DeleteVersion(opts) => delete_version::run(opts).await, + Command::Populate(opts) => populate::run(opts).await, + Command::RenderReadmes(opts) => render_readmes::run(opts).await, + Command::TestPagerduty(opts) => test_pagerduty::run(opts).await, + Command::TransferCrates(opts) => transfer_crates::run(opts).await, + Command::VerifyToken(opts) => verify_token::run(opts).await, + Command::Migrate(opts) => migrate::run(opts).await, + Command::UploadIndex(opts) => upload_index::run(opts).await, + Command::YankVersion(opts) => yank_version::run(opts).await, + Command::EnqueueJob(command) => enqueue_job::run(command).await, + Command::DefaultVersions(opts) => default_versions::run(opts).await, } }