Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 116 additions & 114 deletions src/controllers/krate/owners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use crate::models::{krate::NewOwnerInvite, token::EndpointScope};
use crate::models::{Crate, Owner, Rights, Team, User};
use crate::tasks::spawn_blocking;
use crate::util::diesel::prelude::*;
use crate::util::errors::{bad_request, crate_not_found, custom, AppResult};
use crate::views::EncodableOwner;
Expand All @@ -12,11 +11,11 @@
use axum::Json;
use axum_extra::json;
use axum_extra::response::ErasedJson;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use http::request::Parts;
use http::StatusCode;
use secrecy::{ExposeSecret, SecretString};
use tokio::runtime::Handle;

/// Handles the `GET /crates/:crate_id/owners` route.
pub async fn owners(state: AppState, Path(crate_name): Path<String>) -> AppResult<ErasedJson> {
Expand Down Expand Up @@ -62,25 +61,23 @@

/// Handles the `GET /crates/:crate_id/owner_user` route.
pub async fn owner_user(state: AppState, Path(crate_name): Path<String>) -> AppResult<ErasedJson> {
let conn = state.db_read().await?;
spawn_blocking(move || {
use diesel::RunQueryDsl;
use diesel_async::RunQueryDsl;

let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
let mut conn = state.db_read().await?;

let krate: Crate = Crate::by_name(&crate_name)
.first(conn)
.optional()?
.ok_or_else(|| crate_not_found(&crate_name))?;
let krate: Crate = Crate::by_name(&crate_name)
.first(&mut conn)
.await
.optional()?
.ok_or_else(|| crate_not_found(&crate_name))?;

let owners = User::owning(&krate, conn)?
.into_iter()
.map(Owner::into)
.collect::<Vec<EncodableOwner>>();
let owners = User::owning(&krate, &mut conn)
.await?
.into_iter()
.map(Owner::into)
.collect::<Vec<EncodableOwner>>();

Ok(json!({ "users": owners }))
})
.await?
Ok(json!({ "users": owners }))
}

/// Handles the `PUT /crates/:crate_id/owners` route.
Expand Down Expand Up @@ -116,6 +113,8 @@
body: ChangeOwnersRequest,
add: bool,
) -> AppResult<ErasedJson> {
use diesel_async::RunQueryDsl;

let logins = body.owners;

// Bound the number of invites processed per request to limit the cost of
Expand All @@ -132,121 +131,124 @@
.for_crate(&crate_name)
.check(&parts, &mut conn)
.await?;
spawn_blocking(move || {
use diesel::RunQueryDsl;

let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();

let user = auth.user();

// The set of emails to send out after invite processing is complete and
// the database transaction has committed.
let mut emails = Vec::with_capacity(logins.len());

let comma_sep_msg = conn.transaction(|conn| {
let krate: Crate = Crate::by_name(&crate_name)
.first(conn)
.optional()?
.ok_or_else(|| crate_not_found(&crate_name))?;

let owners = krate.owners(conn)?;

match Handle::current().block_on(user.rights(&app, &owners))? {
Rights::Full => {}
// Yes!
Rights::Publish => {
return Err(custom(
StatusCode::FORBIDDEN,
"team members don't have permission to modify owners",
));
}
Rights::None => {
return Err(custom(
StatusCode::FORBIDDEN,
"only owners have permission to modify owners",
));
let user = auth.user();

let (comma_sep_msg, emails) = conn
.transaction(|conn| {
let app = app.clone();
async move {
let krate: Crate = Crate::by_name(&crate_name)
.first(conn)
.await
.optional()?
.ok_or_else(|| crate_not_found(&crate_name))?;

let owners = krate.async_owners(conn).await?;

match user.rights(&app, &owners).await? {
Rights::Full => {}
// Yes!
Rights::Publish => {
return Err(custom(
StatusCode::FORBIDDEN,
"team members don't have permission to modify owners",
));
}
Rights::None => {
return Err(custom(
StatusCode::FORBIDDEN,
"only owners have permission to modify owners",
));
}
}
}

let comma_sep_msg = if add {
let mut msgs = Vec::with_capacity(logins.len());
for login in &logins {
let login_test =
|owner: &Owner| owner.login().to_lowercase() == *login.to_lowercase();
if owners.iter().any(login_test) {
return Err(bad_request(format_args!("`{login}` is already an owner")));
}
// The set of emails to send out after invite processing is complete and
// the database transaction has committed.
let mut emails = Vec::with_capacity(logins.len());

let comma_sep_msg = if add {
let mut msgs = Vec::with_capacity(logins.len());
for login in &logins {
let login_test =
|owner: &Owner| owner.login().to_lowercase() == *login.to_lowercase();
if owners.iter().any(login_test) {
return Err(bad_request(format_args!("`{login}` is already an owner")));
}

match krate.owner_add(&app, conn, user, login) {
// A user was successfully invited, and they must accept
// the invite, and a best-effort attempt should be made
// to email them the invite token for one-click
// acceptance.
Ok(NewOwnerInvite::User(invitee, token)) => {
msgs.push(format!(
"user {} has been invited to be an owner of crate {}",
invitee.gh_login, krate.name,
));

if let Some(recipient) = invitee.verified_email(conn).ok().flatten() {
emails.push(OwnerInviteEmail {
recipient_email_address: recipient,
inviter: user.gh_login.clone(),
domain: app.emails.domain.clone(),
crate_name: krate.name.clone(),
token,
});
match krate.owner_add(&app, conn, user, login).await {
// A user was successfully invited, and they must accept
// the invite, and a best-effort attempt should be made
// to email them the invite token for one-click
// acceptance.
Ok(NewOwnerInvite::User(invitee, token)) => {
msgs.push(format!(
"user {} has been invited to be an owner of crate {}",
invitee.gh_login, krate.name,
));

if let Some(recipient) =
invitee.async_verified_email(conn).await.ok().flatten()
{
emails.push(OwnerInviteEmail {
recipient_email_address: recipient,
inviter: user.gh_login.clone(),
domain: app.emails.domain.clone(),
crate_name: krate.name.clone(),
token,
});
}
}
}

// A team was successfully invited. They are immediately
// added, and do not have an invite token.
Ok(NewOwnerInvite::Team(team)) => msgs.push(format!(
"team {} has been added as an owner of crate {}",
team.login, krate.name
)),
// A team was successfully invited. They are immediately
// added, and do not have an invite token.
Ok(NewOwnerInvite::Team(team)) => msgs.push(format!(
"team {} has been added as an owner of crate {}",
team.login, krate.name
)),

// This user has a pending invite.
Err(OwnerAddError::AlreadyInvited(user)) => msgs.push(format!(
// This user has a pending invite.
Err(OwnerAddError::AlreadyInvited(user)) => msgs.push(format!(
"user {} already has a pending invitation to be an owner of crate {}",
user.gh_login, krate.name
)),

// An opaque error occurred.
Err(OwnerAddError::AppError(e)) => return Err(e),
// An opaque error occurred.
Err(OwnerAddError::AppError(e)) => return Err(e),
}
}
}
msgs.join(",")
} else {
for login in &logins {
krate.owner_remove(conn, login)?;
}
if User::owning(&krate, conn)?.is_empty() {
return Err(bad_request(
"cannot remove all individual owners of a crate. \
msgs.join(",")
} else {
for login in &logins {
krate.owner_remove(conn, login).await?;
}
if User::owning(&krate, conn).await?.is_empty() {
return Err(bad_request(
"cannot remove all individual owners of a crate. \
Team member don't have permission to modify owners, so \
at least one individual owner is required.",
));
}
"owners successfully removed".to_owned()
};
));
}
"owners successfully removed".to_owned()
};

Ok(comma_sep_msg)
})?;
Ok((comma_sep_msg, emails))
}
.scope_boxed()
})
.await?;

// Send the accumulated invite emails now the database state has
// committed.
for email in emails {
let addr = email.recipient_email_address().to_string();
// Send the accumulated invite emails now the database state has
// committed.
for email in emails {
let addr = email.recipient_email_address().to_string();

if let Err(e) = app.emails.send(&addr, email) {
warn!("Failed to send co-owner invite email: {e}");
}
if let Err(e) = app.emails.async_send(&addr, email).await {
warn!("Failed to send co-owner invite email: {e}");

Check warning on line 247 in src/controllers/krate/owners.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/owners.rs#L247

Added line #L247 was not covered by tests
}
}

Ok(json!({ "msg": comma_sep_msg, "ok": true }))
})
.await?
Ok(json!({ "msg": comma_sep_msg, "ok": true }))
}

pub struct OwnerInviteEmail {
Expand Down
44 changes: 25 additions & 19 deletions src/models/crate_owner_invitation.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use chrono::{NaiveDateTime, Utc};
use diesel_async::AsyncPgConnection;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, AsyncPgConnection};
use http::StatusCode;
use secrecy::SecretString;

use crate::config;
use crate::models::{CrateOwner, OwnerKind};
use crate::schema::{crate_owner_invitations, crate_owners, crates};
use crate::util::diesel::prelude::*;
use crate::util::diesel::Conn;
use crate::util::errors::{custom, AppResult};

#[derive(Debug)]
Expand All @@ -30,14 +30,14 @@ pub struct CrateOwnerInvitation {
}

impl CrateOwnerInvitation {
pub fn create(
pub async fn create(
invited_user_id: i32,
invited_by_user_id: i32,
crate_id: i32,
conn: &mut impl Conn,
conn: &mut AsyncPgConnection,
config: &config::Server,
) -> QueryResult<NewCrateOwnerInvitationOutcome> {
use diesel::RunQueryDsl;
use diesel_async::RunQueryDsl;

#[derive(Insertable, Clone, Copy, Debug)]
#[diesel(table_name = crate_owner_invitations, check_for_backend(diesel::pg::Pg))]
Expand All @@ -50,22 +50,27 @@ impl CrateOwnerInvitation {
// Before actually creating the invite, check if an expired invitation already exists
// and delete it from the database. This allows obtaining a new invite if the old one
// expired, instead of returning "already exists".
conn.transaction(|conn| -> QueryResult<()> {
// This does a SELECT FOR UPDATE + DELETE instead of a DELETE with a WHERE clause to
// use the model's `is_expired` method, centralizing our expiration checking logic.
let existing: Option<CrateOwnerInvitation> = crate_owner_invitations::table
.find((invited_user_id, crate_id))
.for_update()
.first(conn)
.optional()?;

if let Some(existing) = existing {
if existing.is_expired(config) {
diesel::delete(&existing).execute(conn)?;
conn.transaction(|conn| {
async move {
// This does a SELECT FOR UPDATE + DELETE instead of a DELETE with a WHERE clause to
// use the model's `is_expired` method, centralizing our expiration checking logic.
let existing: Option<CrateOwnerInvitation> = crate_owner_invitations::table
.find((invited_user_id, crate_id))
.for_update()
.first(conn)
.await
.optional()?;

if let Some(existing) = existing {
if existing.is_expired(config) {
diesel::delete(&existing).execute(conn).await?;
}
}
QueryResult::Ok(())
}
Ok(())
})?;
.scope_boxed()
})
.await?;

let res: Option<CrateOwnerInvitation> = diesel::insert_into(crate_owner_invitations::table)
.values(&NewRecord {
Expand All @@ -78,6 +83,7 @@ impl CrateOwnerInvitation {
// deleted before doing this INSERT.
.on_conflict_do_nothing()
.get_result(conn)
.await
.optional()?;

Ok(match res {
Expand Down
Loading