Merged
59 changes: 29 additions & 30 deletions src/worker/jobs/downloads/clean_processed_log_files.rs
@@ -1,10 +1,8 @@
use crate::schema::processed_log_files;
use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::worker::Environment;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use std::sync::Arc;

/// This job is responsible for cleaning up old entries in the
@@ -22,18 +20,17 @@ impl BackgroundJob for CleanProcessedLogFiles {
type Context = Arc<Environment>;

async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
let conn = env.deadpool.get().await?;
spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
Ok(run(conn)?)
})
.await
let mut conn = env.deadpool.get().await?;
Ok(run(&mut conn).await?)
}
}

fn run(conn: &mut impl Conn) -> QueryResult<()> {
async fn run(conn: &mut AsyncPgConnection) -> QueryResult<()> {
let filter = processed_log_files::time.lt(cut_off_date());
diesel::delete(processed_log_files::table.filter(filter)).execute(conn)?;

diesel::delete(processed_log_files::table.filter(filter))
.execute(conn)
.await?;

Ok(())
}
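
The hunk above shows the core pattern of this migration: the `spawn_blocking` bridge and its `AsyncConnectionWrapper` are gone, and the query now runs directly on the pooled `AsyncPgConnection`. A minimal sketch of the resulting style, assuming the `processed_log_files` schema used above (the function name is illustrative, not from the PR):

use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

// With diesel_async's `RunQueryDsl` in scope, the query is built with
// the usual Diesel DSL, but `.execute()` yields a future that is
// `.await`ed instead of blocking a worker thread.
async fn delete_entries_before(
    conn: &mut AsyncPgConnection,
    cut_off: chrono::DateTime<chrono::Utc>,
) -> QueryResult<usize> {
    use crate::schema::processed_log_files;

    let filter = processed_log_files::time.lt(cut_off);
    diesel::delete(processed_log_files::table.filter(filter))
        .execute(conn)
        .await
}
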
@@ -45,29 +42,29 @@ fn cut_off_date() -> chrono::DateTime<chrono::Utc> {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::test_db_connection;
use chrono::{DateTime, Utc};
use crates_io_test_db::TestDatabase;
use diesel_async::{AsyncConnection, AsyncPgConnection};
use insta::assert_debug_snapshot;

#[test]
fn test_cleanup() {
let (_test_db, conn) = &mut test_db_connection();
#[tokio::test]
async fn test_cleanup() {
let test_db = TestDatabase::new();
let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();

let now = chrono::Utc::now();
let cut_off_date = cut_off_date();
let one_hour = chrono::Duration::try_hours(1).unwrap();

insert(
conn,
vec![
("very-old-file", cut_off_date - one_hour * 30 * 24),
("old-file", cut_off_date - one_hour),
("newish-file", cut_off_date + one_hour),
("brand-new-file", now),
("future-file", now + one_hour * 7 * 24),
],
);
assert_debug_snapshot!(paths_in_table(conn), @r###"
let inserts = vec![
("very-old-file", cut_off_date - one_hour * 30 * 24),
("old-file", cut_off_date - one_hour),
("newish-file", cut_off_date + one_hour),
("brand-new-file", now),
("future-file", now + one_hour * 7 * 24),
];
insert(&mut conn, inserts).await;
assert_debug_snapshot!(paths_in_table(&mut conn).await, @r###"
[
"very-old-file",
"old-file",
@@ -77,8 +74,8 @@ mod tests {
]
"###);

run(conn).unwrap();
assert_debug_snapshot!(paths_in_table(conn), @r###"
run(&mut conn).await.unwrap();
assert_debug_snapshot!(paths_in_table(&mut conn).await, @r###"
[
"newish-file",
"brand-new-file",
@@ -88,7 +85,7 @@ mod tests {
}

/// Insert a list of paths and times into the `processed_log_files` table.
fn insert(conn: &mut PgConnection, inserts: Vec<(&str, DateTime<Utc>)>) {
async fn insert(conn: &mut AsyncPgConnection, inserts: Vec<(&str, DateTime<Utc>)>) {
let inserts = inserts
.into_iter()
.map(|(path, time)| {
@@ -102,14 +99,16 @@ mod tests {
diesel::insert_into(processed_log_files::table)
.values(&inserts)
.execute(conn)
.await
.unwrap();
}

/// Read all paths from the `processed_log_files` table.
fn paths_in_table(conn: &mut PgConnection) -> Vec<String> {
async fn paths_in_table(conn: &mut AsyncPgConnection) -> Vec<String> {
processed_log_files::table
.select(processed_log_files::path)
.load::<String>(conn)
.await
.unwrap()
}
}
115 changes: 60 additions & 55 deletions src/worker/jobs/downloads/process_log.rs
@@ -1,6 +1,4 @@
use crate::config::CdnLogStorageConfig;
use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::worker::Environment;
use anyhow::Context;
use chrono::NaiveDate;
@@ -9,9 +7,9 @@ use crates_io_worker::BackgroundJob;
use diesel::dsl::exists;
use diesel::prelude::*;
use diesel::{select, QueryResult};
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::AsyncPgConnection;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
@@ -123,11 +121,9 @@ async fn run(
log_stats(&downloads);

let path = path.to_string();
let conn = db_pool.get().await?;
spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();

conn.transaction(|conn| {
let mut conn = db_pool.get().await?;
conn.transaction(|conn| {
async move {
// Mark the log file as processed before saving the downloads to
// the database.
//
@@ -138,14 +134,15 @@
// When the job is retried the `already_processed()` call above
// will return `true` and the job will skip processing the log
// file again.
save_as_processed(path, conn)?;

save_downloads(downloads, conn)
})?;
save_as_processed(path, conn).await?;

Ok::<_, anyhow::Error>(())
save_downloads(downloads, conn).await
}
.scope_boxed()
})
.await
.await?;

Ok(())
}
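
The closure passed to `transaction` above returns a scoped, boxed future: `scope_boxed()` from `diesel_async::scoped_futures` ties the future's lifetime to the `conn` borrow, which is what lets an async closure borrow the connection mutably. A stripped-down sketch of the same pattern, with the download-specific steps replaced by a placeholder:

use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, AsyncPgConnection};

// Sketch: `transaction` opens a database transaction, runs the scoped
// future, commits on `Ok`, and rolls back on `Err`.
async fn in_transaction(conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
    conn.transaction(|conn| {
        async move {
            // ... issue queries on `conn` here ...
            Ok::<_, anyhow::Error>(())
        }
        .scope_boxed()
    })
    .await?;

    Ok(())
}
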

/// Loads the given log file from the object store and counts the number of
@@ -219,15 +216,23 @@ impl From<(String, Version, NaiveDate, u64)> for NewDownload {
/// The temporary table only exists on the current connection, but if a
/// connection pool is used, the temporary table will not be dropped when
/// the connection is returned to the pool.
pub fn save_downloads(downloads: DownloadsMap, conn: &mut impl Conn) -> anyhow::Result<()> {
pub async fn save_downloads(
downloads: DownloadsMap,
conn: &mut AsyncPgConnection,
) -> anyhow::Result<()> {
debug!("Creating temp_downloads table");
create_temp_downloads_table(conn).context("Failed to create temp_downloads table")?;
create_temp_downloads_table(conn)
.await
.context("Failed to create temp_downloads table")?;

debug!("Saving counted downloads to temp_downloads table");
fill_temp_downloads_table(downloads, conn).context("Failed to fill temp_downloads table")?;
fill_temp_downloads_table(downloads, conn)
.await
.context("Failed to fill temp_downloads table")?;

debug!("Saving temp_downloads to version_downloads table");
let failed_inserts = save_to_version_downloads(conn)
.await
.context("Failed to save temp_downloads to version_downloads table")?;

if !failed_inserts.is_empty() {
@@ -247,7 +252,7 @@ pub fn save_downloads(downloads: DownloadsMap, conn: &mut impl Conn) -> anyhow::
/// look up the `version_id` for each crate and version combination, and that
/// requires a join with the `crates` and `versions` tables.
#[instrument("db.query", skip_all, fields(message = "CREATE TEMPORARY TABLE ..."))]
fn create_temp_downloads_table(conn: &mut impl Conn) -> QueryResult<usize> {
async fn create_temp_downloads_table(conn: &mut AsyncPgConnection) -> QueryResult<usize> {
diesel::sql_query(
r#"
CREATE TEMPORARY TABLE temp_downloads (
@@ -259,6 +264,7 @@ fn create_temp_downloads_table(conn: &mut impl Conn) -> QueryResult<usize> {
"#,
)
.execute(conn)
.await
}
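
Given the caveat above, that a pooled connection keeps its temporary table alive after returning to the pool, a hypothetical alternative (not what this PR does) is to scope the table to an enclosing transaction with `ON COMMIT DROP`. The column list below is illustrative, since the real definition is collapsed in this view:

// Hypothetical sketch: PostgreSQL drops an `ON COMMIT DROP` temporary
// table when the enclosing transaction ends, so nothing leaks back
// into the connection pool.
diesel::sql_query(
    r#"
        CREATE TEMPORARY TABLE temp_downloads (
            -- illustrative columns; the real ones are collapsed above
            name TEXT NOT NULL,
            version TEXT NOT NULL,
            date DATE NOT NULL,
            downloads BIGINT NOT NULL
        ) ON COMMIT DROP
    "#,
)
.execute(conn)
.await?;
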

/// Fills the temporary `temp_downloads` table with the downloads from the
@@ -268,7 +274,10 @@
skip_all,
fields(message = "INSERT INTO temp_downloads ...")
)]
fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> QueryResult<()> {
async fn fill_temp_downloads_table(
downloads: DownloadsMap,
conn: &mut AsyncPgConnection,
) -> QueryResult<()> {
// `tokio-postgres` has a limit on the size of values it can send to the
// database. To avoid hitting this limit, we insert the downloads in
// batches.
@@ -283,7 +292,8 @@ fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> Q
for chunk in map.chunks(MAX_BATCH_SIZE) {
diesel::insert_into(temp_downloads::table)
.values(chunk)
.execute(conn)?;
.execute(conn)
.await?;
}

Ok(())
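
For context on the batching above: one hard ceiling is that PostgreSQL's wire protocol encodes the number of bound parameters in a `Bind` message as a 16-bit integer, so a single statement can bind at most 65535 values. The arithmetic, with illustrative names (not code from the PR):

// The protocol-level cap on bound values per statement, divided by the
// bound columns per row, bounds the usable batch size
// (e.g. 4 columns per row allow at most 16383 rows per INSERT).
const BIND_PARAM_LIMIT: usize = 65_535;

fn max_rows_per_insert(bound_columns_per_row: usize) -> usize {
    BIND_PARAM_LIMIT / bound_columns_per_row
}
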
@@ -297,7 +307,9 @@ fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> Q
skip_all,
fields(message = "INSERT INTO version_downloads ...")
)]
fn save_to_version_downloads(conn: &mut impl Conn) -> QueryResult<Vec<NameAndVersion>> {
async fn save_to_version_downloads(
conn: &mut AsyncPgConnection,
) -> QueryResult<Vec<NameAndVersion>> {
diesel::sql_query(
r#"
WITH joined_data AS (
@@ -319,7 +331,7 @@ fn save_to_version_downloads(conn: &mut impl Conn) -> QueryResult<Vec<NameAndVer
WHERE joined_data.id IS NULL;
"#,
)
.load(conn)
.load(conn).await
}
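
The collapsed SQL above resolves each crate/version pair in `temp_downloads` to a `version_id` by joining through the `crates` and `versions` tables, and returns the rows that found no match. As a rough illustration of that lookup in Diesel's DSL (hypothetical; the PR keeps the whole INSERT ... SELECT as raw SQL):

// Hypothetical sketch: resolve a crate name and version number to its
// `version_id` via the `versions`/`crates` join.
async fn version_id_for(
    conn: &mut AsyncPgConnection,
    name: &str,
    num: &str,
) -> QueryResult<Option<i32>> {
    versions::table
        .inner_join(crates::table)
        .filter(crates::name.eq(name))
        .filter(versions::num.eq(num))
        .select(versions::id)
        .first(conn)
        .await
        .optional()
}
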

table! {
@@ -358,12 +370,8 @@ async fn already_processed(
) -> anyhow::Result<bool> {
let path = path.into();

let conn = db_pool.get().await?;
let already_processed = spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
Ok::<_, anyhow::Error>(already_processed_inner(path, conn)?)
})
.await?;
let mut conn = db_pool.get().await?;
let already_processed = already_processed_inner(path, &mut conn).await?;

Ok(already_processed)
}
@@ -374,21 +382,28 @@ async fn already_processed(
/// Note that if a second job is already processing the same log file, this
/// function will return `false` because the second job will not have inserted
/// the path into the `processed_log_files` table yet.
fn already_processed_inner(path: impl Into<String>, conn: &mut impl Conn) -> QueryResult<bool> {
async fn already_processed_inner(
path: impl Into<String>,
conn: &mut AsyncPgConnection,
) -> QueryResult<bool> {
use crate::schema::processed_log_files;

let query = processed_log_files::table.filter(processed_log_files::path.eq(path.into()));
select(exists(query)).get_result(conn)
select(exists(query)).get_result(conn).await
}

/// Inserts the given path into the `processed_log_files` table to mark it as
/// processed.
fn save_as_processed(path: impl Into<String>, conn: &mut impl Conn) -> QueryResult<()> {
async fn save_as_processed(
path: impl Into<String>,
conn: &mut AsyncPgConnection,
) -> QueryResult<()> {
use crate::schema::processed_log_files;

diesel::insert_into(processed_log_files::table)
.values(processed_log_files::path.eq(path.into()))
.execute(conn)?;
.execute(conn)
.await?;

Ok(())
}
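
The doc comment on `already_processed_inner` above notes a race: two jobs can both observe `false` before either has inserted the marker row. A hypothetical hardening (not part of this PR, and assuming a unique constraint on `processed_log_files.path`) would let the marker insert itself arbitrate:

// Hypothetical sketch: with `ON CONFLICT DO NOTHING` the insert is
// idempotent, and an affected-row count of zero means another job has
// already claimed this log file.
let claimed = diesel::insert_into(processed_log_files::table)
    .values(processed_log_files::path.eq(path.into()))
    .on_conflict_do_nothing()
    .execute(conn)
    .await?
    > 0;
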
@@ -397,7 +412,6 @@ fn save_as_processed(path: impl Into<String>, conn: &mut impl Conn) -> QueryResu
mod tests {
use super::*;
use crate::schema::{crates, version_downloads, versions};
use crate::util::diesel::Conn;
use crates_io_test_db::TestDatabase;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use insta::assert_debug_snapshot;
@@ -485,26 +499,20 @@ mod tests {

/// Inserts some dummy crates and versions into the database.
async fn create_dummy_crates_and_versions(db_pool: Pool<AsyncPgConnection>) {
let conn = db_pool.get().await.unwrap();
spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();

create_crate_and_version("bindgen", "0.65.1", conn);
create_crate_and_version("tracing-core", "0.1.32", conn);
create_crate_and_version("quick-error", "1.2.3", conn);
let mut conn = db_pool.get().await.unwrap();

Ok::<_, anyhow::Error>(())
})
.await
.unwrap();
create_crate_and_version("bindgen", "0.65.1", &mut conn).await;
create_crate_and_version("tracing-core", "0.1.32", &mut conn).await;
create_crate_and_version("quick-error", "1.2.3", &mut conn).await;
}

/// Inserts a dummy crate and version into the database.
fn create_crate_and_version(name: &str, version: &str, conn: &mut impl Conn) {
async fn create_crate_and_version(name: &str, version: &str, conn: &mut AsyncPgConnection) {
let crate_id: i32 = diesel::insert_into(crates::table)
.values(crates::name.eq(name))
.returning(crates::id)
.get_result(conn)
.await
.unwrap();

diesel::insert_into(versions::table)
Expand All @@ -515,19 +523,15 @@ mod tests {
versions::checksum.eq("checksum"),
))
.execute(conn)
.await
.unwrap();
}

/// Queries all version downloads from the database and returns them as a
/// [`Vec`] of strings for use with [`assert_debug_snapshot!()`].
async fn all_version_downloads(db_pool: Pool<AsyncPgConnection>) -> Vec<String> {
let conn = db_pool.get().await.unwrap();
let downloads = spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
Ok::<_, anyhow::Error>(query_all_version_downloads(conn))
})
.await
.unwrap();
let mut conn = db_pool.get().await.unwrap();
let downloads = query_all_version_downloads(&mut conn).await;

downloads
.into_iter()
@@ -539,8 +543,8 @@

/// Queries all version downloads from the database and returns them as a
/// [`Vec`] of tuples.
fn query_all_version_downloads(
conn: &mut impl Conn,
async fn query_all_version_downloads(
conn: &mut AsyncPgConnection,
) -> Vec<(String, String, i32, i32, NaiveDate, bool)> {
version_downloads::table
.inner_join(versions::table)
@@ -555,6 +559,7 @@
))
.order((crates::name, versions::num, version_downloads::date))
.load(conn)
.await
.unwrap()
}
}