diff --git a/src/admin/delete_crate.rs b/src/admin/delete_crate.rs index db86e908901..9d6c6b1ba05 100644 --- a/src/admin/delete_crate.rs +++ b/src/admin/delete_crate.rs @@ -97,8 +97,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { }; info!("{name}: Enqueuing index sync jobs…"); - if let Err(error) = jobs::enqueue_sync_to_index(name, conn) { - warn!("{name}: Failed to enqueue index sync jobs: {error}"); + if let Err(error) = jobs::SyncToGitIndex::new(name).enqueue(conn) { + warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}"); + } + if let Err(error) = jobs::SyncToSparseIndex::new(name).enqueue(conn) { + warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}"); } info!("{name}: Enqueuing DeleteCrateFromStorage job…"); diff --git a/src/admin/delete_version.rs b/src/admin/delete_version.rs index 16665d4f930..e2c23cfe25f 100644 --- a/src/admin/delete_version.rs +++ b/src/admin/delete_version.rs @@ -5,6 +5,7 @@ use crate::tasks::spawn_blocking; use crate::worker::jobs; use crate::{admin::dialoguer, db, schema::versions}; use anyhow::Context; +use crates_io_worker::BackgroundJob; use diesel::{Connection, ExpressionMethods, QueryDsl}; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; @@ -102,8 +103,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { })?; info!(%crate_name, "Enqueuing index sync jobs"); - if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) { - warn!(%crate_name, ?error, "Failed to enqueue index sync jobs"); + if let Err(error) = jobs::SyncToGitIndex::new(crate_name).enqueue(conn) { + warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job"); + } + if let Err(error) = jobs::SyncToSparseIndex::new(crate_name).enqueue(conn) { + warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job"); } Ok(opts) diff --git a/src/admin/yank_version.rs b/src/admin/yank_version.rs index 0d7b66075ad..1f4efb0e99c 100644 --- a/src/admin/yank_version.rs +++ b/src/admin/yank_version.rs @@ -3,8 +3,7 @@ use crate::db; use crate::models::{Crate, Version}; use crate::schema::versions; use crate::tasks::spawn_blocking; -use crate::worker::jobs; -use crate::worker::jobs::UpdateDefaultVersion; +use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion}; use crates_io_worker::BackgroundJob; use diesel::prelude::*; @@ -63,8 +62,8 @@ fn yank(opts: Opts, conn: &mut PgConnection) -> anyhow::Result<()> { .set(versions::yanked.eq(true)) .execute(conn)?; - jobs::enqueue_sync_to_index(&krate.name, conn)?; - + SyncToGitIndex::new(&krate.name).enqueue(conn)?; + SyncToSparseIndex::new(&krate.name).enqueue(conn)?; UpdateDefaultVersion::new(krate.id).enqueue(conn)?; Ok(()) diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 8715b952353..0c1f5da9f62 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -442,7 +442,8 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult( - krate: T, - conn: &mut impl Conn, -) -> Result<(), EnqueueError> { - // Returns jobs with matching `job_type`, `data` and `priority`, - // skipping ones that are already locked by the background worker. - let find_similar_jobs_query = - |job_type: &'static str, data: serde_json::Value, priority: i16| { - background_jobs::table - .select(background_jobs::id) - .filter(background_jobs::job_type.eq(job_type)) - .filter(background_jobs::data.eq(data)) - .filter(background_jobs::priority.eq(priority)) - .for_update() - .skip_locked() - }; - - // Returns one `job_type, data, priority` row with values from the - // passed-in `job`, unless a similar row already exists. - let deduplicated_select_query = - |job_type: &'static str, data: serde_json::Value, priority: i16| { - diesel::select(( - job_type.into_sql::(), - data.clone().into_sql::(), - priority.into_sql::(), - )) - .filter(not(exists(find_similar_jobs_query( - job_type, data, priority, - )))) - }; - - let to_git = deduplicated_select_query( - SyncToGitIndex::JOB_NAME, - serde_json::to_value(SyncToGitIndex::new(krate.to_string()))?, - SyncToGitIndex::PRIORITY, - ); - - let to_sparse = deduplicated_select_query( - SyncToSparseIndex::JOB_NAME, - serde_json::to_value(SyncToSparseIndex::new(krate.to_string()))?, - SyncToSparseIndex::PRIORITY, - ); - - // Insert index update background jobs, but only if they do not - // already exist. - let added_jobs_count = diesel::insert_into(background_jobs::table) - .values(to_git.union_all(to_sparse)) - .into_columns(( - background_jobs::job_type, - background_jobs::data, - background_jobs::priority, - )) - .execute(conn)?; - - // Print a log event if we skipped inserting a job due to deduplication. - if added_jobs_count != 2 { - let skipped_jobs_count = 2 - added_jobs_count; - info!(%skipped_jobs_count, "Skipped adding duplicate jobs to the background worker queue"); - } - - Ok(()) -}