diff --git a/src/worker/jobs/rss/sync_crate_feed.rs b/src/worker/jobs/rss/sync_crate_feed.rs index 931d677a50..7f8ad256aa 100644 --- a/src/worker/jobs/rss/sync_crate_feed.rs +++ b/src/worker/jobs/rss/sync_crate_feed.rs @@ -1,12 +1,10 @@ use crate::schema::{crates, versions}; use crate::storage::FeedId; -use crate::tasks::spawn_blocking; -use crate::util::diesel::Conn; use crate::worker::Environment; use chrono::Duration; use crates_io_worker::BackgroundJob; use diesel::prelude::*; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; use std::sync::Arc; /// Items younger than this will always be included in the feed. @@ -42,16 +40,9 @@ impl BackgroundJob for SyncCrateFeed { let domain = &ctx.config.domain_name; info!("Loading latest {NUM_ITEMS} version updates for `{name}` from the database…"); - let conn = ctx.deadpool.get().await?; - - let version_updates = spawn_blocking({ - let name = name.clone(); - move || { - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - Ok::<_, anyhow::Error>(load_version_updates(&name, conn)?) - } - }) - .await?; + let mut conn = ctx.deadpool.get().await?; + + let version_updates = load_version_updates(name, &mut conn).await?; let feed_id = FeedId::Crate { name }; @@ -102,7 +93,10 @@ impl BackgroundJob for SyncCrateFeed { /// than [`ALWAYS_INCLUDE_AGE`]. If there are less than [`NUM_ITEMS`] versions /// then the list will be padded with older versions until [`NUM_ITEMS`] are /// returned. -fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult> { +async fn load_version_updates( + name: &str, + conn: &mut AsyncPgConnection, +) -> QueryResult> { let threshold_dt = chrono::Utc::now().naive_utc() - ALWAYS_INCLUDE_AGE; let updates = versions::table @@ -111,7 +105,8 @@ fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult= NUM_ITEMS { @@ -125,6 +120,7 @@ fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult>()); @@ -213,10 +210,10 @@ mod tests { for i in 1..=NUM_ITEMS { let version = format!("1.2.{i}"); let publish_time = now - Duration::days(90) + Duration::hours(i); - create_version(&mut conn, foo, &version, publish_time); + create_version(&mut conn, foo, &version, publish_time).await; } - let updates = assert_ok!(load_version_updates("foo", &mut conn)); + let updates = assert_ok!(load_version_updates("foo", &mut conn).await); assert_eq!(updates.len() as i64, NUM_ITEMS); assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::>()); @@ -224,24 +221,25 @@ mod tests { for i in 1..=(NUM_ITEMS + 10) { let version = format!("1.3.{i}"); let publish_time = now - Duration::minutes(30) + Duration::seconds(i); - create_version(&mut conn, foo, &version, publish_time); + create_version(&mut conn, foo, &version, publish_time).await; } - let updates = assert_ok!(load_version_updates("foo", &mut conn)); + let updates = assert_ok!(load_version_updates("foo", &mut conn).await); assert_eq!(updates.len() as i64, NUM_ITEMS + 10); assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::>()); } - fn create_crate(conn: &mut impl Conn, name: &str) -> i32 { + async fn create_crate(conn: &mut AsyncPgConnection, name: &str) -> i32 { diesel::insert_into(crates::table) .values((crates::name.eq(name),)) .returning(crates::id) .get_result(conn) + .await .unwrap() } - fn create_version( - conn: &mut impl Conn, + async fn create_version( + conn: &mut AsyncPgConnection, crate_id: i32, version: &str, publish_time: NaiveDateTime, @@ -256,6 +254,7 @@ mod tests { )) .returning(versions::id) .get_result(conn) + .await .unwrap() } } diff --git a/src/worker/jobs/rss/sync_crates_feed.rs b/src/worker/jobs/rss/sync_crates_feed.rs index deb9071cb0..57de1545fa 100644 --- a/src/worker/jobs/rss/sync_crates_feed.rs +++ b/src/worker/jobs/rss/sync_crates_feed.rs @@ -1,12 +1,10 @@ use crate::schema::crates; use crate::storage::FeedId; -use crate::tasks::spawn_blocking; -use crate::util::diesel::Conn; use crate::worker::Environment; use chrono::Duration; use crates_io_worker::BackgroundJob; use diesel::prelude::*; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; use std::sync::Arc; #[derive(Serialize, Deserialize)] @@ -34,12 +32,8 @@ impl BackgroundJob for SyncCratesFeed { let domain = &ctx.config.domain_name; info!("Loading latest {NUM_ITEMS} crates from the database…"); - let conn = ctx.deadpool.get().await?; - let new_crates = spawn_blocking(move || { - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - Ok::<_, anyhow::Error>(load_new_crates(conn)?) - }) - .await?; + let mut conn = ctx.deadpool.get().await?; + let new_crates = load_new_crates(&mut conn).await?; let link = rss::extension::atom::Link { href: ctx.storage.feed_url(&feed_id), @@ -86,14 +80,15 @@ impl BackgroundJob for SyncCratesFeed { /// than [`ALWAYS_INCLUDE_AGE`]. If there are less than [`NUM_ITEMS`] crates /// then the list will be padded with older crates until [`NUM_ITEMS`] are /// returned. -fn load_new_crates(conn: &mut impl Conn) -> QueryResult> { +async fn load_new_crates(conn: &mut AsyncPgConnection) -> QueryResult> { let threshold_dt = chrono::Utc::now().naive_utc() - ALWAYS_INCLUDE_AGE; let new_crates = crates::table .filter(crates::created_at.gt(threshold_dt)) .order(crates::created_at.desc()) .select(NewCrate::as_select()) - .load(conn)?; + .load(conn) + .await?; let num_new_crates = new_crates.len(); if num_new_crates as i64 >= NUM_ITEMS { @@ -105,6 +100,7 @@ fn load_new_crates(conn: &mut impl Conn) -> QueryResult> { .select(NewCrate::as_select()) .limit(NUM_ITEMS) .load(conn) + .await } #[derive(Debug, Queryable, Selectable)] @@ -161,27 +157,28 @@ mod tests { use super::*; use chrono::NaiveDateTime; use crates_io_test_db::TestDatabase; + use diesel_async::{AsyncConnection, AsyncPgConnection}; use insta::assert_debug_snapshot; - #[test] - fn test_load_version_updates() { + #[tokio::test] + async fn test_load_version_updates() { crate::util::tracing::init_for_test(); let db = TestDatabase::new(); - let mut conn = db.connect(); + let mut conn = AsyncPgConnection::establish(db.url()).await.unwrap(); let now = chrono::Utc::now().naive_utc(); - let new_crates = assert_ok!(load_new_crates(&mut conn)); + let new_crates = assert_ok!(load_new_crates(&mut conn).await); assert_eq!(new_crates.len(), 0); // If there are less than NUM_ITEMS crates, they should all be returned - create_crate(&mut conn, "foo", now - Duration::days(123)); - create_crate(&mut conn, "bar", now - Duration::days(110)); - create_crate(&mut conn, "baz", now - Duration::days(100)); - create_crate(&mut conn, "qux", now - Duration::days(90)); + create_crate(&mut conn, "foo", now - Duration::days(123)).await; + create_crate(&mut conn, "bar", now - Duration::days(110)).await; + create_crate(&mut conn, "baz", now - Duration::days(100)).await; + create_crate(&mut conn, "qux", now - Duration::days(90)).await; - let new_crates = assert_ok!(load_new_crates(&mut conn)); + let new_crates = assert_ok!(load_new_crates(&mut conn).await); assert_eq!(new_crates.len(), 4); assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::>()); @@ -189,10 +186,10 @@ mod tests { for i in 1..=NUM_ITEMS { let name = format!("crate-{i}"); let publish_time = now - Duration::days(90) + Duration::hours(i); - create_crate(&mut conn, &name, publish_time); + create_crate(&mut conn, &name, publish_time).await; } - let new_crates = assert_ok!(load_new_crates(&mut conn)); + let new_crates = assert_ok!(load_new_crates(&mut conn).await); assert_eq!(new_crates.len() as i64, NUM_ITEMS); assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::>()); @@ -200,15 +197,19 @@ mod tests { for i in 1..=(NUM_ITEMS + 10) { let name = format!("other-crate-{i}"); let publish_time = now - Duration::minutes(30) + Duration::seconds(i); - create_crate(&mut conn, &name, publish_time); + create_crate(&mut conn, &name, publish_time).await; } - let new_crates = assert_ok!(load_new_crates(&mut conn)); + let new_crates = assert_ok!(load_new_crates(&mut conn).await); assert_eq!(new_crates.len() as i64, NUM_ITEMS + 10); assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::>()); } - fn create_crate(conn: &mut impl Conn, name: &str, publish_time: NaiveDateTime) -> i32 { + async fn create_crate( + conn: &mut AsyncPgConnection, + name: &str, + publish_time: NaiveDateTime, + ) -> i32 { diesel::insert_into(crates::table) .values(( crates::name.eq(name), @@ -217,6 +218,7 @@ mod tests { )) .returning(crates::id) .get_result(conn) + .await .unwrap() } } diff --git a/src/worker/jobs/rss/sync_updates_feed.rs b/src/worker/jobs/rss/sync_updates_feed.rs index 9d9cc71fce..fb0dd936ee 100644 --- a/src/worker/jobs/rss/sync_updates_feed.rs +++ b/src/worker/jobs/rss/sync_updates_feed.rs @@ -1,12 +1,10 @@ use crate::schema::{crates, versions}; use crate::storage::FeedId; -use crate::tasks::spawn_blocking; -use crate::util::diesel::Conn; use crate::worker::Environment; use chrono::Duration; use crates_io_worker::BackgroundJob; use diesel::prelude::*; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; use std::sync::Arc; #[derive(Serialize, Deserialize)] @@ -34,12 +32,8 @@ impl BackgroundJob for SyncUpdatesFeed { let domain = &ctx.config.domain_name; info!("Loading latest {NUM_ITEMS} version updates from the database…"); - let conn = ctx.deadpool.get().await?; - let version_updates = spawn_blocking(move || { - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - Ok::<_, anyhow::Error>(load_version_updates(conn)?) - }) - .await?; + let mut conn = ctx.deadpool.get().await?; + let version_updates = load_version_updates(&mut conn).await?; let link = rss::extension::atom::Link { href: ctx.storage.feed_url(&feed_id), @@ -86,7 +80,7 @@ impl BackgroundJob for SyncUpdatesFeed { /// than [`ALWAYS_INCLUDE_AGE`]. If there are less than [`NUM_ITEMS`] versions /// then the list will be padded with older versions until [`NUM_ITEMS`] are /// returned. -fn load_version_updates(conn: &mut impl Conn) -> QueryResult> { +async fn load_version_updates(conn: &mut AsyncPgConnection) -> QueryResult> { let threshold_dt = chrono::Utc::now().naive_utc() - ALWAYS_INCLUDE_AGE; let updates = versions::table @@ -94,7 +88,8 @@ fn load_version_updates(conn: &mut impl Conn) -> QueryResult> .filter(versions::created_at.gt(threshold_dt)) .order(versions::created_at.desc()) .select(VersionUpdate::as_select()) - .load(conn)?; + .load(conn) + .await?; let num_updates = updates.len(); if num_updates as i64 >= NUM_ITEMS { @@ -107,6 +102,7 @@ fn load_version_updates(conn: &mut impl Conn) -> QueryResult> .select(VersionUpdate::as_select()) .limit(NUM_ITEMS) .load(conn) + .await } #[derive(Debug, Queryable, Selectable)] @@ -177,29 +173,30 @@ mod tests { use super::*; use chrono::NaiveDateTime; use crates_io_test_db::TestDatabase; + use diesel_async::AsyncConnection; use insta::assert_debug_snapshot; - #[test] - fn test_load_version_updates() { + #[tokio::test] + async fn test_load_version_updates() { crate::util::tracing::init_for_test(); let db = TestDatabase::new(); - let mut conn = db.connect(); + let mut conn = AsyncPgConnection::establish(db.url()).await.unwrap(); let now = chrono::Utc::now().naive_utc(); - let updates = assert_ok!(load_version_updates(&mut conn)); + let updates = assert_ok!(load_version_updates(&mut conn).await); assert_eq!(updates.len(), 0); - let foo = create_crate(&mut conn, "foo"); + let foo = create_crate(&mut conn, "foo").await; // If there are less than NUM_ITEMS versions, they should all be returned - create_version(&mut conn, foo, "1.0.0", now - Duration::days(123)); - create_version(&mut conn, foo, "1.0.1", now - Duration::days(110)); - create_version(&mut conn, foo, "1.1.0", now - Duration::days(100)); - create_version(&mut conn, foo, "1.2.0", now - Duration::days(90)); + create_version(&mut conn, foo, "1.0.0", now - Duration::days(123)).await; + create_version(&mut conn, foo, "1.0.1", now - Duration::days(110)).await; + create_version(&mut conn, foo, "1.1.0", now - Duration::days(100)).await; + create_version(&mut conn, foo, "1.2.0", now - Duration::days(90)).await; - let updates = assert_ok!(load_version_updates(&mut conn)); + let updates = assert_ok!(load_version_updates(&mut conn).await); assert_eq!(updates.len(), 4); assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::>()); @@ -207,10 +204,10 @@ mod tests { for i in 1..=NUM_ITEMS { let version = format!("1.2.{i}"); let publish_time = now - Duration::days(90) + Duration::hours(i); - create_version(&mut conn, foo, &version, publish_time); + create_version(&mut conn, foo, &version, publish_time).await; } - let updates = assert_ok!(load_version_updates(&mut conn)); + let updates = assert_ok!(load_version_updates(&mut conn).await); assert_eq!(updates.len() as i64, NUM_ITEMS); assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::>()); @@ -218,24 +215,25 @@ mod tests { for i in 1..=(NUM_ITEMS + 10) { let version = format!("1.3.{i}"); let publish_time = now - Duration::minutes(30) + Duration::seconds(i); - create_version(&mut conn, foo, &version, publish_time); + create_version(&mut conn, foo, &version, publish_time).await; } - let updates = assert_ok!(load_version_updates(&mut conn)); + let updates = assert_ok!(load_version_updates(&mut conn).await); assert_eq!(updates.len() as i64, NUM_ITEMS + 10); assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::>()); } - fn create_crate(conn: &mut impl Conn, name: &str) -> i32 { + async fn create_crate(conn: &mut AsyncPgConnection, name: &str) -> i32 { diesel::insert_into(crates::table) .values((crates::name.eq(name),)) .returning(crates::id) .get_result(conn) + .await .unwrap() } - fn create_version( - conn: &mut impl Conn, + async fn create_version( + conn: &mut AsyncPgConnection, crate_id: i32, version: &str, publish_time: NaiveDateTime, @@ -250,6 +248,7 @@ mod tests { )) .returning(versions::id) .get_result(conn) + .await .unwrap() } }