From e0110a0a6f5ead234460e59e96c25161468eb5ac Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Fri, 11 Oct 2024 16:41:53 +0200 Subject: [PATCH] worker/jobs/downloads/queue: Remove `spawn_blocking()` usage --- src/worker/jobs/downloads/queue/job.rs | 31 +++++++++++++------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/worker/jobs/downloads/queue/job.rs b/src/worker/jobs/downloads/queue/job.rs index 1402da69683..1ef0b01ecc1 100644 --- a/src/worker/jobs/downloads/queue/job.rs +++ b/src/worker/jobs/downloads/queue/job.rs @@ -1,7 +1,5 @@ use crate::config::CdnLogQueueConfig; use crate::sqs::{MockSqsQueue, SqsQueue, SqsQueueImpl}; -use crate::tasks::spawn_blocking; -use crate::util::diesel::Conn; use crate::worker::jobs::ProcessCdnLog; use crate::worker::Environment; use anyhow::Context; @@ -9,7 +7,6 @@ use aws_credential_types::Credentials; use aws_sdk_sqs::config::Region; use aws_sdk_sqs::types::Message; use crates_io_worker::BackgroundJob; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; use diesel_async::pooled_connection::deadpool::Pool; use diesel_async::AsyncPgConnection; use std::sync::Arc; @@ -156,12 +153,9 @@ async fn process_body(body: &str, connection_pool: &Pool) -> } let conn = connection_pool.get().await; - let conn = conn.context("Failed to acquire database connection")?; - spawn_blocking(move || { - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - enqueue_jobs(jobs, conn) - }) - .await + let mut conn = conn.context("Failed to acquire database connection")?; + + enqueue_jobs(jobs, &mut conn).await } /// Extracts a list of [`ProcessCdnLog`] jobs from a message. @@ -207,12 +201,16 @@ fn is_ignored_path(path: &str) -> bool { path.contains("/index.staging.crates.io/") || path.contains("/index.crates.io/") } -fn enqueue_jobs(jobs: Vec, conn: &mut impl Conn) -> anyhow::Result<()> { +async fn enqueue_jobs( + jobs: Vec, + conn: &mut AsyncPgConnection, +) -> anyhow::Result<()> { for job in jobs { let path = &job.path; info!("Enqueuing processing job… ({path})"); - job.enqueue(conn) + job.async_enqueue(conn) + .await .context("Failed to enqueue processing job")?; debug!("Enqueued processing job"); @@ -230,8 +228,8 @@ mod tests { use crates_io_test_db::TestDatabase; use crates_io_worker::schema::background_jobs; use diesel::prelude::*; - use diesel::QueryDsl; use diesel_async::pooled_connection::AsyncDieselConnectionManager; + use diesel_async::RunQueryDsl; use insta::assert_snapshot; use parking_lot::Mutex; @@ -262,7 +260,7 @@ mod tests { assert_ok!(run(&queue, 100, &connection_pool).await); assert_snapshot!(deleted_handles.lock().join(","), @"123"); - assert_snapshot!(open_jobs(&mut test_database.connect()), @"us-west-1 | bucket | path"); + assert_snapshot!(open_jobs(&mut connection_pool.get().await.unwrap()).await, @"us-west-1 | bucket | path"); } #[tokio::test] @@ -310,7 +308,7 @@ mod tests { assert_ok!(run(&queue, 100, &connection_pool).await); assert_snapshot!(deleted_handles.lock().join(","), @"1,2,3,4,5,6,7,8,9,10,11"); - assert_snapshot!(open_jobs(&mut test_database.connect()), @r###" + assert_snapshot!(open_jobs(&mut connection_pool.get().await.unwrap()).await, @r###" us-west-1 | bucket | path1 us-west-1 | bucket | path2 us-west-1 | bucket | path3 @@ -358,7 +356,7 @@ mod tests { assert_ok!(run(&queue, 100, &connection_pool).await); assert_snapshot!(deleted_handles.lock().join(","), @"1"); - assert_snapshot!(open_jobs(&mut test_database.connect()), @""); + assert_snapshot!(open_jobs(&mut connection_pool.get().await.unwrap()).await, @""); } #[test] @@ -419,10 +417,11 @@ mod tests { .build() } - fn open_jobs(conn: &mut impl Conn) -> String { + async fn open_jobs(conn: &mut AsyncPgConnection) -> String { let jobs = background_jobs::table .select((background_jobs::job_type, background_jobs::data)) .load::<(String, serde_json::Value)>(conn) + .await .unwrap(); jobs.into_iter()