diff --git a/src/controllers/krate/versions.rs b/src/controllers/krate/versions.rs index ed48f00e530..5694dd9c1a2 100644 --- a/src/controllers/krate/versions.rs +++ b/src/controllers/krate/versions.rs @@ -4,9 +4,9 @@ use crate::util::diesel::prelude::*; use axum::extract::Path; use axum_extra::json; use axum_extra::response::ErasedJson; -use diesel::connection::DefaultLoadingMode; use diesel::dsl::not; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; +use futures_util::{future, TryStreamExt}; use http::request::Parts; use indexmap::{IndexMap, IndexSet}; use std::cmp::Reverse; @@ -16,8 +16,6 @@ use crate::app::AppState; use crate::controllers::helpers::pagination::{encode_seek, Page, PaginationOptions}; use crate::models::{Crate, User, Version, VersionOwnerAction}; use crate::schema::{crates, users, versions}; -use crate::tasks::spawn_blocking; -use crate::util::diesel::Conn; use crate::util::errors::{bad_request, crate_not_found, AppResult, BoxedAppError}; use crate::util::RequestUtils; use crate::views::EncodableVersion; @@ -28,63 +26,59 @@ pub async fn versions( Path(crate_name): Path, req: Parts, ) -> AppResult { - let conn = state.db_read().await?; - spawn_blocking(move || { - use diesel::RunQueryDsl; - - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - - let crate_id: i32 = Crate::by_name(&crate_name) - .select(crates::id) - .first(conn) - .optional()? - .ok_or_else(|| crate_not_found(&crate_name))?; - - let mut pagination = None; - let params = req.query(); - // To keep backward compatibility, we paginate only if per_page is provided - if params.get("per_page").is_some() { - pagination = Some( - PaginationOptions::builder() - .enable_seek(true) - .enable_pages(false) - .gather(&req)?, - ); - } + let mut conn = state.db_read().await?; + + let crate_id: i32 = Crate::by_name(&crate_name) + .select(crates::id) + .first(&mut conn) + .await + .optional()? + .ok_or_else(|| crate_not_found(&crate_name))?; + + let mut pagination = None; + let params = req.query(); + // To keep backward compatibility, we paginate only if per_page is provided + if params.get("per_page").is_some() { + pagination = Some( + PaginationOptions::builder() + .enable_seek(true) + .enable_pages(false) + .gather(&req)?, + ); + } - let include = req - .query() - .get("include") - .map(|mode| ShowIncludeMode::from_str(mode)) - .transpose()? - .unwrap_or_default(); - - // Sort by semver by default - let versions_and_publishers = match params.get("sort").map(|s| s.to_lowercase()).as_deref() - { - Some("date") => list_by_date(crate_id, pagination.as_ref(), include, &req, conn)?, - _ => list_by_semver(crate_id, pagination.as_ref(), include, &req, conn)?, - }; + let include = req + .query() + .get("include") + .map(|mode| ShowIncludeMode::from_str(mode)) + .transpose()? + .unwrap_or_default(); + + // Sort by semver by default + let versions_and_publishers = match params.get("sort").map(|s| s.to_lowercase()).as_deref() { + Some("date") => { + list_by_date(crate_id, pagination.as_ref(), include, &req, &mut conn).await? + } + _ => list_by_semver(crate_id, pagination.as_ref(), include, &req, &mut conn).await?, + }; - let versions = versions_and_publishers - .data - .iter() - .map(|(v, _)| v) - .collect::>(); - let actions = VersionOwnerAction::for_versions(conn, &versions)?; - let versions = versions_and_publishers - .data - .into_iter() - .zip(actions) - .map(|((v, pb), aas)| EncodableVersion::from(v, &crate_name, pb, aas)) - .collect::>(); - - Ok(match pagination { - Some(_) => json!({ "versions": versions, "meta": versions_and_publishers.meta }), - None => json!({ "versions": versions }), - }) + let versions = versions_and_publishers + .data + .iter() + .map(|(v, _)| v) + .collect::>(); + let actions = VersionOwnerAction::async_for_versions(&mut conn, &versions).await?; + let versions = versions_and_publishers + .data + .into_iter() + .zip(actions) + .map(|((v, pb), aas)| EncodableVersion::from(v, &crate_name, pb, aas)) + .collect::>(); + + Ok(match pagination { + Some(_) => json!({ "versions": versions, "meta": versions_and_publishers.meta }), + None => json!({ "versions": versions }), }) - .await? } /// Seek-based pagination of versions by date @@ -92,14 +86,14 @@ pub async fn versions( /// # Panics /// /// This function will panic if `option` is built with `enable_pages` set to true. -fn list_by_date( +async fn list_by_date( crate_id: i32, options: Option<&PaginationOptions>, include: ShowIncludeMode, req: &Parts, - conn: &mut impl Conn, + conn: &mut AsyncPgConnection, ) -> AppResult { - use diesel::RunQueryDsl; + use diesel_async::RunQueryDsl; use seek::*; let mut query = versions::table @@ -126,17 +120,20 @@ fn list_by_date( if include.release_tracks { let mut sorted_versions = IndexSet::new(); - for result in versions::table + versions::table .filter(versions::crate_id.eq(crate_id)) .filter(not(versions::yanked)) .select(versions::num) - .load_iter::(conn)? - { - let Ok(semver) = semver::Version::parse(&result?) else { - continue; - }; - sorted_versions.insert(semver); - } + .load_stream::(conn) + .await? + .try_for_each(|num| { + if let Ok(semver) = semver::Version::parse(&num) { + sorted_versions.insert(semver); + }; + future::ready(Ok(())) + }) + .await?; + sorted_versions.sort_unstable_by(|a, b| b.cmp(a)); release_tracks = Some(ReleaseTracks::from_sorted_semver_iter( sorted_versions.iter(), @@ -146,7 +143,7 @@ fn list_by_date( query = query.order((versions::created_at.desc(), versions::id.desc())); - let data: Vec<(Version, Option)> = query.load(conn)?; + let data: Vec<(Version, Option)> = query.load(conn).await?; let mut next_page = None; if let Some(options) = options { next_page = next_seek_params(&data, options, |last| Seek::Date.to_payload(last))? @@ -159,7 +156,8 @@ fn list_by_date( versions::table .filter(versions::crate_id.eq(crate_id)) .count() - .get_result(conn)? + .get_result(conn) + .await? } else { 0 }; @@ -182,14 +180,13 @@ fn list_by_date( // Unfortunately, Heroku Postgres has no support for the semver PG extension. // Therefore, we need to perform both sorting and pagination manually on the server. -fn list_by_semver( +async fn list_by_semver( crate_id: i32, options: Option<&PaginationOptions>, include: ShowIncludeMode, req: &Parts, - conn: &mut impl Conn, + conn: &mut AsyncPgConnection, ) -> AppResult { - use diesel::RunQueryDsl; use seek::*; let (data, total, release_tracks) = if let Some(options) = options { @@ -200,16 +197,20 @@ fn list_by_semver( // without sorting twice. // Sorting by semver but opted for id as the seek key because num can be quite lengthy, // while id values are significantly smaller. + let mut sorted_versions = IndexMap::new(); - for result in versions::table + versions::table .filter(versions::crate_id.eq(crate_id)) .select((versions::id, versions::num, versions::yanked)) - .load_iter::<(i32, String, bool), DefaultLoadingMode>(conn)? - { - let (id, num, yanked) = result?; - let semver = semver::Version::parse(&num).ok(); - sorted_versions.insert(id, (semver, yanked, None)); - } + .load_stream::<(i32, String, bool)>(conn) + .await? + .try_for_each(|(id, num, yanked)| { + let semver = semver::Version::parse(&num).ok(); + sorted_versions.insert(id, (semver, yanked, None)); + future::ready(Ok(())) + }) + .await?; + sorted_versions .sort_unstable_by(|_, (semver_a, _, _), _, (semver_b, _, _)| semver_b.cmp(semver_a)); @@ -240,20 +241,23 @@ fn list_by_semver( .keys() .cloned() .collect::>(); - for result in versions::table + versions::table .filter(versions::crate_id.eq(crate_id)) .left_outer_join(users::table) .select(<(Version, Option)>::as_select()) .filter(versions::id.eq_any(ids)) - .load_iter::<(Version, Option), DefaultLoadingMode>(conn)? - { - let row = result?; - // The versions are already sorted, and we only need to enrich the fetched rows into them. - // Therefore, other values can now be safely ignored. - sorted_versions - .entry(row.0.id) - .and_modify(|entry| *entry = (None, false, Some(row))); - } + .load_stream::<(Version, Option)>(conn) + .await? + .try_for_each(|row| { + // The versions are already sorted, and we only need to enrich the fetched rows into them. + // Therefore, other values can now be safely ignored. + sorted_versions + .entry(row.0.id) + .and_modify(|entry| *entry = (None, false, Some(row))); + + future::ready(Ok(())) + }) + .await?; let len = sorted_versions.len(); ( @@ -272,7 +276,8 @@ fn list_by_semver( .filter(versions::crate_id.eq(crate_id)) .left_outer_join(users::table) .select(<(Version, Option)>::as_select()) - .load(conn)?; + .load(conn) + .await?; data.sort_by_cached_key(|(version, _)| Reverse(semver::Version::parse(&version.num).ok())); let total = data.len(); (data, total, None)