Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,33 @@

use crate::models::{Crate, CrateVersions, Dependency, Version};
use crate::schema::{crates, dependencies};
use crate::util::diesel::Conn;
use anyhow::Context;
use crates_io_index::features::split_features;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use sentry::Level;

#[instrument(skip_all, fields(krate.name = ?name))]
pub fn get_index_data(name: &str, conn: &mut impl Conn) -> anyhow::Result<Option<String>> {
pub async fn get_index_data(
name: &str,
conn: &mut AsyncPgConnection,
) -> anyhow::Result<Option<String>> {
debug!("Looking up crate by name");
let Some(krate): Option<Crate> = Crate::by_exact_name(name).first(conn).optional()? else {
let krate = crates::table
.select(Crate::as_select())
.filter(crates::name.eq(name))
.first::<Crate>(conn)
.await
.optional();

let Some(krate) = krate? else {
return Ok(None);
};

debug!("Gathering remaining index data");
let crates = index_metadata(&krate, conn).context("Failed to gather index metadata")?;
let crates = index_metadata(&krate, conn)
.await
.context("Failed to gather index metadata")?;

// This can sometimes happen when we delete versions upon owner request
// but don't realize that the crate is now left with no versions at all.
Expand All @@ -43,11 +55,11 @@ pub fn get_index_data(name: &str, conn: &mut impl Conn) -> anyhow::Result<Option
}

/// Gather all the necessary data to write an index metadata file
pub fn index_metadata(
pub async fn index_metadata(
krate: &Crate,
conn: &mut impl Conn,
conn: &mut AsyncPgConnection,
) -> QueryResult<Vec<crates_io_index::Crate>> {
let mut versions: Vec<Version> = krate.all_versions().load(conn)?;
let mut versions: Vec<Version> = krate.all_versions().load(conn).await?;

// We sort by `created_at` by default, but since tests run within a
// single database transaction the versions will all have the same
Expand All @@ -57,7 +69,8 @@ pub fn index_metadata(
let deps: Vec<(Dependency, String)> = Dependency::belonging_to(&versions)
.inner_join(crates::table)
.select((dependencies::all_columns, crates::name))
.load(conn)?;
.load(conn)
.await?;

let deps = deps.grouped_by(&versions);

Expand Down Expand Up @@ -127,12 +140,14 @@ mod tests {
use crate::tests::builders::{CrateBuilder, VersionBuilder};
use chrono::{Days, Utc};
use crates_io_test_db::TestDatabase;
use diesel_async::AsyncConnection;
use insta::assert_json_snapshot;

#[test]
fn test_index_metadata() {
#[tokio::test]
async fn test_index_metadata() {
let test_db = TestDatabase::new();
let mut conn = test_db.connect();
let mut async_conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();

let user_id = diesel::insert_into(users::table)
.values((
Expand All @@ -142,7 +157,8 @@ mod tests {
users::gh_access_token.eq("some random token"),
))
.returning(users::id)
.get_result::<i32>(&mut conn)
.get_result::<i32>(&mut async_conn)
.await
.unwrap();

let created_at_1 = Utc::now()
Expand All @@ -159,7 +175,7 @@ mod tests {
.version(VersionBuilder::new("0.1.0"))
.expect_build(&mut conn);

let metadata = index_metadata(&fooo, &mut conn).unwrap();
let metadata = index_metadata(&fooo, &mut async_conn).await.unwrap();
assert_json_snapshot!(metadata);

let bar = CrateBuilder::new("bar", user_id)
Expand All @@ -177,7 +193,7 @@ mod tests {
.version(VersionBuilder::new("1.0.1").checksum("0123456789abcdef"))
.expect_build(&mut conn);

let metadata = index_metadata(&bar, &mut conn).unwrap();
let metadata = index_metadata(&bar, &mut async_conn).await.unwrap();
assert_json_snapshot!(metadata);
}
}
22 changes: 10 additions & 12 deletions src/worker/jobs/index/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::worker::Environment;
use anyhow::Context;
use crates_io_index::Repository;
use crates_io_worker::BackgroundJob;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use std::fs;
use std::fs::File;
use std::io::{ErrorKind, Write};
Expand Down Expand Up @@ -36,12 +35,13 @@ impl BackgroundJob for SyncToGitIndex {
info!("Syncing to git index");

let crate_name = self.krate.clone();
let conn = env.deadpool.get().await?;
spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
let mut conn = env.deadpool.get().await?;

let new = get_index_data(&crate_name, conn).context("Failed to get index data")?;
let new = get_index_data(&crate_name, &mut conn)
.await
.context("Failed to get index data")?;

spawn_blocking(move || {
let repo = env.lock_index()?;
let dst = repo.index_file(&crate_name);

Expand Down Expand Up @@ -102,13 +102,11 @@ impl BackgroundJob for SyncToSparseIndex {
info!("Syncing to sparse index");

let crate_name = self.krate.clone();
let conn = env.deadpool.get().await?;
let content = spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
get_index_data(&crate_name, conn)
})
.await
.context("Failed to get index data")?;
let mut conn = env.deadpool.get().await?;

let content = get_index_data(&crate_name, &mut conn)
.await
.context("Failed to get index data")?;

let future = env.storage.sync_index(&self.krate, content);
future.await.context("Failed to sync index data")?;
Expand Down