From c864a7586f2061089631ac74fb1f57a8cab15be9 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 22 Oct 2024 23:42:04 +0200 Subject: [PATCH 1/2] worker/jobs/archive_version_downloads: Simplify function arguments There is no need for us to pass in the full database pool, when all we need is a single connection. --- src/worker/jobs/archive_version_downloads.rs | 25 +++++++------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/src/worker/jobs/archive_version_downloads.rs b/src/worker/jobs/archive_version_downloads.rs index bfcb64a3a6b..5b3536a9b51 100644 --- a/src/worker/jobs/archive_version_downloads.rs +++ b/src/worker/jobs/archive_version_downloads.rs @@ -5,7 +5,6 @@ use anyhow::{anyhow, Context}; use chrono::{NaiveDate, Utc}; use crates_io_worker::BackgroundJob; use diesel::prelude::*; -use diesel_async::pooled_connection::deadpool::Pool; use diesel_async::{AsyncPgConnection, RunQueryDsl}; use futures_util::StreamExt; use object_store::ObjectStore; @@ -63,10 +62,12 @@ impl BackgroundJob for ArchiveVersionDownloads { export(&env.config.db.primary.url, &csv_path, &self.before).await?; let dates = spawn_blocking(move || split(csv_path)).await?; let uploaded_dates = upload(downloads_archive_store, tempdir.path(), dates).await?; - delete(&env.deadpool, uploaded_dates).await?; + + let mut conn = env.deadpool.get().await?; + delete(&mut conn, uploaded_dates).await?; // Queue up the job to regenerate the archive index. - enqueue_index_job(&env.deadpool).await?; + enqueue_index_job(&mut conn).await?; info!("Finished archiving old version downloads"); Ok(()) @@ -218,12 +219,7 @@ async fn upload_file(store: &impl ObjectStore, path: impl AsRef) -> anyhow } /// Delete version downloads for the given dates from the database. -async fn delete(db_pool: &Pool, dates: Vec) -> anyhow::Result<()> { - let mut conn = db_pool.get().await?; - delete_inner(&mut conn, dates).await -} - -async fn delete_inner(conn: &mut AsyncPgConnection, dates: Vec) -> anyhow::Result<()> { +async fn delete(conn: &mut AsyncPgConnection, dates: Vec) -> anyhow::Result<()> { // Delete version downloads for the given dates in chunks to avoid running // into the maximum query parameter limit. const CHUNK_SIZE: usize = 5000; @@ -244,11 +240,9 @@ async fn delete_inner(conn: &mut AsyncPgConnection, dates: Vec) -> an Ok(()) } -async fn enqueue_index_job(db_pool: &Pool) -> anyhow::Result<()> { - let mut conn = db_pool.get().await?; - +async fn enqueue_index_job(conn: &mut AsyncPgConnection) -> anyhow::Result<()> { super::IndexVersionDownloadsArchive - .async_enqueue(&mut conn) + .async_enqueue(conn) .await .context("Failed to enqueue IndexVersionDownloadsArchive job")?; @@ -260,7 +254,6 @@ mod tests { use super::*; use crate::schema::{crates, version_downloads, versions}; use crates_io_test_db::TestDatabase; - use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::AsyncConnection; use insta::assert_snapshot; @@ -372,10 +365,8 @@ mod tests { let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap(); prepare_database(&mut conn).await; - let manager = AsyncDieselConnectionManager::::new(test_db.url()); - let db_pool = Pool::builder(manager).build().unwrap(); let dates = vec![NaiveDate::from_ymd_opt(2021, 1, 1).unwrap()]; - delete(&db_pool, dates).await.unwrap(); + delete(&mut conn, dates).await.unwrap(); let row_count: i64 = version_downloads::table .count() From 7e899a9820d69192fb1945f41f82edf9d55c30e5 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 22 Oct 2024 23:44:05 +0200 Subject: [PATCH 2/2] worker/jobs/archive_version_downloads: Inline `enqueue_index_job()` fn --- src/worker/jobs/archive_version_downloads.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/worker/jobs/archive_version_downloads.rs b/src/worker/jobs/archive_version_downloads.rs index 5b3536a9b51..95e6a3de937 100644 --- a/src/worker/jobs/archive_version_downloads.rs +++ b/src/worker/jobs/archive_version_downloads.rs @@ -1,3 +1,4 @@ +use super::IndexVersionDownloadsArchive; use crate::schema::version_downloads; use crate::tasks::spawn_blocking; use crate::worker::Environment; @@ -67,7 +68,10 @@ impl BackgroundJob for ArchiveVersionDownloads { delete(&mut conn, uploaded_dates).await?; // Queue up the job to regenerate the archive index. - enqueue_index_job(&mut conn).await?; + IndexVersionDownloadsArchive + .async_enqueue(&mut conn) + .await + .context("Failed to enqueue IndexVersionDownloadsArchive job")?; info!("Finished archiving old version downloads"); Ok(()) @@ -240,15 +244,6 @@ async fn delete(conn: &mut AsyncPgConnection, dates: Vec) -> anyhow:: Ok(()) } -async fn enqueue_index_job(conn: &mut AsyncPgConnection) -> anyhow::Result<()> { - super::IndexVersionDownloadsArchive - .async_enqueue(conn) - .await - .context("Failed to enqueue IndexVersionDownloadsArchive job")?; - - Ok(()) -} - #[cfg(test)] mod tests { use super::*;