Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/typosquat/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use diesel_async::AsyncPgConnection;
use std::sync::Arc;

use crate::util::diesel::Conn;
use thiserror::Error;
use typomania::{
checks::{Bitflips, Omitted, SwappedWords, Typos},
Expand Down Expand Up @@ -28,7 +27,7 @@ impl Cache {
/// addresses to send notifications to, then invokes [`Cache::new`] to read popular crates from
/// the database.
#[instrument(skip_all, err)]
pub fn from_env(conn: &mut impl Conn) -> Result<Self, Error> {
pub async fn from_env(conn: &mut AsyncPgConnection) -> Result<Self, Error> {
let emails: Vec<String> = crates_io_env_vars::var(NOTIFICATION_EMAILS_ENV)
.map_err(|e| Error::Environment {
name: NOTIFICATION_EMAILS_ENV.into(),
Expand All @@ -49,15 +48,15 @@ impl Cache {
})
} else {
// Otherwise, let's go get the top crates and build a corpus.
Self::new(emails, conn)
Self::new(emails, conn).await
}
}

/// Instantiates a cache by querying popular crates and building them into a typomania harness.
///
/// This relies on configuration in the `super::config` module.
pub fn new(emails: Vec<String>, conn: &mut impl Conn) -> Result<Self, Error> {
let top = TopCrates::new(conn, config::TOP_CRATES)?;
pub async fn new(emails: Vec<String>, conn: &mut AsyncPgConnection) -> Result<Self, Error> {
let top = TopCrates::new(conn, config::TOP_CRATES).await?;

Ok(Self {
emails,
Expand Down
85 changes: 50 additions & 35 deletions src/typosquat/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
collections::{BTreeMap, HashMap, HashSet},
};

use crate::util::diesel::Conn;
use diesel::{connection::DefaultLoadingMode, QueryResult};
use crate::util::diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use futures_util::TryStreamExt;
use typomania::{AuthorSet, Corpus, Package};

/// A corpus of the current top crates on crates.io, as determined by their download counts, along
Expand All @@ -18,12 +19,11 @@

impl TopCrates {
/// Retrieves the `num` top crates from the database.
pub fn new(conn: &mut impl Conn, num: i64) -> QueryResult<Self> {
pub async fn new(conn: &mut AsyncPgConnection, num: i64) -> QueryResult<Self> {
use crate::{
models,
schema::{crate_downloads, crate_owners},
};
use diesel::prelude::*;

// We have to build up a data structure that contains the top crates, their owners in some
// form that is easily compared, and that can be indexed by the crate name.
Expand All @@ -42,45 +42,51 @@
// Once we have the results of those queries, we can glom it all together into one happy
// data structure.

let mut crates: BTreeMap<i32, (String, Crate)> = BTreeMap::new();
for result in models::Crate::all()
let crates: BTreeMap<i32, (String, Crate)> = BTreeMap::new();
let crates = models::Crate::all()
.inner_join(crate_downloads::table)
.order(crate_downloads::downloads.desc())
.limit(num)
.load_iter::<models::Crate, DefaultLoadingMode>(conn)?
{
let krate = result?;
crates.insert(
krate.id,
(
krate.name,
Crate {
owners: HashSet::new(),
},
),
);
}
.load_stream::<models::Crate>(conn)
.await?
.try_fold(crates, |mut crates, krate| {
crates.insert(
krate.id,
(
krate.name,
Crate {
owners: HashSet::new(),
},
),
);

futures_util::future::ready(Ok(crates))
})
.await?;

Check warning on line 65 in src/typosquat/database.rs

View check run for this annotation

Codecov / codecov/patch

src/typosquat/database.rs#L65

Added line #L65 was not covered by tests
Comment on lines +46 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just straight up better, async or no. 👍


// This query might require more low level knowledge of crate_owners than we really want
// outside of the models module. It would probably make more sense in the long term to have
// this live in the Owner type, but for now I want to keep the typosquatting logic as
// self-contained as possible in case we decide not to go ahead with this in the longer
// term.
for result in crate_owners::table
let crates = crate_owners::table
.filter(crate_owners::deleted.eq(false))
.filter(crate_owners::crate_id.eq_any(crates.keys().cloned().collect::<Vec<_>>()))
.select((
crate_owners::crate_id,
crate_owners::owner_id,
crate_owners::owner_kind,
))
.load_iter::<(i32, i32, i32), DefaultLoadingMode>(conn)?
{
let (crate_id, owner_id, owner_kind) = result?;
crates.entry(crate_id).and_modify(|(_name, krate)| {
krate.owners.insert(Owner::new(owner_id, owner_kind));
});
}
.load_stream::<(i32, i32, i32)>(conn)
.await?
.try_fold(crates, |mut crates, (crate_id, owner_id, owner_kind)| {
crates.entry(crate_id).and_modify(|(_name, krate)| {
krate.owners.insert(Owner::new(owner_id, owner_kind));
});

futures_util::future::ready(Ok(crates))
})
.await?;

Check warning on line 89 in src/typosquat/database.rs

View check run for this annotation

Codecov / codecov/patch

src/typosquat/database.rs#L89

Added line #L89 was not covered by tests

Ok(Self {
crates: crates.into_values().collect(),
Expand All @@ -104,12 +110,16 @@

impl Crate {
/// Hydrates a crate and its owners from the database given the crate name.
pub fn from_name(conn: &mut impl Conn, name: &str) -> QueryResult<Self> {
pub async fn from_name(conn: &mut AsyncPgConnection, name: &str) -> QueryResult<Self> {
use crate::models;
use diesel::prelude::*;

let krate = models::Crate::by_exact_name(name).first(conn)?;
let owners = krate.owners(conn)?.into_iter().map(Owner::from).collect();
let krate = models::Crate::by_exact_name(name).first(conn).await?;
let owners = krate
.async_owners(conn)
.await?
.into_iter()
.map(Owner::from)
.collect();

Ok(Self { owners })
}
Expand Down Expand Up @@ -166,10 +176,11 @@
use super::*;
use crate::typosquat::test_util::faker;
use crates_io_test_db::TestDatabase;
use diesel_async::AsyncConnection;
use thiserror::Error;

#[test]
fn top_crates() -> Result<(), Error> {
#[tokio::test]
async fn top_crates() -> Result<(), Error> {
let test_db = TestDatabase::new();
let mut conn = test_db.connect();

Expand All @@ -187,7 +198,8 @@
faker::add_crate_to_team(&mut conn, &user_b, &top_b, &not_the_a_team)?;
faker::add_crate_to_team(&mut conn, &user_b, &not_top_c, &not_the_a_team)?;

let top_crates = TopCrates::new(&mut conn, 2)?;
let mut async_conn = AsyncPgConnection::establish(test_db.url()).await?;
let top_crates = TopCrates::new(&mut async_conn, 2).await?;

// Let's ensure the top crates include what we expect (which is a and b, since we asked for
// 2 crates and they're the most downloaded).
Expand All @@ -201,7 +213,7 @@
assert!(!pkg_a.shared_authors(pkg_b.authors()));

// Now let's go get package c and pretend it's a new package.
let pkg_c = Crate::from_name(&mut conn, "c")?;
let pkg_c = Crate::from_name(&mut async_conn, "c").await?;

// c _does_ have an author in common with a.
assert!(pkg_a.shared_authors(pkg_c.authors()));
Expand All @@ -227,5 +239,8 @@

#[error(transparent)]
Diesel(#[from] diesel::result::Error),

#[error(transparent)]
Connection(#[from] ConnectionError),
}
}
11 changes: 6 additions & 5 deletions src/worker/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use crate::fastly::Fastly;
use crate::storage::Storage;
use crate::typosquat;
use crate::util::diesel::Conn;
use crate::Emails;
use anyhow::Context;
use bon::Builder;
Expand All @@ -13,8 +12,9 @@
use object_store::ObjectStore;
use parking_lot::{Mutex, MutexGuard};
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, OnceLock};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::OnceCell;

#[derive(Builder)]
pub struct Environment {
Expand All @@ -33,7 +33,7 @@

/// A lazily initialised cache of the most popular crates ready to use in typosquatting checks.
#[builder(skip)]
typosquat_cache: OnceLock<Result<typosquat::Cache, typosquat::CacheError>>,
typosquat_cache: OnceCell<Result<typosquat::Cache, typosquat::CacheError>>,
}

impl Environment {
Expand Down Expand Up @@ -78,9 +78,9 @@
}

/// Returns the typosquatting cache, initialising it if required.
pub(crate) fn typosquat_cache(
pub(crate) async fn typosquat_cache(
&self,
conn: &mut impl Conn,
conn: &mut AsyncPgConnection,
) -> Result<&typosquat::Cache, typosquat::CacheError> {
// We have to pass conn back in here because the caller might be in a transaction, and
// getting a new connection here to query crates can result in a deadlock.
Expand All @@ -90,6 +90,7 @@
// generated if initialising the cache fails.
self.typosquat_cache
.get_or_init(|| typosquat::Cache::from_env(conn))
.await

Check warning on line 93 in src/worker/environment.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/environment.rs#L93

Added line #L93 was not covered by tests
.as_ref()
.map_err(|e| e.clone())
}
Expand Down
46 changes: 17 additions & 29 deletions src/worker/jobs/typosquat.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::sync::Arc;

use crates_io_worker::BackgroundJob;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::AsyncPgConnection;
use typomania::Package;

use crate::email::Email;
use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::{
typosquat::{Cache, Crate},
worker::Environment,
Expand Down Expand Up @@ -36,22 +34,23 @@ impl BackgroundJob for CheckTyposquat {
async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
let crate_name = self.name.clone();

let conn = env.deadpool.get().await?;
spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
let mut conn = env.deadpool.get().await?;

let cache = env.typosquat_cache(conn)?;
check(&env.emails, cache, conn, &crate_name)
})
.await
let cache = env.typosquat_cache(&mut conn).await?;
check(&env.emails, cache, &mut conn, &crate_name).await
}
}

fn check(emails: &Emails, cache: &Cache, conn: &mut impl Conn, name: &str) -> anyhow::Result<()> {
async fn check(
emails: &Emails,
cache: &Cache,
conn: &mut AsyncPgConnection,
name: &str,
) -> anyhow::Result<()> {
if let Some(harness) = cache.get_harness() {
info!(name, "Checking new crate for potential typosquatting");

let krate: Box<dyn Package> = Box::new(Crate::from_name(conn, name)?);
let krate: Box<dyn Package> = Box::new(Crate::from_name(conn, name).await?);
let squats = harness.check_package(name, krate)?;
if !squats.is_empty() {
// Well, well, well. For now, the only action we'll take is to e-mail people who
Expand All @@ -65,7 +64,7 @@ fn check(emails: &Emails, cache: &Cache, conn: &mut impl Conn, name: &str) -> an
};

for recipient in cache.iter_emails() {
if let Err(error) = emails.send(recipient, email.clone()) {
if let Err(error) = emails.async_send(recipient, email.clone()).await {
error!(
?error,
?recipient,
Expand Down Expand Up @@ -125,6 +124,7 @@ mod tests {
use super::*;
use crate::typosquat::test_util::faker;
use crates_io_test_db::TestDatabase;
use diesel_async::AsyncConnection;
use lettre::Address;

#[tokio::test]
Expand All @@ -138,7 +138,8 @@ mod tests {
faker::crate_and_version(&mut conn, "my-crate", "It's awesome", &user, 100)?;

// Prime the cache so it only includes the crate we just created.
let cache = Cache::new(vec!["admin@example.com".to_string()], &mut conn)?;
let mut async_conn = AsyncPgConnection::establish(test_db.url()).await?;
let cache = Cache::new(vec!["admin@example.com".to_string()], &mut async_conn).await?;
let cache = Arc::new(cache);

// Now we'll create new crates: one problematic, one not so.
Expand All @@ -159,24 +160,11 @@ mod tests {
)?;

// Run the check with a crate that shouldn't cause problems.
let mut conn = spawn_blocking({
let emails = emails.clone();
let cache = cache.clone();
move || {
check(&emails, &cache, &mut conn, &angel.name)?;
Ok::<_, anyhow::Error>(conn)
}
})
.await?;
check(&emails, &cache, &mut async_conn, &angel.name).await?;
assert!(emails.mails_in_memory().await.unwrap().is_empty());

// Now run the check with a less innocent crate.
spawn_blocking({
let emails = emails.clone();
let cache = cache.clone();
move || check(&emails, &cache, &mut conn, &demon.name)
})
.await?;
check(&emails, &cache, &mut async_conn, &demon.name).await?;
let sent_mail = emails.mails_in_memory().await.unwrap();
assert!(!sent_mail.is_empty());
let sent = sent_mail.into_iter().next().unwrap();
Expand Down