Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions crates/crates_io_worker/src/background_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use diesel::sql_types::{Int2, Jsonb, Text};
use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
Expand Down Expand Up @@ -38,32 +40,37 @@
/// Execute the task. This method should define its logic.
fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send;

#[allow(async_fn_in_trait)]
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]
async fn enqueue(&self, conn: &mut AsyncPgConnection) -> Result<Option<i64>, EnqueueError> {
let data = serde_json::to_value(self)?;
fn enqueue(
&self,
conn: &mut AsyncPgConnection,
) -> BoxFuture<'_, Result<Option<i64>, EnqueueError>> {
let data = match serde_json::to_value(self) {
Ok(data) => data,
Err(err) => return async move { Err(EnqueueError::SerializationError(err)) }.boxed(),

Check warning on line 50 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L50

Added line #L50 was not covered by tests
};
let priority = Self::PRIORITY;

if Self::DEDUPLICATED {
Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?)
let future = enqueue_deduplicated(conn, Self::JOB_NAME, data, priority);
future.boxed()
} else {
Ok(Some(
enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?,
))
let future = enqueue_simple(conn, Self::JOB_NAME, data, priority);
async move { Ok(Some(future.await?)) }.boxed()
}
}
}

async fn enqueue_deduplicated(
fn enqueue_deduplicated(
conn: &mut AsyncPgConnection,
job_type: &str,
data: &Value,
job_type: &'static str,
data: Value,
priority: i16,
) -> Result<Option<i64>, EnqueueError> {
) -> impl Future<Output = Result<Option<i64>, EnqueueError>> {
let similar_jobs = background_jobs::table
.select(background_jobs::id)
.filter(background_jobs::job_type.eq(job_type))
.filter(background_jobs::data.eq(data))
.filter(background_jobs::data.eq(data.clone()))
.filter(background_jobs::priority.eq(priority))
.for_update()
.skip_locked();
Expand All @@ -75,36 +82,33 @@
))
.filter(not(exists(similar_jobs)));

let id = diesel::insert_into(background_jobs::table)
let future = diesel::insert_into(background_jobs::table)
.values(deduplicated_select)
.into_columns((
background_jobs::job_type,
background_jobs::data,
background_jobs::priority,
))
.returning(background_jobs::id)
.get_result::<i64>(conn)
.await
.optional()?;
.get_result::<i64>(conn);

Ok(id)
async move { Ok(future.await.optional()?) }
}

async fn enqueue_simple(
fn enqueue_simple(
conn: &mut AsyncPgConnection,
job_type: &str,
data: &Value,
job_type: &'static str,
data: Value,
priority: i16,
) -> Result<i64, EnqueueError> {
let id = diesel::insert_into(background_jobs::table)
) -> impl Future<Output = Result<i64, EnqueueError>> {
let future = diesel::insert_into(background_jobs::table)
.values((
background_jobs::job_type.eq(job_type),
background_jobs::data.eq(data),
background_jobs::priority.eq(priority),
))
.returning(background_jobs::id)
.get_result(conn)
.await?;
.get_result(conn);

Ok(id)
async move { Ok(future.await?) }
}
26 changes: 11 additions & 15 deletions src/bin/crates-admin/delete_crate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,17 @@
info!("{name}: Skipped missing crate");
};

info!("{name}: Enqueuing index sync jobs…");
let job = jobs::SyncToGitIndex::new(name);
if let Err(error) = job.enqueue(&mut conn).await {
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
}

let job = jobs::SyncToSparseIndex::new(name);
if let Err(error) = job.enqueue(&mut conn).await {
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");
}

info!("{name}: Enqueuing DeleteCrateFromStorage job…");
let job = jobs::DeleteCrateFromStorage::new(name.into());
if let Err(error) = job.enqueue(&mut conn).await {
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
info!("{name}: Enqueuing background jobs…");
let git_index_job = jobs::SyncToGitIndex::new(name);
let sparse_index_job = jobs::SyncToSparseIndex::new(name);
let delete_from_storage_job = jobs::DeleteCrateFromStorage::new(name.into());

Check warning on line 113 in src/bin/crates-admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/delete_crate.rs#L110-L113

Added lines #L110 - L113 were not covered by tests

if let Err(error) = tokio::try_join!(
git_index_job.enqueue(&mut conn),
sparse_index_job.enqueue(&mut conn),
delete_from_storage_job.enqueue(&mut conn),
) {
warn!("{name}: Failed to enqueue background job: {error}");

Check warning on line 120 in src/bin/crates-admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/delete_crate.rs#L115-L120

Added lines #L115 - L120 were not covered by tests
}
}

Expand Down
15 changes: 8 additions & 7 deletions src/bin/crates-admin/delete_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@
let crate_name = &opts.crate_name;

info!(%crate_name, "Enqueuing index sync jobs");
let job = jobs::SyncToGitIndex::new(crate_name);
if let Err(error) = job.enqueue(&mut conn).await {
warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job");
}
let job = jobs::SyncToSparseIndex::new(crate_name);
if let Err(error) = job.enqueue(&mut conn).await {
warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job");
let git_index_job = jobs::SyncToGitIndex::new(crate_name);
let sparse_index_job = jobs::SyncToSparseIndex::new(crate_name);

Check warning on line 99 in src/bin/crates-admin/delete_version.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/delete_version.rs#L98-L99

Added lines #L98 - L99 were not covered by tests

if let Err(error) = tokio::try_join!(
git_index_job.enqueue(&mut conn),
sparse_index_job.enqueue(&mut conn),
) {
warn!(%crate_name, "Failed to enqueue background job: {error}");

Check warning on line 105 in src/bin/crates-admin/delete_version.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/delete_version.rs#L101-L105

Added lines #L101 - L105 were not covered by tests
}

for version in &opts.versions {
Expand Down
12 changes: 8 additions & 4 deletions src/bin/crates-admin/yank_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@
.execute(conn)
.await?;

SyncToGitIndex::new(&krate.name).enqueue(conn).await?;
let git_index_job = SyncToGitIndex::new(&krate.name);
let sparse_index_job = SyncToSparseIndex::new(&krate.name);
let update_default_version_job = UpdateDefaultVersion::new(krate.id);

Check warning on line 72 in src/bin/crates-admin/yank_version.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/yank_version.rs#L70-L72

Added lines #L70 - L72 were not covered by tests

SyncToSparseIndex::new(&krate.name).enqueue(conn).await?;

UpdateDefaultVersion::new(krate.id).enqueue(conn).await?;
tokio::try_join!(
git_index_job.enqueue(conn),
sparse_index_job.enqueue(conn),
update_default_version_job.enqueue(conn),
)?;

Check warning on line 78 in src/bin/crates-admin/yank_version.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/yank_version.rs#L74-L78

Added lines #L74 - L78 were not covered by tests

Ok(())
}
55 changes: 34 additions & 21 deletions src/controllers/krate/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
use cargo_manifest::{Dependency, DepsSet, TargetDepsSet};
use chrono::{DateTime, SecondsFormat, Utc};
use crates_io_tarball::{process_tarball, TarballError};
use crates_io_worker::BackgroundJob;
use crates_io_worker::{BackgroundJob, EnqueueError};
use diesel::dsl::{exists, select};
use diesel::prelude::*;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
use futures_util::TryFutureExt;
use futures_util::TryStreamExt;
use hex::ToHex;
use http::StatusCode;
Expand Down Expand Up @@ -503,29 +504,41 @@
.await
.map_err(|e| internal(format!("failed to upload crate: {e}")))?;

jobs::SyncToGitIndex::new(&krate.name).enqueue(conn).await?;
jobs::SyncToSparseIndex::new(&krate.name).enqueue(conn).await?;

SendPublishNotificationsJob::new(version.id).enqueue(conn).await?;
let git_index_job = jobs::SyncToGitIndex::new(&krate.name);
let sparse_index_job = jobs::SyncToSparseIndex::new(&krate.name);
let publish_notifications_job = SendPublishNotificationsJob::new(version.id);
let crate_feed_job = jobs::rss::SyncCrateFeed::new(krate.name.clone());
let updates_feed_job = jobs::rss::SyncUpdatesFeed;

tokio::try_join!(
git_index_job.enqueue(conn),
sparse_index_job.enqueue(conn),
publish_notifications_job.enqueue(conn),
crate_feed_job.enqueue(conn).or_else(|error| async move {
error!("Failed to enqueue `rss::SyncCrateFeed` job: {error}");
Ok::<_, EnqueueError>(None)

Check warning on line 519 in src/controllers/krate/publish.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/publish.rs#L518-L519

Added lines #L518 - L519 were not covered by tests
}),
updates_feed_job.enqueue(conn).or_else(|error| async move {
error!("Failed to enqueue `rss::SyncUpdatesFeed` job: {error}");
Ok::<_, EnqueueError>(None)

Check warning on line 523 in src/controllers/krate/publish.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/publish.rs#L522-L523

Added lines #L522 - L523 were not covered by tests
}),
)?;

// Experiment: check new crates for potential typosquatting.
if existing_crate.is_none() {
CheckTyposquat::new(&krate.name).enqueue(conn).await?;
}

let job = jobs::rss::SyncCrateFeed::new(krate.name.clone());
if let Err(error) = job.enqueue(conn).await {
error!("Failed to enqueue `rss::SyncCrateFeed` job: {error}");
}

if let Err(error) = jobs::rss::SyncUpdatesFeed.enqueue(conn).await {
error!("Failed to enqueue `rss::SyncUpdatesFeed` job: {error}");
}

if existing_crate.is_none() {
if let Err(error) = jobs::rss::SyncCratesFeed.enqueue(conn).await {
error!("Failed to enqueue `rss::SyncCratesFeed` job: {error}");
}
let crates_feed_job = jobs::rss::SyncCratesFeed;
let typosquat_job = CheckTyposquat::new(&krate.name);

tokio::try_join!(
crates_feed_job.enqueue(conn).or_else(|error| async move {
error!("Failed to enqueue `rss::SyncCratesFeed` job: {error}");
Ok::<_, EnqueueError>(None)

Check warning on line 535 in src/controllers/krate/publish.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/publish.rs#L534-L535

Added lines #L534 - L535 were not covered by tests
}),
typosquat_job.enqueue(conn).or_else(|error| async move {
error!("Failed to enqueue `CheckTyposquat` job: {error}");
Ok::<_, EnqueueError>(None)

Check warning on line 539 in src/controllers/krate/publish.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/publish.rs#L538-L539

Added lines #L538 - L539 were not covered by tests
}),
)?;
}

// The `other` field on `PublishWarnings` was introduced to handle a temporary warning
Expand Down
12 changes: 9 additions & 3 deletions src/controllers/version/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,15 @@ pub async fn perform_version_yank_update(
.insert(conn)
.await?;

SyncToGitIndex::new(&krate.name).enqueue(conn).await?;
SyncToSparseIndex::new(&krate.name).enqueue(conn).await?;
UpdateDefaultVersion::new(krate.id).enqueue(conn).await?;
let git_index_job = SyncToGitIndex::new(&krate.name);
let sparse_index_job = SyncToSparseIndex::new(&krate.name);
let update_default_version_job = UpdateDefaultVersion::new(krate.id);

tokio::try_join!(
git_index_job.enqueue(conn),
sparse_index_job.enqueue(conn),
update_default_version_job.enqueue(conn),
)?;

Ok(())
}