diff --git a/src/auth.rs b/src/auth.rs index 089d74d18d0..755f7110eac 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -4,12 +4,12 @@ use crate::middleware::log_request::RequestLogExt; use crate::middleware::session::RequestSession; use crate::models::token::{CrateScope, EndpointScope}; use crate::models::{ApiToken, User}; -use crate::util::diesel::Conn; use crate::util::errors::{ account_locked, forbidden, internal, AppResult, InsecurelyGeneratedTokenRevoked, }; use crate::util::token::HashedToken; use chrono::Utc; +use diesel_async::AsyncPgConnection; use http::header; use http::request::Parts; @@ -58,8 +58,12 @@ impl AuthCheck { } #[instrument(name = "auth.check", skip_all)] - pub fn check(&self, parts: &Parts, conn: &mut impl Conn) -> AppResult { - let auth = authenticate(parts, conn)?; + pub async fn check( + &self, + parts: &Parts, + conn: &mut AsyncPgConnection, + ) -> AppResult { + let auth = authenticate(parts, conn).await?; if let Some(token) = auth.api_token() { if !self.allow_token { @@ -168,9 +172,9 @@ impl Authentication { } #[instrument(skip_all)] -fn authenticate_via_cookie( +async fn authenticate_via_cookie( parts: &Parts, - conn: &mut impl Conn, + conn: &mut AsyncPgConnection, ) -> AppResult> { let user_id_from_session = parts .session() @@ -181,7 +185,7 @@ fn authenticate_via_cookie( return Ok(None); }; - let user = User::find(conn, id).map_err(|err| { + let user = User::async_find(conn, id).await.map_err(|err| { parts.request_log().add("cause", err); internal("user_id from cookie not found in database") })?; @@ -194,9 +198,9 @@ fn authenticate_via_cookie( } #[instrument(skip_all)] -fn authenticate_via_token( +async fn authenticate_via_token( parts: &Parts, - conn: &mut impl Conn, + conn: &mut AsyncPgConnection, ) -> AppResult> { let maybe_authorization = parts .headers() @@ -210,14 +214,16 @@ fn authenticate_via_token( let token = HashedToken::parse(header_value).map_err(|_| InsecurelyGeneratedTokenRevoked::boxed())?; - let token = ApiToken::find_by_api_token(conn, &token).map_err(|e| { - let cause = format!("invalid token caused by {e}"); - parts.request_log().add("cause", cause); + let token = ApiToken::async_find_by_api_token(conn, &token) + .await + .map_err(|e| { + let cause = format!("invalid token caused by {e}"); + parts.request_log().add("cause", cause); - forbidden("authentication failed") - })?; + forbidden("authentication failed") + })?; - let user = User::find(conn, token.user_id).map_err(|err| { + let user = User::async_find(conn, token.user_id).await.map_err(|err| { parts.request_log().add("cause", err); internal("user_id from token not found in database") })?; @@ -231,16 +237,16 @@ fn authenticate_via_token( } #[instrument(skip_all)] -fn authenticate(parts: &Parts, conn: &mut impl Conn) -> AppResult { +async fn authenticate(parts: &Parts, conn: &mut AsyncPgConnection) -> AppResult { controllers::util::verify_origin(parts)?; - match authenticate_via_cookie(parts, conn) { + match authenticate_via_cookie(parts, conn).await { Ok(None) => {} Ok(Some(auth)) => return Ok(Authentication::Cookie(auth)), Err(err) => return Err(err), } - match authenticate_via_token(parts, conn) { + match authenticate_via_token(parts, conn).await { Ok(None) => {} Ok(Some(auth)) => return Ok(Authentication::Token(auth)), Err(err) => return Err(err), diff --git a/src/controllers/crate_owner_invitation.rs b/src/controllers/crate_owner_invitation.rs index 09cc57cfddb..cffb8d7ad15 100644 --- a/src/controllers/crate_owner_invitation.rs +++ b/src/controllers/crate_owner_invitation.rs @@ -27,11 +27,11 @@ use tokio::runtime::Handle; /// Handles the `GET /api/v1/me/crate_owner_invitations` route. pub async fn list(app: AppState, req: Parts) -> AppResult> { - let conn = app.db_read().await?; + let mut conn = app.db_read().await?; + let auth = AuthCheck::only_cookie().check(&req, &mut conn).await?; spawn_blocking(move || { let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::only_cookie().check(&req, conn)?; let user_id = auth.user_id(); let PrivateListResponse { @@ -69,12 +69,11 @@ pub async fn list(app: AppState, req: Parts) -> AppResult> { /// Handles the `GET /api/private/crate_owner_invitations` route. pub async fn private_list(app: AppState, req: Parts) -> AppResult> { - let conn = app.db_read().await?; + let mut conn = app.db_read().await?; + let auth = AuthCheck::only_cookie().check(&req, &mut conn).await?; spawn_blocking(move || { let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::only_cookie().check(&req, conn)?; - let filter = if let Some(crate_name) = req.query().get("crate_name") { ListFilter::CrateName(crate_name.clone()) } else if let Some(id) = req.query().get("invitee_id").and_then(|i| i.parse().ok()) { @@ -284,11 +283,11 @@ pub async fn handle_invite(state: AppState, req: BytesRequest) -> AppResult = &mut conn.into(); - let auth = AuthCheck::default().check(&parts, conn)?; let user_id = auth.user_id(); let config = &state.config; diff --git a/src/controllers/krate/follow.rs b/src/controllers/krate/follow.rs index d13d5614859..9d1c6447cd7 100644 --- a/src/controllers/krate/follow.rs +++ b/src/controllers/krate/follow.rs @@ -34,13 +34,13 @@ pub async fn follow( Path(crate_name): Path, req: Parts, ) -> AppResult { - let conn = app.db_write().await?; + let mut conn = app.db_write().await?; + let user_id = AuthCheck::default().check(&req, &mut conn).await?.user_id(); spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let user_id = AuthCheck::default().check(&req, conn)?.user_id(); let follow = follow_target(&crate_name, conn, user_id)?; diesel::insert_into(follows::table) .values(&follow) @@ -58,13 +58,13 @@ pub async fn unfollow( Path(crate_name): Path, req: Parts, ) -> AppResult { - let conn = app.db_write().await?; + let mut conn = app.db_write().await?; + let user_id = AuthCheck::default().check(&req, &mut conn).await?.user_id(); spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let user_id = AuthCheck::default().check(&req, conn)?.user_id(); let follow = follow_target(&crate_name, conn, user_id)?; diesel::delete(&follow).execute(conn)?; @@ -79,7 +79,11 @@ pub async fn following( Path(crate_name): Path, req: Parts, ) -> AppResult> { - let conn = app.db_read_prefer_primary().await?; + let mut conn = app.db_read_prefer_primary().await?; + let user_id = AuthCheck::only_cookie() + .check(&req, &mut conn) + .await? + .user_id(); spawn_blocking(move || { use diesel::RunQueryDsl; @@ -87,7 +91,6 @@ pub async fn following( use diesel::dsl::exists; - let user_id = AuthCheck::only_cookie().check(&req, conn)?.user_id(); let follow = follow_target(&crate_name, conn, user_id)?; let following = diesel::select(exists(follows::table.find(follow.id()))).get_result::(conn)?; diff --git a/src/controllers/krate/owners.rs b/src/controllers/krate/owners.rs index 5abc2a86ed1..179643a81e3 100644 --- a/src/controllers/krate/owners.rs +++ b/src/controllers/krate/owners.rs @@ -130,17 +130,17 @@ async fn modify_owners( )); } - let conn = app.db_write().await?; + let mut conn = app.db_write().await?; + let auth = AuthCheck::default() + .with_endpoint_scope(EndpointScope::ChangeOwners) + .for_crate(&crate_name) + .check(&parts, &mut conn) + .await?; spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::default() - .with_endpoint_scope(EndpointScope::ChangeOwners) - .for_crate(&crate_name) - .check(&parts, conn)?; - let user = auth.user(); // The set of emails to send out after invite processing is complete and diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 88d686c9887..9b1cf49e7d6 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -81,17 +81,17 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult = &mut conn.into(); + let (existing_crate, auth) = { + use diesel_async::RunQueryDsl; // this query should only be used for the endpoint scope calculation // since a race condition there would only cause `publish-new` instead of // `publish-update` to be used. let existing_crate: Option = Crate::by_name(&metadata.name) - .first::(conn) + .first::(&mut conn) + .await .optional()?; let endpoint_scope = match existing_crate { @@ -102,7 +102,15 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult = &mut conn.into(); let api_token_id = auth.api_token_id(); let user = auth.user(); diff --git a/src/controllers/krate/search.rs b/src/controllers/krate/search.rs index f67635b3b8b..57db6213d34 100644 --- a/src/controllers/krate/search.rs +++ b/src/controllers/krate/search.rs @@ -6,10 +6,12 @@ use axum::Json; use diesel::dsl::{exists, sql, InnerJoinQuerySource, LeftJoinQuerySource}; use diesel::sql_types::{Array, Bool, Text}; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::AsyncPgConnection; use diesel_full_text_search::*; use http::request::Parts; use serde_json::Value; use std::cell::OnceCell; +use tokio::runtime::Handle; use crate::app::AppState; use crate::controllers::helpers::Paginate; @@ -22,7 +24,6 @@ use crate::controllers::helpers::pagination::{Page, Paginated, PaginationOptions use crate::models::krate::ALL_COLUMNS; use crate::sql::{array_agg, canon_crate_name, lower}; use crate::tasks::spawn_blocking; -use crate::util::diesel::Conn; use crate::util::RequestUtils; /// Handles the `GET /crates` route. @@ -303,12 +304,14 @@ impl<'a> FilterParams<'a> { .as_deref() } - fn authed_user_id(&self, req: &Parts, conn: &mut impl Conn) -> AppResult { + fn authed_user_id(&self, req: &Parts, conn: &mut AsyncPgConnection) -> AppResult { if let Some(val) = self._auth_user_id.get() { return Ok(*val); } - let user_id = AuthCheck::default().check(req, conn)?.user_id(); + let user_id = Handle::current() + .block_on(AuthCheck::default().check(req, conn))? + .user_id(); // This should not fail, because of the `get()` check above let _ = self._auth_user_id.set(user_id); @@ -319,7 +322,7 @@ impl<'a> FilterParams<'a> { fn make_query( &'a self, req: &Parts, - conn: &mut impl Conn, + conn: &mut AsyncPgConnection, ) -> AppResult> { let mut query = crates::table.into_boxed(); diff --git a/src/controllers/token.rs b/src/controllers/token.rs index 5b17c3de40d..2cef5883c96 100644 --- a/src/controllers/token.rs +++ b/src/controllers/token.rs @@ -41,13 +41,13 @@ pub async fn list( Query(params): Query, req: Parts, ) -> AppResult> { - let conn = app.db_read_prefer_primary().await?; + let mut conn = app.db_read_prefer_primary().await?; + let auth = AuthCheck::only_cookie().check(&req, &mut conn).await?; spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::only_cookie().check(&req, conn)?; let user = auth.user(); let tokens: Vec = ApiToken::belonging_to(user) @@ -92,13 +92,13 @@ pub async fn new( return Err(bad_request("name must have a value")); } - let conn = app.db_write().await?; + let mut conn = app.db_write().await?; + let auth = AuthCheck::default().check(&parts, &mut conn).await?; spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::default().check(&parts, conn)?; if auth.api_token_id().is_some() { return Err(bad_request( "cannot use an API token to create a new API token", @@ -175,13 +175,13 @@ pub async fn new( /// Handles the `GET /me/tokens/:id` route. pub async fn show(app: AppState, Path(id): Path, req: Parts) -> AppResult> { - let conn = app.db_write().await?; + let mut conn = app.db_write().await?; + let auth = AuthCheck::default().check(&req, &mut conn).await?; spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::default().check(&req, conn)?; let user = auth.user(); let token = ApiToken::belonging_to(user) .find(id) @@ -195,13 +195,13 @@ pub async fn show(app: AppState, Path(id): Path, req: Parts) -> AppResult, req: Parts) -> AppResult> { - let conn = app.db_write().await?; + let mut conn = app.db_write().await?; + let auth = AuthCheck::default().check(&req, &mut conn).await?; spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::default().check(&req, conn)?; let user = auth.user(); diesel::update(ApiToken::belonging_to(user).find(id)) .set(api_tokens::revoked.eq(true)) @@ -214,13 +214,13 @@ pub async fn revoke(app: AppState, Path(id): Path, req: Parts) -> AppResult /// Handles the `DELETE /tokens/current` route. pub async fn revoke_current(app: AppState, req: Parts) -> AppResult { - let conn = app.db_write().await?; + let mut conn = app.db_write().await?; + let auth = AuthCheck::default().check(&req, &mut conn).await?; spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::default().check(&req, conn)?; let api_token_id = auth .api_token_id() .ok_or_else(|| bad_request("token not provided"))?; diff --git a/src/controllers/user/me.rs b/src/controllers/user/me.rs index 06aafedf705..dc4fb71f389 100644 --- a/src/controllers/user/me.rs +++ b/src/controllers/user/me.rs @@ -21,14 +21,16 @@ use crate::views::{EncodableMe, EncodablePrivateUser, EncodableVersion, OwnedCra /// Handles the `GET /me` route. pub async fn me(app: AppState, req: Parts) -> AppResult> { - let conn = app.db_read_prefer_primary().await?; + let mut conn = app.db_read_prefer_primary().await?; + let user_id = AuthCheck::only_cookie() + .check(&req, &mut conn) + .await? + .user_id(); spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let user_id = AuthCheck::only_cookie().check(&req, conn)?.user_id(); - let (user, verified, email, verification_sent): (User, Option, Option, bool) = users::table .find(user_id) @@ -67,11 +69,11 @@ pub async fn me(app: AppState, req: Parts) -> AppResult> { /// Handles the `GET /me/updates` route. pub async fn updates(app: AppState, req: Parts) -> AppResult> { - let conn = app.db_read_prefer_primary().await?; + let mut conn = app.db_read_prefer_primary().await?; + let auth = AuthCheck::only_cookie().check(&req, &mut conn).await?; spawn_blocking(move || { let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::only_cookie().check(&req, conn)?; let user = auth.user(); let followed_crates = Follow::belonging_to(user).select(follows::crate_id); @@ -145,7 +147,11 @@ pub async fn update_email_notifications(app: AppState, req: BytesRequest) -> App .map(|c| (c.id, c.email_notifications)) .collect(); - let conn = app.db_write().await?; + let mut conn = app.db_write().await?; + let user_id = AuthCheck::default() + .check(&parts, &mut conn) + .await? + .user_id(); spawn_blocking(move || { use diesel::RunQueryDsl; @@ -153,8 +159,6 @@ pub async fn update_email_notifications(app: AppState, req: BytesRequest) -> App use diesel::pg::upsert::excluded; - let user_id = AuthCheck::default().check(&parts, conn)?.user_id(); - // Build inserts from existing crates belonging to the current user let to_insert = CrateOwner::by_owner_kind(OwnerKind::User) .filter(crate_owners::owner_id.eq(user_id)) diff --git a/src/controllers/user/update.rs b/src/controllers/user/update.rs index deba7359579..ca607eecfc0 100644 --- a/src/controllers/user/update.rs +++ b/src/controllers/user/update.rs @@ -33,13 +33,13 @@ pub async fn update_user( req: Parts, Json(user_update): Json, ) -> AppResult { - let conn = state.db_write().await?; + let mut conn = state.db_write().await?; + let auth = AuthCheck::default().check(&req, &mut conn).await?; spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::default().check(&req, conn)?; let user = auth.user(); // need to check if current user matches user to be updated @@ -120,13 +120,13 @@ pub async fn regenerate_token_and_send( Path(param_user_id): Path, req: Parts, ) -> AppResult { - let conn = state.db_write().await?; + let mut conn = state.db_write().await?; + let auth = AuthCheck::default().check(&req, &mut conn).await?; spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - let auth = AuthCheck::default().check(&req, conn)?; let user = auth.user(); // need to check if current user matches user to be updated diff --git a/src/controllers/version/metadata.rs b/src/controllers/version/metadata.rs index 86a040a001c..2ae82dc3030 100644 --- a/src/controllers/version/metadata.rs +++ b/src/controllers/version/metadata.rs @@ -9,6 +9,7 @@ use axum::Json; use crates_io_database::schema::{crates, dependencies}; use crates_io_worker::BackgroundJob; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::AsyncPgConnection; use http::request::Parts; use http::StatusCode; use serde::Deserialize; @@ -16,7 +17,7 @@ use serde_json::Value; use tokio::runtime::Handle; use crate::app::AppState; -use crate::auth::AuthCheck; +use crate::auth::{AuthCheck, Authentication}; use crate::models::token::EndpointScope; use crate::models::{ insert_version_owner_action, Crate, Dependency, Rights, Version, VersionAction, @@ -128,16 +129,17 @@ pub async fn update( let mut conn = state.db_write().await?; let (mut version, krate) = version_and_crate(&mut conn, &crate_name, &version).await?; + validate_yank_update(&update_request.version, &version)?; + let auth = authenticate(&req, &mut conn, &krate.name).await?; spawn_blocking(move || { let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - validate_yank_update(&update_request.version, &version)?; perform_version_yank_update( &state, - &req, conn, &mut version, &krate, + &auth, update_request.version.yanked, update_request.version.yank_message, )?; @@ -166,22 +168,29 @@ fn validate_yank_update(update_data: &VersionUpdate, version: &Version) -> AppRe Ok(()) } +pub async fn authenticate( + req: &Parts, + conn: &mut AsyncPgConnection, + name: &str, +) -> AppResult { + AuthCheck::default() + .with_endpoint_scope(EndpointScope::Yank) + .for_crate(name) + .check(req, conn) + .await +} + pub fn perform_version_yank_update( state: &AppState, - req: &Parts, conn: &mut impl Conn, version: &mut Version, krate: &Crate, + auth: &Authentication, yanked: Option, yank_message: Option, ) -> AppResult<()> { use diesel::RunQueryDsl; - let auth = AuthCheck::default() - .with_endpoint_scope(EndpointScope::Yank) - .for_crate(&krate.name) - .check(req, conn)?; - state .rate_limiter .check_rate_limit(auth.user_id(), LimitedAction::YankUnyank, conn)?; diff --git a/src/controllers/version/yank.rs b/src/controllers/version/yank.rs index e18805f2ffa..51bc588d1ca 100644 --- a/src/controllers/version/yank.rs +++ b/src/controllers/version/yank.rs @@ -1,6 +1,6 @@ //! Endpoints for yanking and unyanking specific versions of crates -use super::metadata::perform_version_yank_update; +use super::metadata::{authenticate, perform_version_yank_update}; use super::version_and_crate; use crate::app::AppState; use crate::controllers::helpers::ok_true; @@ -54,9 +54,18 @@ async fn modify_yank( let mut conn = state.db_write().await?; let (mut version, krate) = version_and_crate(&mut conn, &crate_name, &version).await?; + let auth = authenticate(&req, &mut conn, &crate_name).await?; spawn_blocking(move || { let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - perform_version_yank_update(&state, &req, conn, &mut version, &krate, Some(yanked), None)?; + perform_version_yank_update( + &state, + conn, + &mut version, + &krate, + &auth, + Some(yanked), + None, + )?; ok_true() }) .await diff --git a/src/models/token.rs b/src/models/token.rs index 77f558a2d1a..3a11715dede 100644 --- a/src/models/token.rs +++ b/src/models/token.rs @@ -1,6 +1,7 @@ mod scopes; use chrono::NaiveDateTime; +use diesel_async::AsyncPgConnection; pub use self::scopes::{CrateScope, EndpointScope}; use crate::models::User; @@ -92,6 +93,43 @@ impl ApiToken { .or_else(|_| tokens.select(ApiToken::as_select()).first(conn)) .map_err(Into::into) } + + pub async fn async_find_by_api_token( + conn: &mut AsyncPgConnection, + token: &HashedToken, + ) -> QueryResult { + use diesel::{dsl::now, update}; + use diesel_async::scoped_futures::ScopedFutureExt; + use diesel_async::{AsyncConnection, RunQueryDsl}; + + let tokens = api_tokens::table + .filter(api_tokens::revoked.eq(false)) + .filter( + api_tokens::expired_at + .is_null() + .or(api_tokens::expired_at.gt(now)), + ) + .filter(api_tokens::token.eq(token)); + + // If the database is in read only mode, we can't update last_used_at. + // Try updating in a new transaction, if that fails, fall back to reading + let token = conn + .transaction(|conn| { + async move { + update(tokens) + .set(api_tokens::last_used_at.eq(now.nullable())) + .returning(ApiToken::as_returning()) + .get_result(conn) + .await + } + .scope_boxed() + }) + .await; + let Ok(_) = token else { + return tokens.select(ApiToken::as_select()).first(conn).await; + }; + token + } } #[derive(Debug)] diff --git a/src/models/user.rs b/src/models/user.rs index bd895f173ce..88a7307debe 100644 --- a/src/models/user.rs +++ b/src/models/user.rs @@ -1,4 +1,5 @@ use chrono::NaiveDateTime; +use diesel_async::AsyncPgConnection; use secrecy::SecretString; use crate::app::App; @@ -34,6 +35,12 @@ impl User { users::table.find(id).first(conn) } + pub async fn async_find(conn: &mut AsyncPgConnection, id: i32) -> QueryResult { + use diesel_async::RunQueryDsl; + + users::table.find(id).first(conn).await + } + pub fn find_by_login(conn: &mut impl Conn, login: &str) -> QueryResult { use diesel::RunQueryDsl;