diff --git a/src/worker/jobs/archive_version_downloads.rs b/src/worker/jobs/archive_version_downloads.rs
index bfcb64a3a6b..95e6a3de937 100644
--- a/src/worker/jobs/archive_version_downloads.rs
+++ b/src/worker/jobs/archive_version_downloads.rs
@@ -1,3 +1,4 @@
+use super::IndexVersionDownloadsArchive;
 use crate::schema::version_downloads;
 use crate::tasks::spawn_blocking;
 use crate::worker::Environment;
@@ -5,7 +6,6 @@ use anyhow::{anyhow, Context};
 use chrono::{NaiveDate, Utc};
 use crates_io_worker::BackgroundJob;
 use diesel::prelude::*;
-use diesel_async::pooled_connection::deadpool::Pool;
 use diesel_async::{AsyncPgConnection, RunQueryDsl};
 use futures_util::StreamExt;
 use object_store::ObjectStore;
@@ -63,10 +63,15 @@ impl BackgroundJob for ArchiveVersionDownloads {
         export(&env.config.db.primary.url, &csv_path, &self.before).await?;
         let dates = spawn_blocking(move || split(csv_path)).await?;
         let uploaded_dates = upload(downloads_archive_store, tempdir.path(), dates).await?;
-        delete(&env.deadpool, uploaded_dates).await?;
+
+        let mut conn = env.deadpool.get().await?;
+        delete(&mut conn, uploaded_dates).await?;
 
         // Queue up the job to regenerate the archive index.
-        enqueue_index_job(&env.deadpool).await?;
+        IndexVersionDownloadsArchive
+            .async_enqueue(&mut conn)
+            .await
+            .context("Failed to enqueue IndexVersionDownloadsArchive job")?;
 
         info!("Finished archiving old version downloads");
         Ok(())
@@ -218,12 +223,7 @@ async fn upload_file(store: &impl ObjectStore, path: impl AsRef<Path>) -> anyhow
 }
 
 /// Delete version downloads for the given dates from the database.
-async fn delete(db_pool: &Pool<AsyncPgConnection>, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
-    let mut conn = db_pool.get().await?;
-    delete_inner(&mut conn, dates).await
-}
-
-async fn delete_inner(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
+async fn delete(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
     // Delete version downloads for the given dates in chunks to avoid running
     // into the maximum query parameter limit.
     const CHUNK_SIZE: usize = 5000;
@@ -244,23 +244,11 @@ async fn delete_inner(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> an
     Ok(())
 }
 
-async fn enqueue_index_job(db_pool: &Pool<AsyncPgConnection>) -> anyhow::Result<()> {
-    let mut conn = db_pool.get().await?;
-
-    super::IndexVersionDownloadsArchive
-        .async_enqueue(&mut conn)
-        .await
-        .context("Failed to enqueue IndexVersionDownloadsArchive job")?;
-
-    Ok(())
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::schema::{crates, version_downloads, versions};
     use crates_io_test_db::TestDatabase;
-    use diesel_async::pooled_connection::AsyncDieselConnectionManager;
     use diesel_async::AsyncConnection;
     use insta::assert_snapshot;
 
@@ -372,10 +360,8 @@ mod tests {
         let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();
         prepare_database(&mut conn).await;
 
-        let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(test_db.url());
-        let db_pool = Pool::builder(manager).build().unwrap();
         let dates = vec![NaiveDate::from_ymd_opt(2021, 1, 1).unwrap()];
-        delete(&db_pool, dates).await.unwrap();
+        delete(&mut conn, dates).await.unwrap();
 
         let row_count: i64 = version_downloads::table
             .count()