From 09191e7f0a0ddee48e772f8bf5a8877eda58d386 Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Fri, 1 Nov 2024 23:23:41 +0100
Subject: [PATCH 1/6] worker/tests: Migrate to async database queries

---
 crates/crates_io_worker/tests/runner.rs | 90 +++++++++++++++----------
 1 file changed, 55 insertions(+), 35 deletions(-)

diff --git a/crates/crates_io_worker/tests/runner.rs b/crates/crates_io_worker/tests/runner.rs
index 89d4913cc4f..79631c3dba6 100644
--- a/crates/crates_io_worker/tests/runner.rs
+++ b/crates/crates_io_worker/tests/runner.rs
@@ -5,7 +5,7 @@ use crates_io_worker::{BackgroundJob, Runner};
 use diesel::prelude::*;
 use diesel_async::pooled_connection::deadpool::Pool;
 use diesel_async::pooled_connection::AsyncDieselConnectionManager;
-use diesel_async::AsyncPgConnection;
+use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
 use insta::assert_compact_json_snapshot;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -13,30 +13,33 @@ use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
 use tokio::sync::Barrier;
 
-fn all_jobs(conn: &mut PgConnection) -> Vec<(String, Value)> {
+async fn all_jobs(conn: &mut AsyncPgConnection) -> Vec<(String, Value)> {
     background_jobs::table
         .select((background_jobs::job_type, background_jobs::data))
         .get_results(conn)
+        .await
         .unwrap()
 }
 
-fn job_exists(id: i64, conn: &mut PgConnection) -> bool {
+async fn job_exists(id: i64, conn: &mut AsyncPgConnection) -> bool {
     background_jobs::table
         .find(id)
         .select(background_jobs::id)
         .get_result::<i64>(conn)
+        .await
         .optional()
         .unwrap()
        .is_some()
 }
 
-fn job_is_locked(id: i64, conn: &mut PgConnection) -> bool {
+async fn job_is_locked(id: i64, conn: &mut AsyncPgConnection) -> bool {
     background_jobs::table
         .find(id)
         .select(background_jobs::id)
         .for_update()
         .skip_locked()
         .get_result::<i64>(conn)
+        .await
         .optional()
         .unwrap()
         .is_none()
 }
@@ -73,22 +76,25 @@ async fn jobs_are_locked_when_fetched() {
     let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
 
-    let mut conn = test_database.connect();
-    let job_id = TestJob.enqueue(&mut conn).unwrap().unwrap();
+    let mut conn = AsyncPgConnection::establish(test_database.url())
+        .await
+        .unwrap();
+
+    let job_id = TestJob.async_enqueue(&mut conn).await.unwrap().unwrap();
 
-    assert!(job_exists(job_id, &mut conn));
-    assert!(!job_is_locked(job_id, &mut conn));
+    assert!(job_exists(job_id, &mut conn).await);
+    assert!(!job_is_locked(job_id, &mut conn).await);
 
     let runner = runner.start();
     test_context.job_started_barrier.wait().await;
 
-    assert!(job_exists(job_id, &mut conn));
-    assert!(job_is_locked(job_id, &mut conn));
+    assert!(job_exists(job_id, &mut conn).await);
+    assert!(job_is_locked(job_id, &mut conn).await);
 
     test_context.assertions_finished_barrier.wait().await;
     runner.wait_for_shutdown().await;
 
-    assert!(!job_exists(job_id, &mut conn));
+    assert!(!job_exists(job_id, &mut conn).await);
 }
 
 #[tokio::test(flavor = "multi_thread")]
@@ -105,10 +111,11 @@ async fn jobs_are_deleted_when_successfully_run() {
         }
     }
 
-    fn remaining_jobs(conn: &mut PgConnection) -> i64 {
+    async fn remaining_jobs(conn: &mut AsyncPgConnection) -> i64 {
         background_jobs::table
             .count()
             .get_result(&mut *conn)
+            .await
             .unwrap()
     }
 
@@ -116,15 +123,18 @@ async fn jobs_are_deleted_when_successfully_run() {
     let test_database = TestDatabase::new();
 
-    let runner = runner(test_database.url(), ()).register_job_type::<TestJob>();
+    let runner = runner(test_database.url(), ()).register_job_type::<TestJob>();
 
-    let mut conn = test_database.connect();
-    assert_eq!(remaining_jobs(&mut conn), 0);
+    let mut conn = AsyncPgConnection::establish(test_database.url())
+        .await
+        .unwrap();
+
+    assert_eq!(remaining_jobs(&mut conn).await, 0);
 
-    TestJob.enqueue(&mut conn).unwrap();
-    assert_eq!(remaining_jobs(&mut conn), 1);
+    TestJob.async_enqueue(&mut conn).await.unwrap();
+    assert_eq!(remaining_jobs(&mut conn).await, 1);
 
     let runner = runner.start();
     runner.wait_for_shutdown().await;
-    assert_eq!(remaining_jobs(&mut conn), 0);
+    assert_eq!(remaining_jobs(&mut conn).await, 0);
 }
 
 #[tokio::test(flavor = "multi_thread")]
@@ -155,8 +165,11 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
 
     let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
 
-    let mut conn = test_database.connect();
-    TestJob.enqueue(&mut conn).unwrap();
+    let mut conn = AsyncPgConnection::establish(test_database.url())
+        .await
+        .unwrap();
+
+    TestJob.async_enqueue(&mut conn).await.unwrap();
 
     let runner = runner.start();
     test_context.job_started_barrier.wait().await;
@@ -169,7 +182,8 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
         .select(background_jobs::id)
         .filter(background_jobs::retries.eq(0))
         .for_update()
-        .load::<i64>(&mut *conn)
+        .load::<i64>(&mut conn)
+        .await
         .unwrap();
     assert_eq!(available_jobs.len(), 0);
 
@@ -177,7 +191,8 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
     let total_jobs_including_failed = background_jobs::table
         .select(background_jobs::id)
         .for_update()
-        .load::<i64>(&mut *conn)
+        .load::<i64>(&mut conn)
+        .await
         .unwrap();
     assert_eq!(total_jobs_including_failed.len(), 1);
 
@@ -202,9 +217,11 @@ async fn panicking_in_jobs_updates_retry_counter() {
     let runner = runner(test_database.url(), ()).register_job_type::<TestJob>();
 
-    let mut conn = test_database.connect();
+    let mut conn = AsyncPgConnection::establish(test_database.url())
+        .await
+        .unwrap();
 
-    let job_id = TestJob.enqueue(&mut conn).unwrap().unwrap();
+    let job_id = TestJob.async_enqueue(&mut conn).await.unwrap().unwrap();
 
     let runner = runner.start();
     runner.wait_for_shutdown().await;
@@ -213,7 +230,8 @@ async fn panicking_in_jobs_updates_retry_counter() {
         .find(job_id)
         .select(background_jobs::retries)
         .for_update()
-        .first::<i32>(&mut *conn)
+        .first::<i32>(&mut conn)
+        .await
         .unwrap();
     assert_eq!(tries, 1);
 }
@@ -264,15 +282,17 @@ async fn jobs_can_be_deduplicated() {
     let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
 
-    let mut conn = test_database.connect();
+    let mut conn = AsyncPgConnection::establish(test_database.url())
+        .await
+        .unwrap();
 
     // Enqueue first job
-    assert_some!(TestJob::new("foo").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}]]"#);
+    assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}]]"#);
 
     // Try to enqueue the same job again, which should be deduplicated
-    assert_none!(TestJob::new("foo").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}]]"#);
+    assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}]]"#);
 
     // Start processing the first job
     let runner = runner.start();
@@ -280,17 +300,17 @@ async fn jobs_can_be_deduplicated() {
 
     // Enqueue the same job again, which should NOT be deduplicated,
     // since the first job already still running
-    assert_some!(TestJob::new("foo").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+    assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
 
     // Try to enqueue the same job again, which should be deduplicated again
-    assert_none!(TestJob::new("foo").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+    assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
 
     // Enqueue the same job but with different data, which should
     // NOT be deduplicated
-    assert_some!(TestJob::new("bar").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
+    assert_some!(TestJob::new("bar").async_enqueue(&mut conn).await.unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
 
     // Resolve the final barrier to finish the test
     test_context.assertions_finished_barrier.wait().await;

From 7650fe37e7b770bbf2d78bf09ab39c60c382f0c5 Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Fri, 1 Nov 2024 23:25:27 +0100
Subject: [PATCH 2/6] worker/tests: Extract `pool()` fn

---
 crates/crates_io_worker/tests/runner.rs | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/crates/crates_io_worker/tests/runner.rs b/crates/crates_io_worker/tests/runner.rs
index 79631c3dba6..6b2442c6dfc 100644
--- a/crates/crates_io_worker/tests/runner.rs
+++ b/crates/crates_io_worker/tests/runner.rs
@@ -317,12 +317,16 @@ async fn jobs_can_be_deduplicated() {
     runner.wait_for_shutdown().await;
 }
 
+fn pool(database_url: &str) -> Pool<AsyncPgConnection> {
+    let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
+    Pool::builder(manager).max_size(4).build().unwrap()
+}
+
 fn runner<Context: Clone + Send + Sync + 'static>(
     database_url: &str,
     context: Context,
 ) -> Runner<Context> {
-    let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
-    let deadpool = Pool::builder(manager).max_size(4).build().unwrap();
+    let deadpool = pool(database_url);
 
     Runner::new(deadpool, context)
         .configure_default_queue(|queue| queue.num_workers(2))

From f04551d97b7702bd4dbe0dbb407dfbfb3e2e2016 Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Fri, 1 Nov 2024 23:27:54 +0100
Subject: [PATCH 3/6] worker/tests: Move `pool()` call out of `runner()` fn

---
 crates/crates_io_worker/tests/runner.rs | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/crates/crates_io_worker/tests/runner.rs b/crates/crates_io_worker/tests/runner.rs
index 6b2442c6dfc..f7e77fda5fd 100644
--- a/crates/crates_io_worker/tests/runner.rs
+++ b/crates/crates_io_worker/tests/runner.rs
@@ -74,7 +74,8 @@ async fn jobs_are_locked_when_fetched() {
         assertions_finished_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url());
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
     let mut conn = AsyncPgConnection::establish(test_database.url())
         .await
@@ -122,7 +123,8 @@ async fn jobs_are_deleted_when_successfully_run() {
 
     let test_database = TestDatabase::new();
 
-    let runner = runner(test_database.url(), ()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url());
+    let runner = runner(pool, ()).register_job_type::<TestJob>();
 
     let mut conn = AsyncPgConnection::establish(test_database.url())
         .await
@@ -165,7 +167,8 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
         job_started_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url());
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
     let mut conn = AsyncPgConnection::establish(test_database.url())
         .await
@@ -218,7 +221,8 @@ async fn panicking_in_jobs_updates_retry_counter() {
 
     let test_database = TestDatabase::new();
 
-    let runner = runner(test_database.url(), ()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url());
+    let runner = runner(pool, ()).register_job_type::<TestJob>();
 
     let mut conn = AsyncPgConnection::establish(test_database.url())
         .await
@@ -284,7 +288,8 @@ async fn jobs_can_be_deduplicated() {
         assertions_finished_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url());
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
     let mut conn = AsyncPgConnection::establish(test_database.url())
         .await
@@ -323,11 +328,9 @@ fn pool(database_url: &str) -> Pool<AsyncPgConnection> {
 }
 
 fn runner<Context: Clone + Send + Sync + 'static>(
-    database_url: &str,
+    deadpool: Pool<AsyncPgConnection>,
     context: Context,
 ) -> Runner<Context> {
-    let deadpool = pool(database_url);
-
     Runner::new(deadpool, context)
         .configure_default_queue(|queue| queue.num_workers(2))
         .shutdown_when_queue_empty()

From a87037edf69e34d5429c3c64c85c80df1efa8345 Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Fri, 1 Nov 2024 23:29:45 +0100
Subject: [PATCH 4/6] worker/tests: Use connection pool for test connection

---
 crates/crates_io_worker/tests/runner.rs | 32 +++++++++----------------
 1 file changed, 11 insertions(+), 21 deletions(-)

diff --git a/crates/crates_io_worker/tests/runner.rs b/crates/crates_io_worker/tests/runner.rs
index f7e77fda5fd..1fab708e1e6 100644
--- a/crates/crates_io_worker/tests/runner.rs
+++ b/crates/crates_io_worker/tests/runner.rs
@@ -5,7 +5,7 @@ use crates_io_worker::{BackgroundJob, Runner};
 use diesel::prelude::*;
 use diesel_async::pooled_connection::deadpool::Pool;
 use diesel_async::pooled_connection::AsyncDieselConnectionManager;
-use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
+use diesel_async::{AsyncPgConnection, RunQueryDsl};
 use insta::assert_compact_json_snapshot;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -75,11 +75,9 @@ async fn jobs_are_locked_when_fetched() {
     };
 
     let pool = pool(test_database.url());
-    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
+    let mut conn = pool.get().await.unwrap();
 
-    let mut conn = AsyncPgConnection::establish(test_database.url())
-        .await
-        .unwrap();
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
     let job_id = TestJob.async_enqueue(&mut conn).await.unwrap().unwrap();
 
@@ -121,11 +119,9 @@ async fn jobs_are_deleted_when_successfully_run() {
     let test_database = TestDatabase::new();
 
     let pool = pool(test_database.url());
-    let runner = runner(pool, ()).register_job_type::<TestJob>();
+    let mut conn = pool.get().await.unwrap();
 
-    let mut conn = AsyncPgConnection::establish(test_database.url())
-        .await
-        .unwrap();
+    let runner = runner(pool, ()).register_job_type::<TestJob>();
 
     assert_eq!(remaining_jobs(&mut conn).await, 0);
 
@@ -164,11 +160,9 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
     };
 
     let pool = pool(test_database.url());
-    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
+    let mut conn = pool.get().await.unwrap();
 
-    let mut conn = AsyncPgConnection::establish(test_database.url())
-        .await
-        .unwrap();
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
     TestJob.async_enqueue(&mut conn).await.unwrap();
 
@@ -217,11 +211,9 @@ async fn panicking_in_jobs_updates_retry_counter() {
     let test_database = TestDatabase::new();
 
     let pool = pool(test_database.url());
-    let runner = runner(pool, ()).register_job_type::<TestJob>();
+    let mut conn = pool.get().await.unwrap();
 
-    let mut conn = AsyncPgConnection::establish(test_database.url())
-        .await
-        .unwrap();
+    let runner = runner(pool, ()).register_job_type::<TestJob>();
 
     let job_id = TestJob.async_enqueue(&mut conn).await.unwrap().unwrap();
 
@@ -283,11 +275,9 @@ async fn jobs_can_be_deduplicated() {
     };
 
     let pool = pool(test_database.url());
-    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
+    let mut conn = pool.get().await.unwrap();
 
-    let mut conn = AsyncPgConnection::establish(test_database.url())
-        .await
-        .unwrap();
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
     // Enqueue first job
     assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());

From ee0d25f94150436ec1d8feadf1a15a5caceab425 Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Fri, 1 Nov 2024 23:31:18 +0100
Subject: [PATCH 5/6] worker/tests: Use regular `tokio::test`

We've fixed these tests, so we don't need the `multi_thread` runtime
flavor anymore.
---
 crates/crates_io_worker/Cargo.toml      |  2 +-
 crates/crates_io_worker/tests/runner.rs | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/crates/crates_io_worker/Cargo.toml b/crates/crates_io_worker/Cargo.toml
index 0620f0d7693..fb61851d1fc 100644
--- a/crates/crates_io_worker/Cargo.toml
+++ b/crates/crates_io_worker/Cargo.toml
@@ -24,4 +24,4 @@ tracing = "=0.1.40"
 claims = "=0.7.1"
 crates_io_test_db = { path = "../crates_io_test_db" }
 insta = { version = "=1.41.1", features = ["json"] }
-tokio = { version = "=1.41.0", features = ["macros", "rt", "rt-multi-thread", "sync"]}
+tokio = { version = "=1.41.0", features = ["macros", "sync"]}
diff --git a/crates/crates_io_worker/tests/runner.rs b/crates/crates_io_worker/tests/runner.rs
index 1fab708e1e6..70b3c115fc5 100644
--- a/crates/crates_io_worker/tests/runner.rs
+++ b/crates/crates_io_worker/tests/runner.rs
@@ -45,7 +45,7 @@ async fn job_is_locked(id: i64, conn: &mut AsyncPgConnection) -> bool {
         .is_none()
 }
 
-#[tokio::test(flavor = "multi_thread")]
+#[tokio::test]
 async fn jobs_are_locked_when_fetched() {
     #[derive(Clone)]
     struct TestContext {
@@ -96,7 +96,7 @@ async fn jobs_are_locked_when_fetched() {
     assert!(!job_exists(job_id, &mut conn).await);
 }
 
-#[tokio::test(flavor = "multi_thread")]
+#[tokio::test]
 async fn jobs_are_deleted_when_successfully_run() {
     #[derive(Serialize, Deserialize)]
     struct TestJob;
@@ -135,7 +135,7 @@ async fn jobs_are_deleted_when_successfully_run() {
     assert_eq!(remaining_jobs(&mut conn).await, 0);
 }
 
-#[tokio::test(flavor = "multi_thread")]
+#[tokio::test]
 async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
     #[derive(Clone)]
     struct TestContext {
@@ -196,7 +196,7 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
     runner.wait_for_shutdown().await;
 }
 
-#[tokio::test(flavor = "multi_thread")]
+#[tokio::test]
 async fn panicking_in_jobs_updates_retry_counter() {
     #[derive(Serialize, Deserialize)]
     struct TestJob;
@@ -232,7 +232,7 @@ async fn panicking_in_jobs_updates_retry_counter() {
     assert_eq!(tries, 1);
 }
 
-#[tokio::test(flavor = "multi_thread")]
+#[tokio::test]
 async fn jobs_can_be_deduplicated() {
     #[derive(Clone)]
     struct TestContext {

From e8c1b7904f35699755d21b20ae4681e761a35ce6 Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Fri, 1 Nov 2024 23:44:57 +0100
Subject: [PATCH 6/6] worker/tests: Replace `.unwrap()` with `?`

---
 crates/crates_io_worker/tests/runner.rs | 126 ++++++++++++------------
 1 file changed, 63 insertions(+), 63 deletions(-)

diff --git a/crates/crates_io_worker/tests/runner.rs b/crates/crates_io_worker/tests/runner.rs
index 70b3c115fc5..97ae792dc5f 100644
--- a/crates/crates_io_worker/tests/runner.rs
+++ b/crates/crates_io_worker/tests/runner.rs
@@ -13,40 +13,37 @@ use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
 use tokio::sync::Barrier;
 
-async fn all_jobs(conn: &mut AsyncPgConnection) -> Vec<(String, Value)> {
+async fn all_jobs(conn: &mut AsyncPgConnection) -> QueryResult<Vec<(String, Value)>> {
     background_jobs::table
         .select((background_jobs::job_type, background_jobs::data))
         .get_results(conn)
         .await
-        .unwrap()
 }
 
-async fn job_exists(id: i64, conn: &mut AsyncPgConnection) -> bool {
-    background_jobs::table
+async fn job_exists(id: i64, conn: &mut AsyncPgConnection) -> QueryResult<bool> {
+    Ok(background_jobs::table
         .find(id)
         .select(background_jobs::id)
         .get_result::<i64>(conn)
         .await
-        .optional()
-        .unwrap()
-        .is_some()
+        .optional()?
+        .is_some())
 }
 
-async fn job_is_locked(id: i64, conn: &mut AsyncPgConnection) -> bool {
-    background_jobs::table
+async fn job_is_locked(id: i64, conn: &mut AsyncPgConnection) -> QueryResult<bool> {
+    Ok(background_jobs::table
         .find(id)
         .select(background_jobs::id)
         .for_update()
         .skip_locked()
         .get_result::<i64>(conn)
         .await
-        .optional()
-        .unwrap()
-        .is_none()
+        .optional()?
+        .is_none())
 }
 
 #[tokio::test]
-async fn jobs_are_locked_when_fetched() {
+async fn jobs_are_locked_when_fetched() -> anyhow::Result<()> {
     #[derive(Clone)]
     struct TestContext {
         job_started_barrier: Arc<Barrier>,
@@ -71,30 +68,32 @@ async fn jobs_are_locked_when_fetched() {
         assertions_finished_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let pool = pool(test_database.url());
-    let mut conn = pool.get().await.unwrap();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
     let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
-    let job_id = TestJob.async_enqueue(&mut conn).await.unwrap().unwrap();
+    let job_id = assert_some!(TestJob.async_enqueue(&mut conn).await?);
 
-    assert!(job_exists(job_id, &mut conn).await);
-    assert!(!job_is_locked(job_id, &mut conn).await);
+    assert!(job_exists(job_id, &mut conn).await?);
+    assert!(!job_is_locked(job_id, &mut conn).await?);
 
     let runner = runner.start();
     test_context.job_started_barrier.wait().await;
 
-    assert!(job_exists(job_id, &mut conn).await);
-    assert!(job_is_locked(job_id, &mut conn).await);
+    assert!(job_exists(job_id, &mut conn).await?);
+    assert!(job_is_locked(job_id, &mut conn).await?);
 
     test_context.assertions_finished_barrier.wait().await;
     runner.wait_for_shutdown().await;
 
-    assert!(!job_exists(job_id, &mut conn).await);
+    assert!(!job_exists(job_id, &mut conn).await?);
+
+    Ok(())
 }
 
 #[tokio::test]
-async fn jobs_are_deleted_when_successfully_run() {
+async fn jobs_are_deleted_when_successfully_run() -> anyhow::Result<()> {
     #[derive(Serialize, Deserialize)]
     struct TestJob;
 
@@ -109,33 +108,31 @@ async fn jobs_are_deleted_when_successfully_run() {
         }
     }
 
-    async fn remaining_jobs(conn: &mut AsyncPgConnection) -> i64 {
-        background_jobs::table
-            .count()
-            .get_result(&mut *conn)
-            .await
-            .unwrap()
+    async fn remaining_jobs(conn: &mut AsyncPgConnection) -> QueryResult<i64> {
+        background_jobs::table.count().get_result(conn).await
     }
 
     let test_database = TestDatabase::new();
 
-    let pool = pool(test_database.url());
-    let mut conn = pool.get().await.unwrap();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
     let runner = runner(pool, ()).register_job_type::<TestJob>();
 
-    assert_eq!(remaining_jobs(&mut conn).await, 0);
+    assert_eq!(remaining_jobs(&mut conn).await?, 0);
 
-    TestJob.async_enqueue(&mut conn).await.unwrap();
-    assert_eq!(remaining_jobs(&mut conn).await, 1);
+    TestJob.async_enqueue(&mut conn).await?;
+    assert_eq!(remaining_jobs(&mut conn).await?, 1);
 
     let runner = runner.start();
     runner.wait_for_shutdown().await;
-    assert_eq!(remaining_jobs(&mut conn).await, 0);
+    assert_eq!(remaining_jobs(&mut conn).await?, 0);
+
+    Ok(())
 }
 
 #[tokio::test]
-async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
+async fn failed_jobs_do_not_release_lock_before_updating_retry_time() -> anyhow::Result<()> {
     #[derive(Clone)]
     struct TestContext {
         job_started_barrier: Arc<Barrier>,
@@ -158,12 +155,12 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
         job_started_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let pool = pool(test_database.url());
-    let mut conn = pool.get().await.unwrap();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
     let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
-    TestJob.async_enqueue(&mut conn).await.unwrap();
+    TestJob.async_enqueue(&mut conn).await?;
 
     let runner = runner.start();
     test_context.job_started_barrier.wait().await;
@@ -176,8 +173,7 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
     let available_jobs = background_jobs::table
         .select(background_jobs::id)
         .filter(background_jobs::retries.eq(0))
         .for_update()
         .load::<i64>(&mut conn)
-        .await
-        .unwrap();
+        .await?;
     assert_eq!(available_jobs.len(), 0);
 
     // Sanity check to make sure the job actually is there
     let total_jobs_including_failed = background_jobs::table
         .select(background_jobs::id)
         .for_update()
         .load::<i64>(&mut conn)
-        .await
-        .unwrap();
+        .await?;
     assert_eq!(total_jobs_including_failed.len(), 1);
 
     runner.wait_for_shutdown().await;
+
+    Ok(())
 }
 
 #[tokio::test]
-async fn panicking_in_jobs_updates_retry_counter() {
+async fn panicking_in_jobs_updates_retry_counter() -> anyhow::Result<()> {
     #[derive(Serialize, Deserialize)]
     struct TestJob;
 
@@ -209,12 +206,12 @@ async fn panicking_in_jobs_updates_retry_counter() {
 
     let test_database = TestDatabase::new();
 
-    let pool = pool(test_database.url());
-    let mut conn = pool.get().await.unwrap();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
     let runner = runner(pool, ()).register_job_type::<TestJob>();
 
-    let job_id = TestJob.async_enqueue(&mut conn).await.unwrap().unwrap();
+    let job_id = assert_some!(TestJob.async_enqueue(&mut conn).await?);
 
     let runner = runner.start();
     runner.wait_for_shutdown().await;
@@ -224,13 +221,14 @@ async fn panicking_in_jobs_updates_retry_counter() {
         .find(job_id)
         .select(background_jobs::retries)
         .for_update()
         .first::<i32>(&mut conn)
-        .await
-        .unwrap();
+        .await?;
     assert_eq!(tries, 1);
+
+    Ok(())
 }
 
 #[tokio::test]
-async fn jobs_can_be_deduplicated() {
+async fn jobs_can_be_deduplicated() -> anyhow::Result<()> {
     #[derive(Clone)]
     struct TestContext {
         runs: Arc<AtomicU8>,
@@ -274,18 +272,18 @@ async fn jobs_can_be_deduplicated() {
         assertions_finished_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let pool = pool(test_database.url());
-    let mut conn = pool.get().await.unwrap();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
     let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
     // Enqueue first job
-    assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}]]"#);
+    assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}]]"#);
 
     // Try to enqueue the same job again, which should be deduplicated
-    assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}]]"#);
+    assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}]]"#);
 
     // Start processing the first job
     let runner = runner.start();
@@ -293,28 +291,30 @@ async fn jobs_can_be_deduplicated() {
 
     // Enqueue the same job again, which should NOT be deduplicated,
     // since the first job already still running
-    assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+    assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
 
     // Try to enqueue the same job again, which should be deduplicated again
-    assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await.unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+    assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
 
     // Enqueue the same job but with different data, which should
     // NOT be deduplicated
-    assert_some!(TestJob::new("bar").async_enqueue(&mut conn).await.unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn).await, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
+    assert_some!(TestJob::new("bar").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
 
     // Resolve the final barrier to finish the test
    test_context.assertions_finished_barrier.wait().await;
 
     runner.wait_for_shutdown().await;
+
+    Ok(())
 }
 
-fn pool(database_url: &str) -> Pool<AsyncPgConnection> {
+fn pool(database_url: &str) -> anyhow::Result<Pool<AsyncPgConnection>> {
     let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
-    Pool::builder(manager).max_size(4).build().unwrap()
+    Ok(Pool::builder(manager).max_size(4).build()?)
 }
 
 fn runner(