From 0cf8e1f496a2808c4f3f600a6528f7c4580f136f Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Tue, 29 Oct 2024 00:50:45 +0100
Subject: [PATCH] worker/jobs/downloads: Remove `spawn_blocking()` usage

---
 .../downloads/clean_processed_log_files.rs |  59 +++++----
 src/worker/jobs/downloads/process_log.rs   | 115 +++++++++------
 2 files changed, 89 insertions(+), 85 deletions(-)

diff --git a/src/worker/jobs/downloads/clean_processed_log_files.rs b/src/worker/jobs/downloads/clean_processed_log_files.rs
index 5b744401926..b32e7b83a75 100644
--- a/src/worker/jobs/downloads/clean_processed_log_files.rs
+++ b/src/worker/jobs/downloads/clean_processed_log_files.rs
@@ -1,10 +1,8 @@
 use crate::schema::processed_log_files;
-use crate::tasks::spawn_blocking;
-use crate::util::diesel::Conn;
 use crate::worker::Environment;
 use crates_io_worker::BackgroundJob;
 use diesel::prelude::*;
-use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
+use diesel_async::{AsyncPgConnection, RunQueryDsl};
 use std::sync::Arc;
 
 /// This job is responsible for cleaning up old entries in the
@@ -22,18 +20,17 @@ impl BackgroundJob for CleanProcessedLogFiles {
     type Context = Arc<Environment>;
 
     async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
-        let conn = env.deadpool.get().await?;
-        spawn_blocking(move || {
-            let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
-            Ok(run(conn)?)
-        })
-        .await
+        let mut conn = env.deadpool.get().await?;
+        Ok(run(&mut conn).await?)
     }
 }
 
-fn run(conn: &mut impl Conn) -> QueryResult<()> {
+async fn run(conn: &mut AsyncPgConnection) -> QueryResult<()> {
     let filter = processed_log_files::time.lt(cut_off_date());
-    diesel::delete(processed_log_files::table.filter(filter)).execute(conn)?;
+
+    diesel::delete(processed_log_files::table.filter(filter))
+        .execute(conn)
+        .await?;
 
     Ok(())
 }
@@ -45,29 +42,29 @@ fn cut_off_date() -> chrono::DateTime<Utc> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::test_util::test_db_connection;
     use chrono::{DateTime, Utc};
+    use crates_io_test_db::TestDatabase;
+    use diesel_async::{AsyncConnection, AsyncPgConnection};
     use insta::assert_debug_snapshot;
 
-    #[test]
-    fn test_cleanup() {
-        let (_test_db, conn) = &mut test_db_connection();
+    #[tokio::test]
+    async fn test_cleanup() {
+        let test_db = TestDatabase::new();
+        let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();
+
         let now = chrono::Utc::now();
         let cut_off_date = cut_off_date();
         let one_hour = chrono::Duration::try_hours(1).unwrap();
 
-        insert(
-            conn,
-            vec![
-                ("very-old-file", cut_off_date - one_hour * 30 * 24),
-                ("old-file", cut_off_date - one_hour),
-                ("newish-file", cut_off_date + one_hour),
-                ("brand-new-file", now),
-                ("future-file", now + one_hour * 7 * 24),
-            ],
-        );
-
-        assert_debug_snapshot!(paths_in_table(conn), @r###"
+        let inserts = vec![
+            ("very-old-file", cut_off_date - one_hour * 30 * 24),
+            ("old-file", cut_off_date - one_hour),
+            ("newish-file", cut_off_date + one_hour),
+            ("brand-new-file", now),
+            ("future-file", now + one_hour * 7 * 24),
+        ];
+
+        insert(&mut conn, inserts).await;
+
+        assert_debug_snapshot!(paths_in_table(&mut conn).await, @r###"
         [
             "very-old-file",
             "old-file",
            "newish-file",
            "brand-new-file",
            "future-file",
         ]
         "###);
 
-        run(conn).unwrap();
-        assert_debug_snapshot!(paths_in_table(conn), @r###"
+        run(&mut conn).await.unwrap();
+        assert_debug_snapshot!(paths_in_table(&mut conn).await, @r###"
         [
             "newish-file",
             "brand-new-file",
            "future-file",
         ]
         "###);
     }
 
     /// Insert a list of paths and times into the `processed_log_files` table.
-    fn insert(conn: &mut PgConnection, inserts: Vec<(&str, DateTime<Utc>)>) {
+    async fn insert(conn: &mut AsyncPgConnection, inserts: Vec<(&str, DateTime<Utc>)>) {
         let inserts = inserts
             .into_iter()
             .map(|(path, time)| {
@@ -102,14 +99,16 @@ mod tests {
         diesel::insert_into(processed_log_files::table)
             .values(&inserts)
             .execute(conn)
+            .await
             .unwrap();
     }
 
     /// Read all paths from the `processed_log_files` table.
-    fn paths_in_table(conn: &mut PgConnection) -> Vec<String> {
+    async fn paths_in_table(conn: &mut AsyncPgConnection) -> Vec<String> {
         processed_log_files::table
             .select(processed_log_files::path)
             .load::<String>(conn)
+            .await
             .unwrap()
     }
 }
diff --git a/src/worker/jobs/downloads/process_log.rs b/src/worker/jobs/downloads/process_log.rs
index 73106c1ba47..48e446adf94 100644
--- a/src/worker/jobs/downloads/process_log.rs
+++ b/src/worker/jobs/downloads/process_log.rs
@@ -1,6 +1,4 @@
 use crate::config::CdnLogStorageConfig;
-use crate::tasks::spawn_blocking;
-use crate::util::diesel::Conn;
 use crate::worker::Environment;
 use anyhow::Context;
 use chrono::NaiveDate;
@@ -9,9 +7,9 @@ use crates_io_worker::BackgroundJob;
 use diesel::dsl::exists;
 use diesel::prelude::*;
 use diesel::{select, QueryResult};
-use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
 use diesel_async::pooled_connection::deadpool::Pool;
-use diesel_async::AsyncPgConnection;
+use diesel_async::scoped_futures::ScopedFutureExt;
+use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
 use object_store::aws::AmazonS3Builder;
 use object_store::local::LocalFileSystem;
 use object_store::memory::InMemory;
@@ -123,11 +121,9 @@ async fn run(
     log_stats(&downloads);
 
     let path = path.to_string();
-    let conn = db_pool.get().await?;
-    spawn_blocking(move || {
-        let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
-
-        conn.transaction(|conn| {
+    let mut conn = db_pool.get().await?;
+    conn.transaction(|conn| {
+        async move {
             // Mark the log file as processed before saving the downloads to
             // the database.
             //
@@ -138,14 +134,15 @@ async fn run(
             // When the job is retried the `already_processed()` call above
             // will return `true` and the job will skip processing the log
             // file again.
-            save_as_processed(path, conn)?;
-
-            save_downloads(downloads, conn)
-        })?;
+            save_as_processed(path, conn).await?;
 
-        Ok::<_, anyhow::Error>(())
+            save_downloads(downloads, conn).await
+        }
+        .scope_boxed()
     })
-    .await
+    .await?;
+
+    Ok(())
 }
 
 /// Loads the given log file from the object store and counts the number of
@@ -219,15 +216,23 @@ impl From<(String, Version, NaiveDate, u64)> for NewDownload {
 /// The temporary table only exists on the current connection, but if a
 /// connection pool is used, the temporary table will not be dropped when
 /// the connection is returned to the pool.
-pub fn save_downloads(downloads: DownloadsMap, conn: &mut impl Conn) -> anyhow::Result<()> {
+pub async fn save_downloads(
+    downloads: DownloadsMap,
+    conn: &mut AsyncPgConnection,
+) -> anyhow::Result<()> {
     debug!("Creating temp_downloads table");
-    create_temp_downloads_table(conn).context("Failed to create temp_downloads table")?;
+    create_temp_downloads_table(conn)
+        .await
+        .context("Failed to create temp_downloads table")?;
 
     debug!("Saving counted downloads to temp_downloads table");
-    fill_temp_downloads_table(downloads, conn).context("Failed to fill temp_downloads table")?;
+    fill_temp_downloads_table(downloads, conn)
+        .await
+        .context("Failed to fill temp_downloads table")?;
 
     debug!("Saving temp_downloads to version_downloads table");
     let failed_inserts = save_to_version_downloads(conn)
+        .await
         .context("Failed to save temp_downloads to version_downloads table")?;
 
     if !failed_inserts.is_empty() {
@@ -247,7 +252,7 @@
 /// look up the `version_id` for each crate and version combination, and that
 /// requires a join with the `crates` and `versions` tables.
 #[instrument("db.query", skip_all, fields(message = "CREATE TEMPORARY TABLE ..."))]
-fn create_temp_downloads_table(conn: &mut impl Conn) -> QueryResult<usize> {
+async fn create_temp_downloads_table(conn: &mut AsyncPgConnection) -> QueryResult<usize> {
     diesel::sql_query(
         r#"
         CREATE TEMPORARY TABLE temp_downloads (
@@ -259,6 +264,7 @@ fn create_temp_downloads_table(conn: &mut impl Conn) -> QueryResult<usize> {
         "#,
     )
     .execute(conn)
+    .await
 }
 
 /// Fills the temporary `temp_downloads` table with the downloads from the
@@ -268,7 +274,10 @@ fn create_temp_downloads_table(conn: &mut impl Conn) -> QueryResult<usize> {
     skip_all,
     fields(message = "INSERT INTO temp_downloads ...")
 )]
-fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> QueryResult<()> {
+async fn fill_temp_downloads_table(
+    downloads: DownloadsMap,
+    conn: &mut AsyncPgConnection,
+) -> QueryResult<()> {
     // `tokio-postgres` has a limit on the size of values it can send to the
     // database. To avoid hitting this limit, we insert the downloads in
     // batches.
@@ -283,7 +292,8 @@ fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> Q
     for chunk in map.chunks(MAX_BATCH_SIZE) {
         diesel::insert_into(temp_downloads::table)
             .values(chunk)
-            .execute(conn)?;
+            .execute(conn)
+            .await?;
     }
 
     Ok(())
@@ -297,7 +307,9 @@ fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> Q
     skip_all,
     fields(message = "INSERT INTO version_downloads ...")
 )]
-fn save_to_version_downloads(conn: &mut impl Conn) -> QueryResult<Vec<NewDownload>> {
+async fn save_to_version_downloads(
+    conn: &mut AsyncPgConnection,
+) -> QueryResult<Vec<NewDownload>> {
     diesel::sql_query(
         r#"
         WITH joined_data AS (
@@ -319,7 +331,7 @@ fn save_to_version_downloads(conn: &mut impl Conn) -> QueryResult<Vec<NewDownlo
         "#,
     )
-    .get_results(conn)
+    .get_results(conn).await
 }
 
@@ -360,11 +372,7 @@ async fn already_processed(
 ) -> anyhow::Result<bool> {
     let path = path.into();
-    let conn = db_pool.get().await?;
-    let already_processed = spawn_blocking(move || {
-        let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
-        Ok::<_, anyhow::Error>(already_processed_inner(path, conn)?)
-    })
-    .await?;
+    let mut conn = db_pool.get().await?;
+    let already_processed = already_processed_inner(path, &mut conn).await?;
 
     Ok(already_processed)
 }
 
@@ -374,21 +382,28 @@ async fn already_processed(
 /// Note that if a second job is already processing the same log file, this
 /// function will return `false` because the second job will not have inserted
 /// the path into the `processed_log_files` table yet.
-fn already_processed_inner(path: impl Into<String>, conn: &mut impl Conn) -> QueryResult<bool> {
+async fn already_processed_inner(
+    path: impl Into<String>,
+    conn: &mut AsyncPgConnection,
+) -> QueryResult<bool> {
     use crate::schema::processed_log_files;
 
     let query = processed_log_files::table.filter(processed_log_files::path.eq(path.into()));
-    select(exists(query)).get_result(conn)
+    select(exists(query)).get_result(conn).await
 }
 
 /// Inserts the given path into the `processed_log_files` table to mark it as
 /// processed.
-fn save_as_processed(path: impl Into<String>, conn: &mut impl Conn) -> QueryResult<()> {
+async fn save_as_processed(
+    path: impl Into<String>,
+    conn: &mut AsyncPgConnection,
+) -> QueryResult<()> {
     use crate::schema::processed_log_files;
 
     diesel::insert_into(processed_log_files::table)
         .values(processed_log_files::path.eq(path.into()))
-        .execute(conn)?;
+        .execute(conn)
+        .await?;
 
     Ok(())
 }
@@ -397,7 +412,6 @@
 mod tests {
     use super::*;
     use crate::schema::{crates, version_downloads, versions};
-    use crate::util::diesel::Conn;
     use crates_io_test_db::TestDatabase;
     use diesel_async::pooled_connection::AsyncDieselConnectionManager;
     use insta::assert_debug_snapshot;
@@ -485,26 +499,20 @@ mod tests {
 
     /// Inserts some dummy crates and versions into the database.
     async fn create_dummy_crates_and_versions(db_pool: Pool<AsyncPgConnection>) {
-        let conn = db_pool.get().await.unwrap();
-        spawn_blocking(move || {
-            let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
-
-            create_crate_and_version("bindgen", "0.65.1", conn);
-            create_crate_and_version("tracing-core", "0.1.32", conn);
-            create_crate_and_version("quick-error", "1.2.3", conn);
+        let mut conn = db_pool.get().await.unwrap();
 
-            Ok::<_, anyhow::Error>(())
-        })
-        .await
-        .unwrap();
+        create_crate_and_version("bindgen", "0.65.1", &mut conn).await;
+        create_crate_and_version("tracing-core", "0.1.32", &mut conn).await;
+        create_crate_and_version("quick-error", "1.2.3", &mut conn).await;
     }
 
     /// Inserts a dummy crate and version into the database.
-    fn create_crate_and_version(name: &str, version: &str, conn: &mut impl Conn) {
+    async fn create_crate_and_version(name: &str, version: &str, conn: &mut AsyncPgConnection) {
         let crate_id: i32 = diesel::insert_into(crates::table)
             .values(crates::name.eq(name))
             .returning(crates::id)
             .get_result(conn)
+            .await
             .unwrap();
 
         diesel::insert_into(versions::table)
             .values((
                 versions::crate_id.eq(crate_id),
                 versions::num.eq(version),
                 versions::checksum.eq("checksum"),
             ))
             .execute(conn)
+            .await
             .unwrap();
     }
 
     /// Queries all version downloads from the database and returns them as a
     /// [`Vec`] of strings for use with [`assert_debug_snapshot!()`].
     async fn all_version_downloads(db_pool: Pool<AsyncPgConnection>) -> Vec<String> {
-        let conn = db_pool.get().await.unwrap();
-        let downloads = spawn_blocking(move || {
-            let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
-            Ok::<_, anyhow::Error>(query_all_version_downloads(conn))
-        })
-        .await
-        .unwrap();
+        let mut conn = db_pool.get().await.unwrap();
+        let downloads = query_all_version_downloads(&mut conn).await;
 
         downloads
             .into_iter()
@@ -539,8 +543,8 @@ mod tests {
 
     /// Queries all version downloads from the database and returns them as a
     /// [`Vec`] of tuples.
-    fn query_all_version_downloads(
-        conn: &mut impl Conn,
+    async fn query_all_version_downloads(
+        conn: &mut AsyncPgConnection,
     ) -> Vec<(String, String, i32, i32, NaiveDate, bool)> {
         version_downloads::table
             .inner_join(versions::table)
             .select((
                 crates::name,
                 versions::num,
                 version_downloads::downloads,
                 version_downloads::counted,
                 version_downloads::date,
                 version_downloads::processed,
             ))
             .order((crates::name, versions::num, version_downloads::date))
             .load(conn)
+            .await
             .unwrap()
     }
 }
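
The change above is mechanical, so the pattern is worth spelling out once. Before, each job checked a connection out of the deadpool pool, moved it onto a blocking thread with `spawn_blocking()`, and wrapped it in `AsyncConnectionWrapper` so the synchronous diesel API could drive it. After, the query runs directly on the `AsyncPgConnection` via `diesel_async::RunQueryDsl`. A minimal sketch of the resulting shape, assuming diesel 2.x with the `postgres` and `chrono` features plus diesel-async; the `table!` definition and the `delete_before()` helper are illustrative stand-ins, not code from this repository:

    use diesel::prelude::*;
    use diesel_async::{AsyncPgConnection, RunQueryDsl};

    // Illustrative stand-in for the real `crate::schema` module.
    diesel::table! {
        processed_log_files (path) {
            path -> Text,
            time -> Timestamptz,
        }
    }

    /// Deletes all rows older than `cutoff` directly on the async connection.
    /// No `spawn_blocking()` and no `AsyncConnectionWrapper`: with diesel-async's
    /// `RunQueryDsl` in scope, `execute()` returns a future.
    async fn delete_before(
        conn: &mut AsyncPgConnection,
        cutoff: chrono::DateTime<chrono::Utc>,
    ) -> QueryResult<usize> {
        diesel::delete(processed_log_files::table.filter(processed_log_files::time.lt(cutoff)))
            .execute(conn)
            .await
    }

Note that the explicit `diesel_async::RunQueryDsl` import shadows the synchronous `RunQueryDsl` pulled in by the prelude glob, which is the same trick the patched files rely on.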
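The one non-obvious hunk is the transaction in `process_log.rs`: the synchronous `conn.transaction(|conn| { ... })` closure becomes a closure returning a future, and diesel-async requires that future to be boxed and lifetime-scoped to the re-borrowed connection. A sketch of that shape, with `step_one()`/`step_two()` as hypothetical stand-ins for the patch's `save_as_processed()` and `save_downloads()`:

    use diesel_async::scoped_futures::ScopedFutureExt;
    use diesel_async::{AsyncConnection, AsyncPgConnection};

    // Hypothetical stand-ins for the real helpers.
    async fn step_one(_conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
        Ok(())
    }

    async fn step_two(_conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
        Ok(())
    }

    /// Runs both steps in a single transaction: either both commit or,
    /// if one of them fails, both roll back.
    async fn run_atomically(conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
        conn.transaction(|conn| {
            async move {
                step_one(conn).await?;
                step_two(conn).await
            }
            // `scope_boxed()` boxes the future and ties its lifetime to the
            // `&mut` connection borrowed by the closure, which is the shape
            // `AsyncConnection::transaction()` expects.
            .scope_boxed()
        })
        .await
    }

Using `anyhow::Error` as the transaction error type works because it implements `From<diesel::result::Error>`, so query failures inside the closure trigger a rollback and propagate with `?`.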
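Finally, `fill_temp_downloads_table()` keeps its batching even after the migration, because the in-code comment about the `tokio-postgres` value-size limit applies to the async connection as well. A sketch of chunked inserts with diesel-async; the table, the row type, and the 5,000-row batch size are assumptions for illustration, not values taken from the patch:

    use diesel::prelude::*;
    use diesel_async::{AsyncPgConnection, RunQueryDsl};

    // Illustrative table and row type; the patch's real `temp_downloads`
    // table has a similar shape but is created at runtime as a temporary table.
    diesel::table! {
        temp_downloads (crate_name, version, date) {
            crate_name -> Text,
            version -> Text,
            date -> Date,
            downloads -> Int8,
        }
    }

    #[derive(Insertable)]
    #[diesel(table_name = temp_downloads)]
    struct TempDownload {
        crate_name: String,
        version: String,
        date: chrono::NaiveDate,
        downloads: i64,
    }

    // Assumed batch size; the patch defines its own `MAX_BATCH_SIZE` constant.
    const MAX_BATCH_SIZE: usize = 5_000;

    /// Inserts rows in fixed-size chunks so that no single multi-row INSERT
    /// exceeds the value-size limit of the underlying wire protocol.
    async fn insert_batched(
        rows: &[TempDownload],
        conn: &mut AsyncPgConnection,
    ) -> QueryResult<()> {
        for chunk in rows.chunks(MAX_BATCH_SIZE) {
            diesel::insert_into(temp_downloads::table)
                .values(chunk)
                .execute(conn)
                .await?;
        }
        Ok(())
    }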