From a954e625a4d49ee1938f024dfdda0c729048aaf2 Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Fri, 11 Oct 2024 15:23:08 +0200
Subject: [PATCH] worker: Extract `enqueue_simple/deduplicated()` fns

---
 crates/crates_io_worker/src/background_job.rs | 97 +++++++++++--------
 1 file changed, 58 insertions(+), 39 deletions(-)

diff --git a/crates/crates_io_worker/src/background_job.rs b/crates/crates_io_worker/src/background_job.rs
index f121bc1fb58..c8b5214ad8c 100644
--- a/crates/crates_io_worker/src/background_job.rs
+++ b/crates/crates_io_worker/src/background_job.rs
@@ -7,6 +7,7 @@ use diesel::prelude::*;
 use diesel::sql_types::{Int2, Jsonb, Text};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
+use serde_json::Value;
 use std::future::Future;
 use tracing::instrument;
 
@@ -49,49 +50,67 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
     fn enqueue_with_priority(
         &self,
         conn: &mut impl LoadConnection<Backend = Pg>,
-        job_priority: i16,
+        priority: i16,
     ) -> Result<Option<i64>, EnqueueError> {
-        let job_data = serde_json::to_value(self)?;
+        let data = serde_json::to_value(self)?;
 
         if Self::DEDUPLICATED {
-            let similar_jobs = background_jobs::table
-                .select(background_jobs::id)
-                .filter(background_jobs::job_type.eq(Self::JOB_NAME))
-                .filter(background_jobs::data.eq(&job_data))
-                .filter(background_jobs::priority.eq(job_priority))
-                .for_update()
-                .skip_locked();
-
-            let deduplicated_select = diesel::select((
-                Self::JOB_NAME.into_sql::<Text>(),
-                (&job_data).into_sql::<Jsonb>(),
-                job_priority.into_sql::<Int2>(),
-            ))
-            .filter(not(exists(similar_jobs)));
-
-            let id = diesel::insert_into(background_jobs::table)
-                .values(deduplicated_select)
-                .into_columns((
-                    background_jobs::job_type,
-                    background_jobs::data,
-                    background_jobs::priority,
-                ))
-                .returning(background_jobs::id)
-                .get_result::<i64>(conn)
-                .optional()?;
-
-            Ok(id)
+            Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority)?)
         } else {
-            let id = diesel::insert_into(background_jobs::table)
-                .values((
-                    background_jobs::job_type.eq(Self::JOB_NAME),
-                    background_jobs::data.eq(job_data),
-                    background_jobs::priority.eq(job_priority),
-                ))
-                .returning(background_jobs::id)
-                .get_result(conn)?;
-
-            Ok(Some(id))
+            Ok(Some(enqueue_simple(conn, Self::JOB_NAME, &data, priority)?))
         }
     }
 }
+
+fn enqueue_deduplicated(
+    conn: &mut impl LoadConnection<Backend = Pg>,
+    job_type: &str,
+    data: &Value,
+    priority: i16,
+) -> Result<Option<i64>, EnqueueError> {
+    let similar_jobs = background_jobs::table
+        .select(background_jobs::id)
+        .filter(background_jobs::job_type.eq(job_type))
+        .filter(background_jobs::data.eq(data))
+        .filter(background_jobs::priority.eq(priority))
+        .for_update()
+        .skip_locked();
+
+    let deduplicated_select = diesel::select((
+        job_type.into_sql::<Text>(),
+        data.into_sql::<Jsonb>(),
+        priority.into_sql::<Int2>(),
+    ))
+    .filter(not(exists(similar_jobs)));
+
+    let id = diesel::insert_into(background_jobs::table)
+        .values(deduplicated_select)
+        .into_columns((
+            background_jobs::job_type,
+            background_jobs::data,
+            background_jobs::priority,
+        ))
+        .returning(background_jobs::id)
+        .get_result::<i64>(conn)
+        .optional()?;
+
+    Ok(id)
+}
+
+fn enqueue_simple(
+    conn: &mut impl LoadConnection<Backend = Pg>,
+    job_type: &str,
+    data: &Value,
+    priority: i16,
+) -> Result<i64, EnqueueError> {
+    let id = diesel::insert_into(background_jobs::table)
+        .values((
+            background_jobs::job_type.eq(job_type),
+            background_jobs::data.eq(data),
+            background_jobs::priority.eq(priority),
+        ))
+        .returning(background_jobs::id)
+        .get_result(conn)?;
+
+    Ok(id)
+}
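
Note (illustrative usage sketch, not part of the patch): callers reach the extracted enqueue_deduplicated() / enqueue_simple() helpers through BackgroundJob::enqueue_with_priority(), which dispatches on Self::DEDUPLICATED. The names RefreshCrateMeta, "refresh_crate_meta", and enqueue_twice() are hypothetical, and the Context/run items, the crates_io_worker::BackgroundJob import path, and the use of anyhow are assumptions made for this sketch; the real trait may require further items.

    use crates_io_worker::BackgroundJob;
    use diesel::PgConnection;
    use serde::{Deserialize, Serialize};

    // Hypothetical job type, for illustration only.
    #[derive(Serialize, Deserialize)]
    struct RefreshCrateMeta {
        crate_name: String,
    }

    impl BackgroundJob for RefreshCrateMeta {
        const JOB_NAME: &'static str = "refresh_crate_meta";
        // Opt into the deduplicated insert path.
        const DEDUPLICATED: bool = true;

        type Context = ();

        async fn run(&self, _ctx: Self::Context) -> anyhow::Result<()> {
            Ok(())
        }
    }

    fn enqueue_twice(conn: &mut PgConnection) -> anyhow::Result<()> {
        let job = RefreshCrateMeta { crate_name: "serde".into() };

        // The first call inserts a row and returns Ok(Some(id)).
        let first = job.enqueue_with_priority(conn, 0)?;
        assert!(first.is_some());

        // A second call with an identical payload hits the
        // not(exists(similar_jobs)) guard in enqueue_deduplicated() and
        // returns Ok(None), as long as the first copy is still pending and
        // not currently locked by a worker (FOR UPDATE ... SKIP LOCKED).
        let second = job.enqueue_with_priority(conn, 0)?;
        assert!(second.is_none());

        Ok(())
    }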