diff --git a/crates/crates_io_worker/src/background_job.rs b/crates/crates_io_worker/src/background_job.rs index 1f5be41ae1..60d187e2dc 100644 --- a/crates/crates_io_worker/src/background_job.rs +++ b/crates/crates_io_worker/src/background_job.rs @@ -4,6 +4,8 @@ use diesel::dsl::{exists, not}; use diesel::sql_types::{Int2, Jsonb, Text}; use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl}; use diesel_async::{AsyncPgConnection, RunQueryDsl}; +use futures_util::future::BoxFuture; +use futures_util::FutureExt; use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::Value; @@ -38,32 +40,37 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static { /// Execute the task. This method should define its logic. fn run(&self, ctx: Self::Context) -> impl Future> + Send; - #[allow(async_fn_in_trait)] #[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))] - async fn enqueue(&self, conn: &mut AsyncPgConnection) -> Result, EnqueueError> { - let data = serde_json::to_value(self)?; + fn enqueue( + &self, + conn: &mut AsyncPgConnection, + ) -> BoxFuture<'_, Result, EnqueueError>> { + let data = match serde_json::to_value(self) { + Ok(data) => data, + Err(err) => return async move { Err(EnqueueError::SerializationError(err)) }.boxed(), + }; let priority = Self::PRIORITY; if Self::DEDUPLICATED { - Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?) + let future = enqueue_deduplicated(conn, Self::JOB_NAME, data, priority); + future.boxed() } else { - Ok(Some( - enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?, - )) + let future = enqueue_simple(conn, Self::JOB_NAME, data, priority); + async move { Ok(Some(future.await?)) }.boxed() } } } -async fn enqueue_deduplicated( +fn enqueue_deduplicated( conn: &mut AsyncPgConnection, - job_type: &str, - data: &Value, + job_type: &'static str, + data: Value, priority: i16, -) -> Result, EnqueueError> { +) -> impl Future, EnqueueError>> { let similar_jobs = background_jobs::table .select(background_jobs::id) .filter(background_jobs::job_type.eq(job_type)) - .filter(background_jobs::data.eq(data)) + .filter(background_jobs::data.eq(data.clone())) .filter(background_jobs::priority.eq(priority)) .for_update() .skip_locked(); @@ -75,7 +82,7 @@ async fn enqueue_deduplicated( )) .filter(not(exists(similar_jobs))); - let id = diesel::insert_into(background_jobs::table) + let future = diesel::insert_into(background_jobs::table) .values(deduplicated_select) .into_columns(( background_jobs::job_type, @@ -83,28 +90,25 @@ async fn enqueue_deduplicated( background_jobs::priority, )) .returning(background_jobs::id) - .get_result::(conn) - .await - .optional()?; + .get_result::(conn); - Ok(id) + async move { Ok(future.await.optional()?) } } -async fn enqueue_simple( +fn enqueue_simple( conn: &mut AsyncPgConnection, - job_type: &str, - data: &Value, + job_type: &'static str, + data: Value, priority: i16, -) -> Result { - let id = diesel::insert_into(background_jobs::table) +) -> impl Future> { + let future = diesel::insert_into(background_jobs::table) .values(( background_jobs::job_type.eq(job_type), background_jobs::data.eq(data), background_jobs::priority.eq(priority), )) .returning(background_jobs::id) - .get_result(conn) - .await?; + .get_result(conn); - Ok(id) + async move { Ok(future.await?) } } diff --git a/src/bin/crates-admin/delete_crate.rs b/src/bin/crates-admin/delete_crate.rs index 0cc1ec59e8..971e1a3ea3 100644 --- a/src/bin/crates-admin/delete_crate.rs +++ b/src/bin/crates-admin/delete_crate.rs @@ -107,21 +107,17 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { info!("{name}: Skipped missing crate"); }; - info!("{name}: Enqueuing index sync jobs…"); - let job = jobs::SyncToGitIndex::new(name); - if let Err(error) = job.enqueue(&mut conn).await { - warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}"); - } - - let job = jobs::SyncToSparseIndex::new(name); - if let Err(error) = job.enqueue(&mut conn).await { - warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}"); - } - - info!("{name}: Enqueuing DeleteCrateFromStorage job…"); - let job = jobs::DeleteCrateFromStorage::new(name.into()); - if let Err(error) = job.enqueue(&mut conn).await { - warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}"); + info!("{name}: Enqueuing background jobs…"); + let git_index_job = jobs::SyncToGitIndex::new(name); + let sparse_index_job = jobs::SyncToSparseIndex::new(name); + let delete_from_storage_job = jobs::DeleteCrateFromStorage::new(name.into()); + + if let Err(error) = tokio::try_join!( + git_index_job.enqueue(&mut conn), + sparse_index_job.enqueue(&mut conn), + delete_from_storage_job.enqueue(&mut conn), + ) { + warn!("{name}: Failed to enqueue background job: {error}"); } } diff --git a/src/bin/crates-admin/delete_version.rs b/src/bin/crates-admin/delete_version.rs index 0df8613311..63c7dff316 100644 --- a/src/bin/crates-admin/delete_version.rs +++ b/src/bin/crates-admin/delete_version.rs @@ -95,13 +95,14 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { let crate_name = &opts.crate_name; info!(%crate_name, "Enqueuing index sync jobs"); - let job = jobs::SyncToGitIndex::new(crate_name); - if let Err(error) = job.enqueue(&mut conn).await { - warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job"); - } - let job = jobs::SyncToSparseIndex::new(crate_name); - if let Err(error) = job.enqueue(&mut conn).await { - warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job"); + let git_index_job = jobs::SyncToGitIndex::new(crate_name); + let sparse_index_job = jobs::SyncToSparseIndex::new(crate_name); + + if let Err(error) = tokio::try_join!( + git_index_job.enqueue(&mut conn), + sparse_index_job.enqueue(&mut conn), + ) { + warn!(%crate_name, "Failed to enqueue background job: {error}"); } for version in &opts.versions { diff --git a/src/bin/crates-admin/yank_version.rs b/src/bin/crates-admin/yank_version.rs index 79fea8b9b5..8dfc2fe9cc 100644 --- a/src/bin/crates-admin/yank_version.rs +++ b/src/bin/crates-admin/yank_version.rs @@ -67,11 +67,15 @@ async fn yank(opts: Opts, conn: &mut AsyncPgConnection) -> anyhow::Result<()> { .execute(conn) .await?; - SyncToGitIndex::new(&krate.name).enqueue(conn).await?; + let git_index_job = SyncToGitIndex::new(&krate.name); + let sparse_index_job = SyncToSparseIndex::new(&krate.name); + let update_default_version_job = UpdateDefaultVersion::new(krate.id); - SyncToSparseIndex::new(&krate.name).enqueue(conn).await?; - - UpdateDefaultVersion::new(krate.id).enqueue(conn).await?; + tokio::try_join!( + git_index_job.enqueue(conn), + sparse_index_job.enqueue(conn), + update_default_version_job.enqueue(conn), + )?; Ok(()) } diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 75e4a1a35f..1153581267 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -10,11 +10,12 @@ use axum::Json; use cargo_manifest::{Dependency, DepsSet, TargetDepsSet}; use chrono::{DateTime, SecondsFormat, Utc}; use crates_io_tarball::{process_tarball, TarballError}; -use crates_io_worker::BackgroundJob; +use crates_io_worker::{BackgroundJob, EnqueueError}; use diesel::dsl::{exists, select}; use diesel::prelude::*; use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl}; +use futures_util::TryFutureExt; use futures_util::TryStreamExt; use hex::ToHex; use http::StatusCode; @@ -503,29 +504,41 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult(None) + }), + updates_feed_job.enqueue(conn).or_else(|error| async move { + error!("Failed to enqueue `rss::SyncUpdatesFeed` job: {error}"); + Ok::<_, EnqueueError>(None) + }), + )?; // Experiment: check new crates for potential typosquatting. if existing_crate.is_none() { - CheckTyposquat::new(&krate.name).enqueue(conn).await?; - } - - let job = jobs::rss::SyncCrateFeed::new(krate.name.clone()); - if let Err(error) = job.enqueue(conn).await { - error!("Failed to enqueue `rss::SyncCrateFeed` job: {error}"); - } - - if let Err(error) = jobs::rss::SyncUpdatesFeed.enqueue(conn).await { - error!("Failed to enqueue `rss::SyncUpdatesFeed` job: {error}"); - } - - if existing_crate.is_none() { - if let Err(error) = jobs::rss::SyncCratesFeed.enqueue(conn).await { - error!("Failed to enqueue `rss::SyncCratesFeed` job: {error}"); - } + let crates_feed_job = jobs::rss::SyncCratesFeed; + let typosquat_job = CheckTyposquat::new(&krate.name); + + tokio::try_join!( + crates_feed_job.enqueue(conn).or_else(|error| async move { + error!("Failed to enqueue `rss::SyncCratesFeed` job: {error}"); + Ok::<_, EnqueueError>(None) + }), + typosquat_job.enqueue(conn).or_else(|error| async move { + error!("Failed to enqueue `CheckTyposquat` job: {error}"); + Ok::<_, EnqueueError>(None) + }), + )?; } // The `other` field on `PublishWarnings` was introduced to handle a temporary warning diff --git a/src/controllers/version/metadata.rs b/src/controllers/version/metadata.rs index b8104f09e2..61130b80bf 100644 --- a/src/controllers/version/metadata.rs +++ b/src/controllers/version/metadata.rs @@ -239,9 +239,15 @@ pub async fn perform_version_yank_update( .insert(conn) .await?; - SyncToGitIndex::new(&krate.name).enqueue(conn).await?; - SyncToSparseIndex::new(&krate.name).enqueue(conn).await?; - UpdateDefaultVersion::new(krate.id).enqueue(conn).await?; + let git_index_job = SyncToGitIndex::new(&krate.name); + let sparse_index_job = SyncToSparseIndex::new(&krate.name); + let update_default_version_job = UpdateDefaultVersion::new(krate.id); + + tokio::try_join!( + git_index_job.enqueue(conn), + sparse_index_job.enqueue(conn), + update_default_version_job.enqueue(conn), + )?; Ok(()) }