Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/models/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ impl User {
.first(conn)
.optional()
}

/// Queries for the email belonging to a particular user
pub async fn async_email(&self, conn: &mut AsyncPgConnection) -> QueryResult<Option<String>> {
use diesel_async::RunQueryDsl;

Email::belonging_to(self)
.select(emails::email)
.first(conn)
.await
.optional()
}
}

/// Represents a new user record insertable to the `users` table
Expand Down
89 changes: 41 additions & 48 deletions src/worker/jobs/expiry_notification.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use crate::models::ApiToken;
use crate::schema::api_tokens;
use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::{email::Email, models::User, worker::Environment, Emails};
use chrono::SecondsFormat;
use crates_io_worker::BackgroundJob;
use diesel::dsl::now;
use diesel::prelude::*;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use std::sync::Arc;

/// The threshold for the expiry notification.
Expand All @@ -27,24 +25,20 @@ impl BackgroundJob for SendTokenExpiryNotifications {

#[instrument(skip(env), err)]
async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
let conn = env.deadpool.get().await?;
spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
let mut conn = env.deadpool.get().await?;

// Check if the token is about to expire
// If the token is about to expire, trigger a notification.
check(&env.emails, conn)
})
.await
// Check if the token is about to expire
// If the token is about to expire, trigger a notification.
check(&env.emails, &mut conn).await
}
}

/// Find tokens that are about to expire and send notifications to their owners.
fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> {
async fn check(emails: &Emails, conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
let before = chrono::Utc::now() + EXPIRY_THRESHOLD;
info!("Searching for tokens that will expire before {before}…");

let expired_tokens = find_expiring_tokens(conn, before)?;
let expired_tokens = find_expiring_tokens(conn, before).await?;
let num_tokens = expired_tokens.len();
if num_tokens == 0 {
info!("Found no tokens that will expire before {before}. Skipping expiry notifications.");
Expand All @@ -59,7 +53,7 @@ fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> {

let mut success = 0;
for token in &expired_tokens {
if let Err(e) = handle_expiring_token(conn, token, emails) {
if let Err(e) = handle_expiring_token(conn, token, emails).await {
error!(?e, "Failed to handle expiring token");
} else {
success += 1;
Expand All @@ -72,16 +66,16 @@ fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> {
}

/// Send an email to the user associated with the token.
fn handle_expiring_token(
conn: &mut impl Conn,
async fn handle_expiring_token(
conn: &mut AsyncPgConnection,
token: &ApiToken,
emails: &Emails,
) -> Result<(), anyhow::Error> {
debug!("Looking up user {} for token {}…", token.user_id, token.id);
let user = User::find(conn, token.user_id)?;
let user = User::async_find(conn, token.user_id).await?;

debug!("Looking up email address for user {}…", user.id);
let recipient = user.email(conn)?;
let recipient = user.async_email(conn).await?;
if let Some(recipient) = recipient {
debug!("Sending expiry notification to {}…", recipient);
let email = ExpiryNotificationEmail {
Expand All @@ -90,7 +84,7 @@ fn handle_expiring_token(
token_name: &token.name,
expiry_date: token.expired_at.unwrap().and_utc(),
};
emails.send(&recipient, email)?;
emails.async_send(&recipient, email).await?;
} else {
info!(
"User {} has no email address set. Skipping expiry notification.",
Expand All @@ -102,7 +96,8 @@ fn handle_expiring_token(
debug!("Marking token {} as notified…", token.id);
diesel::update(token)
.set(api_tokens::expiry_notification_at.eq(now.nullable()))
.execute(conn)?;
.execute(conn)
.await?;

Ok(())
}
Expand All @@ -112,8 +107,8 @@ fn handle_expiring_token(
/// also ignored.
///
/// This function returns at most `MAX_ROWS` tokens.
pub fn find_expiring_tokens(
conn: &mut impl Conn,
pub async fn find_expiring_tokens(
conn: &mut AsyncPgConnection,
before: chrono::DateTime<chrono::Utc>,
) -> QueryResult<Vec<ApiToken>> {
api_tokens::table
Expand All @@ -131,6 +126,7 @@ pub fn find_expiring_tokens(
.order_by(api_tokens::expired_at.asc()) // The most urgent tokens first
.limit(MAX_ROWS)
.get_results(conn)
.await
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -171,24 +167,27 @@ The crates.io team"#,
mod tests {
use super::*;
use crate::models::NewUser;
use crate::{
models::token::ApiToken, schema::api_tokens, test_util::test_db_connection,
util::token::PlainToken,
};
use crate::tasks::spawn_blocking;
use crate::{models::token::ApiToken, schema::api_tokens, util::token::PlainToken};
use crates_io_test_db::TestDatabase;
use diesel::dsl::IntervalDsl;
use diesel_async::AsyncConnection;
use lettre::Address;

#[tokio::test]
async fn test_expiry_notification() -> anyhow::Result<()> {
let emails = Emails::new_in_memory();
let (_test_db, mut conn) = test_db_connection();

let test_db = TestDatabase::new();
let mut conn = AsyncPgConnection::establish(test_db.url()).await?;

// Set up a user and a token that is about to expire.
let (user, mut conn) = spawn_blocking(move || {
let mut sync_conn = test_db.connect();
let user = spawn_blocking(move || {
let user = NewUser::new(0, "a", None, None, "token");
let emails = Emails::new_in_memory();
let user = user.create_or_update(Some("testuser@test.com"), &emails, &mut conn)?;
Ok::<_, anyhow::Error>((user, conn))
let user = user.create_or_update(Some("testuser@test.com"), &emails, &mut sync_conn)?;
Ok::<_, anyhow::Error>(user)
})
.await?;

Expand All @@ -202,7 +201,8 @@ mod tests {
api_tokens::expired_at.eq(now.nullable() + (EXPIRY_THRESHOLD.num_days() - 1).day()),
))
.returning(ApiToken::as_returning())
.get_result(&mut conn)?;
.get_result(&mut conn)
.await?;

// Insert a few tokens that are not set to expire.
let not_expired_offset = EXPIRY_THRESHOLD.num_days() + 1;
Expand All @@ -216,18 +216,12 @@ mod tests {
api_tokens::expired_at.eq(now.nullable() + not_expired_offset.day()),
))
.returning(ApiToken::as_returning())
.get_result(&mut conn)?;
.get_result(&mut conn)
.await?;
}

// Check that the token is about to expire.
let mut conn = spawn_blocking({
let emails = emails.clone();
move || {
check(&emails, &mut conn)?;
Ok::<_, anyhow::Error>(conn)
}
})
.await?;
check(&emails, &mut conn).await?;

// Check that an email was sent.
let sent_mail = emails.mails_in_memory().await.unwrap();
Expand All @@ -241,15 +235,17 @@ mod tests {
.filter(api_tokens::id.eq(token.id))
.filter(api_tokens::expiry_notification_at.is_not_null())
.select(ApiToken::as_select())
.first::<ApiToken>(&mut conn)?;
.first::<ApiToken>(&mut conn)
.await?;
assert_eq!(updated_token.name, "test_token".to_owned());

// Check that the token is not about to expire.
let tokens = api_tokens::table
.filter(api_tokens::revoked.eq(false))
.filter(api_tokens::expiry_notification_at.is_null())
.select(ApiToken::as_select())
.load::<ApiToken>(&mut conn)?;
.load::<ApiToken>(&mut conn)
.await?;
assert_eq!(tokens.len(), 3);

// Insert a already expired token.
Expand All @@ -262,14 +258,11 @@ mod tests {
api_tokens::expired_at.eq(now.nullable() - 1.day()),
))
.returning(ApiToken::as_returning())
.get_result(&mut conn)?;
.get_result(&mut conn)
.await?;

// Check that the token is not about to expire.
spawn_blocking({
let emails = emails.clone();
move || check(&emails, &mut conn)
})
.await?;
check(&emails, &mut conn).await?;

// Check that no email was sent.
let sent_mail = emails.mails_in_memory().await.unwrap();
Expand Down