Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions src/worker/jobs/rss/sync_crate_feed.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::schema::{crates, versions};
use crate::storage::FeedId;
use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::worker::Environment;
use chrono::Duration;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use std::sync::Arc;

/// Items younger than this will always be included in the feed.
Expand Down Expand Up @@ -42,16 +40,9 @@ impl BackgroundJob for SyncCrateFeed {
let domain = &ctx.config.domain_name;

info!("Loading latest {NUM_ITEMS} version updates for `{name}` from the database…");
let conn = ctx.deadpool.get().await?;

let version_updates = spawn_blocking({
let name = name.clone();
move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
Ok::<_, anyhow::Error>(load_version_updates(&name, conn)?)
}
})
.await?;
let mut conn = ctx.deadpool.get().await?;

let version_updates = load_version_updates(name, &mut conn).await?;

let feed_id = FeedId::Crate { name };

Expand Down Expand Up @@ -102,7 +93,10 @@ impl BackgroundJob for SyncCrateFeed {
/// than [`ALWAYS_INCLUDE_AGE`]. If there are less than [`NUM_ITEMS`] versions
/// then the list will be padded with older versions until [`NUM_ITEMS`] are
/// returned.
fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult<Vec<VersionUpdate>> {
async fn load_version_updates(
name: &str,
conn: &mut AsyncPgConnection,
) -> QueryResult<Vec<VersionUpdate>> {
let threshold_dt = chrono::Utc::now().naive_utc() - ALWAYS_INCLUDE_AGE;

let updates = versions::table
Expand All @@ -111,7 +105,8 @@ fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult<Vec<Ver
.filter(versions::created_at.gt(threshold_dt))
.order(versions::created_at.desc())
.select(VersionUpdate::as_select())
.load(conn)?;
.load(conn)
.await?;

let num_updates = updates.len();
if num_updates as i64 >= NUM_ITEMS {
Expand All @@ -125,6 +120,7 @@ fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult<Vec<Ver
.select(VersionUpdate::as_select())
.limit(NUM_ITEMS)
.load(conn)
.await
}

#[derive(Debug, Queryable, Selectable)]
Expand Down Expand Up @@ -183,65 +179,67 @@ mod tests {
use super::*;
use chrono::NaiveDateTime;
use crates_io_test_db::TestDatabase;
use diesel_async::AsyncConnection;
use insta::assert_debug_snapshot;

#[test]
fn test_load_version_updates() {
#[tokio::test]
async fn test_load_version_updates() {
crate::util::tracing::init_for_test();

let db = TestDatabase::new();
let mut conn = db.connect();
let mut conn = AsyncPgConnection::establish(db.url()).await.unwrap();

let now = chrono::Utc::now().naive_utc();

let updates = assert_ok!(load_version_updates("foo", &mut conn));
let updates = assert_ok!(load_version_updates("foo", &mut conn).await);
assert_eq!(updates.len(), 0);

let foo = create_crate(&mut conn, "foo");
let foo = create_crate(&mut conn, "foo").await;

// If there are less than NUM_ITEMS versions, they should all be returned
create_version(&mut conn, foo, "1.0.0", now - Duration::days(123));
create_version(&mut conn, foo, "1.0.1", now - Duration::days(110));
create_version(&mut conn, foo, "1.1.0", now - Duration::days(100));
create_version(&mut conn, foo, "1.2.0", now - Duration::days(90));
create_version(&mut conn, foo, "1.0.0", now - Duration::days(123)).await;
create_version(&mut conn, foo, "1.0.1", now - Duration::days(110)).await;
create_version(&mut conn, foo, "1.1.0", now - Duration::days(100)).await;
create_version(&mut conn, foo, "1.2.0", now - Duration::days(90)).await;

let updates = assert_ok!(load_version_updates("foo", &mut conn));
let updates = assert_ok!(load_version_updates("foo", &mut conn).await);
assert_eq!(updates.len(), 4);
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());

// If there are more than NUM_ITEMS versions, only the most recent NUM_ITEMS should be returned
for i in 1..=NUM_ITEMS {
let version = format!("1.2.{i}");
let publish_time = now - Duration::days(90) + Duration::hours(i);
create_version(&mut conn, foo, &version, publish_time);
create_version(&mut conn, foo, &version, publish_time).await;
}

let updates = assert_ok!(load_version_updates("foo", &mut conn));
let updates = assert_ok!(load_version_updates("foo", &mut conn).await);
assert_eq!(updates.len() as i64, NUM_ITEMS);
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());

// But if there are more than NUM_ITEMS versions that are younger than ALWAYS_INCLUDE_AGE, all of them should be returned
for i in 1..=(NUM_ITEMS + 10) {
let version = format!("1.3.{i}");
let publish_time = now - Duration::minutes(30) + Duration::seconds(i);
create_version(&mut conn, foo, &version, publish_time);
create_version(&mut conn, foo, &version, publish_time).await;
}

let updates = assert_ok!(load_version_updates("foo", &mut conn));
let updates = assert_ok!(load_version_updates("foo", &mut conn).await);
assert_eq!(updates.len() as i64, NUM_ITEMS + 10);
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());
}

fn create_crate(conn: &mut impl Conn, name: &str) -> i32 {
async fn create_crate(conn: &mut AsyncPgConnection, name: &str) -> i32 {
diesel::insert_into(crates::table)
.values((crates::name.eq(name),))
.returning(crates::id)
.get_result(conn)
.await
.unwrap()
}

fn create_version(
conn: &mut impl Conn,
async fn create_version(
conn: &mut AsyncPgConnection,
crate_id: i32,
version: &str,
publish_time: NaiveDateTime,
Expand All @@ -256,6 +254,7 @@ mod tests {
))
.returning(versions::id)
.get_result(conn)
.await
.unwrap()
}
}
52 changes: 27 additions & 25 deletions src/worker/jobs/rss/sync_crates_feed.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::schema::crates;
use crate::storage::FeedId;
use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::worker::Environment;
use chrono::Duration;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use std::sync::Arc;

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -34,12 +32,8 @@ impl BackgroundJob for SyncCratesFeed {
let domain = &ctx.config.domain_name;

info!("Loading latest {NUM_ITEMS} crates from the database…");
let conn = ctx.deadpool.get().await?;
let new_crates = spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
Ok::<_, anyhow::Error>(load_new_crates(conn)?)
})
.await?;
let mut conn = ctx.deadpool.get().await?;
let new_crates = load_new_crates(&mut conn).await?;

let link = rss::extension::atom::Link {
href: ctx.storage.feed_url(&feed_id),
Expand Down Expand Up @@ -86,14 +80,15 @@ impl BackgroundJob for SyncCratesFeed {
/// than [`ALWAYS_INCLUDE_AGE`]. If there are less than [`NUM_ITEMS`] crates
/// then the list will be padded with older crates until [`NUM_ITEMS`] are
/// returned.
fn load_new_crates(conn: &mut impl Conn) -> QueryResult<Vec<NewCrate>> {
async fn load_new_crates(conn: &mut AsyncPgConnection) -> QueryResult<Vec<NewCrate>> {
let threshold_dt = chrono::Utc::now().naive_utc() - ALWAYS_INCLUDE_AGE;

let new_crates = crates::table
.filter(crates::created_at.gt(threshold_dt))
.order(crates::created_at.desc())
.select(NewCrate::as_select())
.load(conn)?;
.load(conn)
.await?;

let num_new_crates = new_crates.len();
if num_new_crates as i64 >= NUM_ITEMS {
Expand All @@ -105,6 +100,7 @@ fn load_new_crates(conn: &mut impl Conn) -> QueryResult<Vec<NewCrate>> {
.select(NewCrate::as_select())
.limit(NUM_ITEMS)
.load(conn)
.await
}

#[derive(Debug, Queryable, Selectable)]
Expand Down Expand Up @@ -161,54 +157,59 @@ mod tests {
use super::*;
use chrono::NaiveDateTime;
use crates_io_test_db::TestDatabase;
use diesel_async::{AsyncConnection, AsyncPgConnection};
use insta::assert_debug_snapshot;

#[test]
fn test_load_version_updates() {
#[tokio::test]
async fn test_load_version_updates() {
crate::util::tracing::init_for_test();

let db = TestDatabase::new();
let mut conn = db.connect();
let mut conn = AsyncPgConnection::establish(db.url()).await.unwrap();

let now = chrono::Utc::now().naive_utc();

let new_crates = assert_ok!(load_new_crates(&mut conn));
let new_crates = assert_ok!(load_new_crates(&mut conn).await);
assert_eq!(new_crates.len(), 0);

// If there are less than NUM_ITEMS crates, they should all be returned
create_crate(&mut conn, "foo", now - Duration::days(123));
create_crate(&mut conn, "bar", now - Duration::days(110));
create_crate(&mut conn, "baz", now - Duration::days(100));
create_crate(&mut conn, "qux", now - Duration::days(90));
create_crate(&mut conn, "foo", now - Duration::days(123)).await;
create_crate(&mut conn, "bar", now - Duration::days(110)).await;
create_crate(&mut conn, "baz", now - Duration::days(100)).await;
create_crate(&mut conn, "qux", now - Duration::days(90)).await;

let new_crates = assert_ok!(load_new_crates(&mut conn));
let new_crates = assert_ok!(load_new_crates(&mut conn).await);
assert_eq!(new_crates.len(), 4);
assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::<Vec<_>>());

// If there are more than NUM_ITEMS crates, only the most recent NUM_ITEMS should be returned
for i in 1..=NUM_ITEMS {
let name = format!("crate-{i}");
let publish_time = now - Duration::days(90) + Duration::hours(i);
create_crate(&mut conn, &name, publish_time);
create_crate(&mut conn, &name, publish_time).await;
}

let new_crates = assert_ok!(load_new_crates(&mut conn));
let new_crates = assert_ok!(load_new_crates(&mut conn).await);
assert_eq!(new_crates.len() as i64, NUM_ITEMS);
assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::<Vec<_>>());

// But if there are more than NUM_ITEMS crates that are younger than ALWAYS_INCLUDE_AGE, all of them should be returned
for i in 1..=(NUM_ITEMS + 10) {
let name = format!("other-crate-{i}");
let publish_time = now - Duration::minutes(30) + Duration::seconds(i);
create_crate(&mut conn, &name, publish_time);
create_crate(&mut conn, &name, publish_time).await;
}

let new_crates = assert_ok!(load_new_crates(&mut conn));
let new_crates = assert_ok!(load_new_crates(&mut conn).await);
assert_eq!(new_crates.len() as i64, NUM_ITEMS + 10);
assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::<Vec<_>>());
}

fn create_crate(conn: &mut impl Conn, name: &str, publish_time: NaiveDateTime) -> i32 {
async fn create_crate(
conn: &mut AsyncPgConnection,
name: &str,
publish_time: NaiveDateTime,
) -> i32 {
diesel::insert_into(crates::table)
.values((
crates::name.eq(name),
Expand All @@ -217,6 +218,7 @@ mod tests {
))
.returning(crates::id)
.get_result(conn)
.await
.unwrap()
}
}
Loading