From 0e5ca572d0a1963a3b3513d18007dfe3e46450ea Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Fri, 8 Nov 2024 13:58:57 +0100 Subject: [PATCH 1/2] models/user: Add `async_email()` fn This is a clone of `email()`, but using `diesel_async` --- src/models/user.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/models/user.rs b/src/models/user.rs index 88a7307debe..05cd1dc29ee 100644 --- a/src/models/user.rs +++ b/src/models/user.rs @@ -113,6 +113,17 @@ impl User { .first(conn) .optional() } + + /// Queries for the email belonging to a particular user + pub async fn async_email(&self, conn: &mut AsyncPgConnection) -> QueryResult> { + use diesel_async::RunQueryDsl; + + Email::belonging_to(self) + .select(emails::email) + .first(conn) + .await + .optional() + } } /// Represents a new user record insertable to the `users` table From fb7ef890d705cbcd8e7f8f8d83d8266f0f2d07bb Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Fri, 8 Nov 2024 14:05:11 +0100 Subject: [PATCH 2/2] worker/jobs/expiry_notification: Migrate to `diesel-async` queries --- src/worker/jobs/expiry_notification.rs | 89 ++++++++++++-------------- 1 file changed, 41 insertions(+), 48 deletions(-) diff --git a/src/worker/jobs/expiry_notification.rs b/src/worker/jobs/expiry_notification.rs index bff89e39b1b..8f0828c3d4c 100644 --- a/src/worker/jobs/expiry_notification.rs +++ b/src/worker/jobs/expiry_notification.rs @@ -1,13 +1,11 @@ use crate::models::ApiToken; use crate::schema::api_tokens; -use crate::tasks::spawn_blocking; -use crate::util::diesel::Conn; use crate::{email::Email, models::User, worker::Environment, Emails}; use chrono::SecondsFormat; use crates_io_worker::BackgroundJob; use diesel::dsl::now; use diesel::prelude::*; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; use std::sync::Arc; /// The threshold for the expiry notification. @@ -27,24 +25,20 @@ impl BackgroundJob for SendTokenExpiryNotifications { #[instrument(skip(env), err)] async fn run(&self, env: Self::Context) -> anyhow::Result<()> { - let conn = env.deadpool.get().await?; - spawn_blocking(move || { - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); + let mut conn = env.deadpool.get().await?; - // Check if the token is about to expire - // If the token is about to expire, trigger a notification. - check(&env.emails, conn) - }) - .await + // Check if the token is about to expire + // If the token is about to expire, trigger a notification. + check(&env.emails, &mut conn).await } } /// Find tokens that are about to expire and send notifications to their owners. -fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> { +async fn check(emails: &Emails, conn: &mut AsyncPgConnection) -> anyhow::Result<()> { let before = chrono::Utc::now() + EXPIRY_THRESHOLD; info!("Searching for tokens that will expire before {before}…"); - let expired_tokens = find_expiring_tokens(conn, before)?; + let expired_tokens = find_expiring_tokens(conn, before).await?; let num_tokens = expired_tokens.len(); if num_tokens == 0 { info!("Found no tokens that will expire before {before}. Skipping expiry notifications."); @@ -59,7 +53,7 @@ fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> { let mut success = 0; for token in &expired_tokens { - if let Err(e) = handle_expiring_token(conn, token, emails) { + if let Err(e) = handle_expiring_token(conn, token, emails).await { error!(?e, "Failed to handle expiring token"); } else { success += 1; @@ -72,16 +66,16 @@ fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> { } /// Send an email to the user associated with the token. -fn handle_expiring_token( - conn: &mut impl Conn, +async fn handle_expiring_token( + conn: &mut AsyncPgConnection, token: &ApiToken, emails: &Emails, ) -> Result<(), anyhow::Error> { debug!("Looking up user {} for token {}…", token.user_id, token.id); - let user = User::find(conn, token.user_id)?; + let user = User::async_find(conn, token.user_id).await?; debug!("Looking up email address for user {}…", user.id); - let recipient = user.email(conn)?; + let recipient = user.async_email(conn).await?; if let Some(recipient) = recipient { debug!("Sending expiry notification to {}…", recipient); let email = ExpiryNotificationEmail { @@ -90,7 +84,7 @@ fn handle_expiring_token( token_name: &token.name, expiry_date: token.expired_at.unwrap().and_utc(), }; - emails.send(&recipient, email)?; + emails.async_send(&recipient, email).await?; } else { info!( "User {} has no email address set. Skipping expiry notification.", @@ -102,7 +96,8 @@ fn handle_expiring_token( debug!("Marking token {} as notified…", token.id); diesel::update(token) .set(api_tokens::expiry_notification_at.eq(now.nullable())) - .execute(conn)?; + .execute(conn) + .await?; Ok(()) } @@ -112,8 +107,8 @@ fn handle_expiring_token( /// also ignored. /// /// This function returns at most `MAX_ROWS` tokens. -pub fn find_expiring_tokens( - conn: &mut impl Conn, +pub async fn find_expiring_tokens( + conn: &mut AsyncPgConnection, before: chrono::DateTime, ) -> QueryResult> { api_tokens::table @@ -131,6 +126,7 @@ pub fn find_expiring_tokens( .order_by(api_tokens::expired_at.asc()) // The most urgent tokens first .limit(MAX_ROWS) .get_results(conn) + .await } #[derive(Debug, Clone)] @@ -171,24 +167,27 @@ The crates.io team"#, mod tests { use super::*; use crate::models::NewUser; - use crate::{ - models::token::ApiToken, schema::api_tokens, test_util::test_db_connection, - util::token::PlainToken, - }; + use crate::tasks::spawn_blocking; + use crate::{models::token::ApiToken, schema::api_tokens, util::token::PlainToken}; + use crates_io_test_db::TestDatabase; use diesel::dsl::IntervalDsl; + use diesel_async::AsyncConnection; use lettre::Address; #[tokio::test] async fn test_expiry_notification() -> anyhow::Result<()> { let emails = Emails::new_in_memory(); - let (_test_db, mut conn) = test_db_connection(); + + let test_db = TestDatabase::new(); + let mut conn = AsyncPgConnection::establish(test_db.url()).await?; // Set up a user and a token that is about to expire. - let (user, mut conn) = spawn_blocking(move || { + let mut sync_conn = test_db.connect(); + let user = spawn_blocking(move || { let user = NewUser::new(0, "a", None, None, "token"); let emails = Emails::new_in_memory(); - let user = user.create_or_update(Some("testuser@test.com"), &emails, &mut conn)?; - Ok::<_, anyhow::Error>((user, conn)) + let user = user.create_or_update(Some("testuser@test.com"), &emails, &mut sync_conn)?; + Ok::<_, anyhow::Error>(user) }) .await?; @@ -202,7 +201,8 @@ mod tests { api_tokens::expired_at.eq(now.nullable() + (EXPIRY_THRESHOLD.num_days() - 1).day()), )) .returning(ApiToken::as_returning()) - .get_result(&mut conn)?; + .get_result(&mut conn) + .await?; // Insert a few tokens that are not set to expire. let not_expired_offset = EXPIRY_THRESHOLD.num_days() + 1; @@ -216,18 +216,12 @@ mod tests { api_tokens::expired_at.eq(now.nullable() + not_expired_offset.day()), )) .returning(ApiToken::as_returning()) - .get_result(&mut conn)?; + .get_result(&mut conn) + .await?; } // Check that the token is about to expire. - let mut conn = spawn_blocking({ - let emails = emails.clone(); - move || { - check(&emails, &mut conn)?; - Ok::<_, anyhow::Error>(conn) - } - }) - .await?; + check(&emails, &mut conn).await?; // Check that an email was sent. let sent_mail = emails.mails_in_memory().await.unwrap(); @@ -241,7 +235,8 @@ mod tests { .filter(api_tokens::id.eq(token.id)) .filter(api_tokens::expiry_notification_at.is_not_null()) .select(ApiToken::as_select()) - .first::(&mut conn)?; + .first::(&mut conn) + .await?; assert_eq!(updated_token.name, "test_token".to_owned()); // Check that the token is not about to expire. @@ -249,7 +244,8 @@ mod tests { .filter(api_tokens::revoked.eq(false)) .filter(api_tokens::expiry_notification_at.is_null()) .select(ApiToken::as_select()) - .load::(&mut conn)?; + .load::(&mut conn) + .await?; assert_eq!(tokens.len(), 3); // Insert a already expired token. @@ -262,14 +258,11 @@ mod tests { api_tokens::expired_at.eq(now.nullable() - 1.day()), )) .returning(ApiToken::as_returning()) - .get_result(&mut conn)?; + .get_result(&mut conn) + .await?; // Check that the token is not about to expire. - spawn_blocking({ - let emails = emails.clone(); - move || check(&emails, &mut conn) - }) - .await?; + check(&emails, &mut conn).await?; // Check that no email was sent. let sent_mail = emails.mails_in_memory().await.unwrap();