From 4f0e1062a33627ccf5f61c45e8884ac1d5e1a49b Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Thu, 14 Nov 2024 17:45:23 +0100 Subject: [PATCH] typosquat: Asyncify all the things --- src/typosquat/cache.rs | 11 +++-- src/typosquat/database.rs | 85 +++++++++++++++++++++--------------- src/worker/environment.rs | 11 ++--- src/worker/jobs/typosquat.rs | 46 ++++++++----------- 4 files changed, 78 insertions(+), 75 deletions(-) diff --git a/src/typosquat/cache.rs b/src/typosquat/cache.rs index f64fe01e884..3bbbcad5fa9 100644 --- a/src/typosquat/cache.rs +++ b/src/typosquat/cache.rs @@ -1,6 +1,5 @@ +use diesel_async::AsyncPgConnection; use std::sync::Arc; - -use crate::util::diesel::Conn; use thiserror::Error; use typomania::{ checks::{Bitflips, Omitted, SwappedWords, Typos}, @@ -28,7 +27,7 @@ impl Cache { /// addresses to send notifications to, then invokes [`Cache::new`] to read popular crates from /// the database. #[instrument(skip_all, err)] - pub fn from_env(conn: &mut impl Conn) -> Result { + pub async fn from_env(conn: &mut AsyncPgConnection) -> Result { let emails: Vec = crates_io_env_vars::var(NOTIFICATION_EMAILS_ENV) .map_err(|e| Error::Environment { name: NOTIFICATION_EMAILS_ENV.into(), @@ -49,15 +48,15 @@ impl Cache { }) } else { // Otherwise, let's go get the top crates and build a corpus. - Self::new(emails, conn) + Self::new(emails, conn).await } } /// Instantiates a cache by querying popular crates and building them into a typomania harness. /// /// This relies on configuration in the `super::config` module. - pub fn new(emails: Vec, conn: &mut impl Conn) -> Result { - let top = TopCrates::new(conn, config::TOP_CRATES)?; + pub async fn new(emails: Vec, conn: &mut AsyncPgConnection) -> Result { + let top = TopCrates::new(conn, config::TOP_CRATES).await?; Ok(Self { emails, diff --git a/src/typosquat/database.rs b/src/typosquat/database.rs index 880336f4155..f3bd71e5822 100644 --- a/src/typosquat/database.rs +++ b/src/typosquat/database.rs @@ -5,8 +5,9 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, }; -use crate::util::diesel::Conn; -use diesel::{connection::DefaultLoadingMode, QueryResult}; +use crate::util::diesel::prelude::*; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; +use futures_util::TryStreamExt; use typomania::{AuthorSet, Corpus, Package}; /// A corpus of the current top crates on crates.io, as determined by their download counts, along @@ -18,12 +19,11 @@ pub struct TopCrates { impl TopCrates { /// Retrieves the `num` top crates from the database. - pub fn new(conn: &mut impl Conn, num: i64) -> QueryResult { + pub async fn new(conn: &mut AsyncPgConnection, num: i64) -> QueryResult { use crate::{ models, schema::{crate_downloads, crate_owners}, }; - use diesel::prelude::*; // We have to build up a data structure that contains the top crates, their owners in some // form that is easily compared, and that can be indexed by the crate name. @@ -42,31 +42,34 @@ impl TopCrates { // Once we have the results of those queries, we can glom it all together into one happy // data structure. - let mut crates: BTreeMap = BTreeMap::new(); - for result in models::Crate::all() + let crates: BTreeMap = BTreeMap::new(); + let crates = models::Crate::all() .inner_join(crate_downloads::table) .order(crate_downloads::downloads.desc()) .limit(num) - .load_iter::(conn)? - { - let krate = result?; - crates.insert( - krate.id, - ( - krate.name, - Crate { - owners: HashSet::new(), - }, - ), - ); - } + .load_stream::(conn) + .await? + .try_fold(crates, |mut crates, krate| { + crates.insert( + krate.id, + ( + krate.name, + Crate { + owners: HashSet::new(), + }, + ), + ); + + futures_util::future::ready(Ok(crates)) + }) + .await?; // This query might require more low level knowledge of crate_owners than we really want // outside of the models module. It would probably make more sense in the long term to have // this live in the Owner type, but for now I want to keep the typosquatting logic as // self-contained as possible in case we decide not to go ahead with this in the longer // term. - for result in crate_owners::table + let crates = crate_owners::table .filter(crate_owners::deleted.eq(false)) .filter(crate_owners::crate_id.eq_any(crates.keys().cloned().collect::>())) .select(( @@ -74,13 +77,16 @@ impl TopCrates { crate_owners::owner_id, crate_owners::owner_kind, )) - .load_iter::<(i32, i32, i32), DefaultLoadingMode>(conn)? - { - let (crate_id, owner_id, owner_kind) = result?; - crates.entry(crate_id).and_modify(|(_name, krate)| { - krate.owners.insert(Owner::new(owner_id, owner_kind)); - }); - } + .load_stream::<(i32, i32, i32)>(conn) + .await? + .try_fold(crates, |mut crates, (crate_id, owner_id, owner_kind)| { + crates.entry(crate_id).and_modify(|(_name, krate)| { + krate.owners.insert(Owner::new(owner_id, owner_kind)); + }); + + futures_util::future::ready(Ok(crates)) + }) + .await?; Ok(Self { crates: crates.into_values().collect(), @@ -104,12 +110,16 @@ pub struct Crate { impl Crate { /// Hydrates a crate and its owners from the database given the crate name. - pub fn from_name(conn: &mut impl Conn, name: &str) -> QueryResult { + pub async fn from_name(conn: &mut AsyncPgConnection, name: &str) -> QueryResult { use crate::models; - use diesel::prelude::*; - let krate = models::Crate::by_exact_name(name).first(conn)?; - let owners = krate.owners(conn)?.into_iter().map(Owner::from).collect(); + let krate = models::Crate::by_exact_name(name).first(conn).await?; + let owners = krate + .async_owners(conn) + .await? + .into_iter() + .map(Owner::from) + .collect(); Ok(Self { owners }) } @@ -166,10 +176,11 @@ mod tests { use super::*; use crate::typosquat::test_util::faker; use crates_io_test_db::TestDatabase; + use diesel_async::AsyncConnection; use thiserror::Error; - #[test] - fn top_crates() -> Result<(), Error> { + #[tokio::test] + async fn top_crates() -> Result<(), Error> { let test_db = TestDatabase::new(); let mut conn = test_db.connect(); @@ -187,7 +198,8 @@ mod tests { faker::add_crate_to_team(&mut conn, &user_b, &top_b, ¬_the_a_team)?; faker::add_crate_to_team(&mut conn, &user_b, ¬_top_c, ¬_the_a_team)?; - let top_crates = TopCrates::new(&mut conn, 2)?; + let mut async_conn = AsyncPgConnection::establish(test_db.url()).await?; + let top_crates = TopCrates::new(&mut async_conn, 2).await?; // Let's ensure the top crates include what we expect (which is a and b, since we asked for // 2 crates and they're the most downloaded). @@ -201,7 +213,7 @@ mod tests { assert!(!pkg_a.shared_authors(pkg_b.authors())); // Now let's go get package c and pretend it's a new package. - let pkg_c = Crate::from_name(&mut conn, "c")?; + let pkg_c = Crate::from_name(&mut async_conn, "c").await?; // c _does_ have an author in common with a. assert!(pkg_a.shared_authors(pkg_c.authors())); @@ -227,5 +239,8 @@ mod tests { #[error(transparent)] Diesel(#[from] diesel::result::Error), + + #[error(transparent)] + Connection(#[from] ConnectionError), } } diff --git a/src/worker/environment.rs b/src/worker/environment.rs index 49247dafd33..b9d465db789 100644 --- a/src/worker/environment.rs +++ b/src/worker/environment.rs @@ -2,7 +2,6 @@ use crate::cloudfront::CloudFront; use crate::fastly::Fastly; use crate::storage::Storage; use crate::typosquat; -use crate::util::diesel::Conn; use crate::Emails; use anyhow::Context; use bon::Builder; @@ -13,8 +12,9 @@ use diesel_async::AsyncPgConnection; use object_store::ObjectStore; use parking_lot::{Mutex, MutexGuard}; use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use std::time::Instant; +use tokio::sync::OnceCell; #[derive(Builder)] pub struct Environment { @@ -33,7 +33,7 @@ pub struct Environment { /// A lazily initialised cache of the most popular crates ready to use in typosquatting checks. #[builder(skip)] - typosquat_cache: OnceLock>, + typosquat_cache: OnceCell>, } impl Environment { @@ -78,9 +78,9 @@ impl Environment { } /// Returns the typosquatting cache, initialising it if required. - pub(crate) fn typosquat_cache( + pub(crate) async fn typosquat_cache( &self, - conn: &mut impl Conn, + conn: &mut AsyncPgConnection, ) -> Result<&typosquat::Cache, typosquat::CacheError> { // We have to pass conn back in here because the caller might be in a transaction, and // getting a new connection here to query crates can result in a deadlock. @@ -90,6 +90,7 @@ impl Environment { // generated if initialising the cache fails. self.typosquat_cache .get_or_init(|| typosquat::Cache::from_env(conn)) + .await .as_ref() .map_err(|e| e.clone()) } diff --git a/src/worker/jobs/typosquat.rs b/src/worker/jobs/typosquat.rs index 98cdd80c3da..7423fcf7737 100644 --- a/src/worker/jobs/typosquat.rs +++ b/src/worker/jobs/typosquat.rs @@ -1,12 +1,10 @@ use std::sync::Arc; use crates_io_worker::BackgroundJob; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::AsyncPgConnection; use typomania::Package; use crate::email::Email; -use crate::tasks::spawn_blocking; -use crate::util::diesel::Conn; use crate::{ typosquat::{Cache, Crate}, worker::Environment, @@ -36,22 +34,23 @@ impl BackgroundJob for CheckTyposquat { async fn run(&self, env: Self::Context) -> anyhow::Result<()> { let crate_name = self.name.clone(); - let conn = env.deadpool.get().await?; - spawn_blocking(move || { - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); + let mut conn = env.deadpool.get().await?; - let cache = env.typosquat_cache(conn)?; - check(&env.emails, cache, conn, &crate_name) - }) - .await + let cache = env.typosquat_cache(&mut conn).await?; + check(&env.emails, cache, &mut conn, &crate_name).await } } -fn check(emails: &Emails, cache: &Cache, conn: &mut impl Conn, name: &str) -> anyhow::Result<()> { +async fn check( + emails: &Emails, + cache: &Cache, + conn: &mut AsyncPgConnection, + name: &str, +) -> anyhow::Result<()> { if let Some(harness) = cache.get_harness() { info!(name, "Checking new crate for potential typosquatting"); - let krate: Box = Box::new(Crate::from_name(conn, name)?); + let krate: Box = Box::new(Crate::from_name(conn, name).await?); let squats = harness.check_package(name, krate)?; if !squats.is_empty() { // Well, well, well. For now, the only action we'll take is to e-mail people who @@ -65,7 +64,7 @@ fn check(emails: &Emails, cache: &Cache, conn: &mut impl Conn, name: &str) -> an }; for recipient in cache.iter_emails() { - if let Err(error) = emails.send(recipient, email.clone()) { + if let Err(error) = emails.async_send(recipient, email.clone()).await { error!( ?error, ?recipient, @@ -125,6 +124,7 @@ mod tests { use super::*; use crate::typosquat::test_util::faker; use crates_io_test_db::TestDatabase; + use diesel_async::AsyncConnection; use lettre::Address; #[tokio::test] @@ -138,7 +138,8 @@ mod tests { faker::crate_and_version(&mut conn, "my-crate", "It's awesome", &user, 100)?; // Prime the cache so it only includes the crate we just created. - let cache = Cache::new(vec!["admin@example.com".to_string()], &mut conn)?; + let mut async_conn = AsyncPgConnection::establish(test_db.url()).await?; + let cache = Cache::new(vec!["admin@example.com".to_string()], &mut async_conn).await?; let cache = Arc::new(cache); // Now we'll create new crates: one problematic, one not so. @@ -159,24 +160,11 @@ mod tests { )?; // Run the check with a crate that shouldn't cause problems. - let mut conn = spawn_blocking({ - let emails = emails.clone(); - let cache = cache.clone(); - move || { - check(&emails, &cache, &mut conn, &angel.name)?; - Ok::<_, anyhow::Error>(conn) - } - }) - .await?; + check(&emails, &cache, &mut async_conn, &angel.name).await?; assert!(emails.mails_in_memory().await.unwrap().is_empty()); // Now run the check with a less innocent crate. - spawn_blocking({ - let emails = emails.clone(); - let cache = cache.clone(); - move || check(&emails, &cache, &mut conn, &demon.name) - }) - .await?; + check(&emails, &cache, &mut async_conn, &demon.name).await?; let sent_mail = emails.mails_in_memory().await.unwrap(); assert!(!sent_mail.is_empty()); let sent = sent_mail.into_iter().next().unwrap();