34 changes: 10 additions & 24 deletions src/worker/jobs/archive_version_downloads.rs
@@ -1,11 +1,11 @@
use super::IndexVersionDownloadsArchive;
use crate::schema::version_downloads;
use crate::tasks::spawn_blocking;
use crate::worker::Environment;
use anyhow::{anyhow, Context};
use chrono::{NaiveDate, Utc};
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use futures_util::StreamExt;
use object_store::ObjectStore;
@@ -63,10 +63,15 @@
export(&env.config.db.primary.url, &csv_path, &self.before).await?;
let dates = spawn_blocking(move || split(csv_path)).await?;
let uploaded_dates = upload(downloads_archive_store, tempdir.path(), dates).await?;
delete(&env.deadpool, uploaded_dates).await?;

let mut conn = env.deadpool.get().await?;
delete(&mut conn, uploaded_dates).await?;

// Queue up the job to regenerate the archive index.
enqueue_index_job(&env.deadpool).await?;
IndexVersionDownloadsArchive
.async_enqueue(&mut conn)
.await
.context("Failed to enqueue IndexVersionDownloadsArchive job")?;

info!("Finished archiving old version downloads");
Ok(())
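
For context, after this change the tail of `run()` checks out a single connection from the deadpool and reuses it for both steps, instead of handing the pool to two helpers that each checked out their own connection. A minimal consolidated sketch, assuming `env.deadpool` is the `diesel_async` deadpool used elsewhere in this file and that `delete` and `IndexVersionDownloadsArchive::async_enqueue` have the signatures shown in the added lines above:

```rust
// Sketch only: mirrors the added lines above, not a verbatim copy of the file.
let mut conn = env.deadpool.get().await?;

// One pooled connection serves both the cleanup and the follow-up job.
delete(&mut conn, uploaded_dates).await?;

// Queue up the job to regenerate the archive index on the same connection.
IndexVersionDownloadsArchive
    .async_enqueue(&mut conn)
    .await
    .context("Failed to enqueue IndexVersionDownloadsArchive job")?;

info!("Finished archiving old version downloads");
Ok(())
```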
@@ -218,12 +223,7 @@
}

/// Delete version downloads for the given dates from the database.
async fn delete(db_pool: &Pool<AsyncPgConnection>, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
let mut conn = db_pool.get().await?;
delete_inner(&mut conn, dates).await
}

async fn delete_inner(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
async fn delete(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
// Delete version downloads for the given dates in chunks to avoid running
// into the maximum query parameter limit.
const CHUNK_SIZE: usize = 5000;
@@ -244,23 +244,11 @@
Ok(())
}
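
The body of `delete` is collapsed in the diff. Below is a hedged sketch of how a chunked delete along these lines could look; the `version_downloads` table and its `date` column come from the schema import at the top of the file, but the exact body is an assumption for illustration, not a copy of the PR:

```rust
use chrono::NaiveDate;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

use crate::schema::version_downloads;

/// Illustrative sketch of a chunked delete; not the PR's exact implementation.
async fn delete(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
    // Postgres limits the number of bind parameters per statement, so the
    // dates are deleted in chunks rather than in one giant `IN (...)` list.
    const CHUNK_SIZE: usize = 5000;

    for chunk in dates.chunks(CHUNK_SIZE) {
        diesel::delete(version_downloads::table.filter(version_downloads::date.eq_any(chunk)))
            .execute(conn)
            .await?;
    }

    Ok(())
}
```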

async fn enqueue_index_job(db_pool: &Pool<AsyncPgConnection>) -> anyhow::Result<()> {
let mut conn = db_pool.get().await?;

super::IndexVersionDownloadsArchive
.async_enqueue(&mut conn)
.await
.context("Failed to enqueue IndexVersionDownloadsArchive job")?;

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{crates, version_downloads, versions};
use crates_io_test_db::TestDatabase;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncConnection;
use insta::assert_snapshot;

@@ -372,10 +360,8 @@
let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();
prepare_database(&mut conn).await;

let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(test_db.url());
let db_pool = Pool::builder(manager).build().unwrap();
let dates = vec![NaiveDate::from_ymd_opt(2021, 1, 1).unwrap()];
delete(&db_pool, dates).await.unwrap();
delete(&mut conn, dates).await.unwrap();

let row_count: i64 = version_downloads::table
.count()
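
The corresponding test now drives `delete` through the already-established connection instead of building a throwaway pool. A sketch of the simplified setup, assuming `TestDatabase::new()` and `prepare_database` are the helpers used by the surrounding tests in this module; the assertion on `row_count` is truncated in the diff and left out here:

```rust
// Sketch of the reworked test setup after the change.
let test_db = TestDatabase::new();
let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();
prepare_database(&mut conn).await;

// `delete` now takes the connection directly, so no pool is needed.
let dates = vec![NaiveDate::from_ymd_opt(2021, 1, 1).unwrap()];
delete(&mut conn, dates).await.unwrap();

let row_count: i64 = version_downloads::table
    .count()
    .get_result(&mut conn)
    .await
    .unwrap();
```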