Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 105 additions & 102 deletions src/worker/jobs/sync_admins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use crate::worker::Environment;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel::RunQueryDsl;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::RunQueryDsl;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
Expand All @@ -31,117 +30,119 @@
.map(|m| m.github_id)
.collect::<HashSet<_>>();

let conn = ctx.deadpool.get().await?;
spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
let mut conn = ctx.deadpool.get().await?;

let format_repo_admins = |github_ids: &HashSet<i32>| {
repo_admins
.iter()
.filter(|m| github_ids.contains(&m.github_id))
.map(|m| format!("{} (github_id: {})", m.github, m.github_id))
.collect::<Vec<_>>()
};

let format_repo_admins = |github_ids: &HashSet<i32>| {
repo_admins
.iter()
.filter(|m| github_ids.contains(&m.github_id))
.map(|m| format!("{} (github_id: {})", m.github, m.github_id))
.collect::<Vec<_>>()
};
// Existing admins from the database.

// Existing admins from the database.
let database_admins = users::table
.left_join(emails::table)
.select((users::gh_id, users::gh_login, emails::email.nullable()))
.filter(users::is_admin.eq(true))
.get_results::<(i32, String, Option<String>)>(&mut conn)
.await?;

let database_admins = users::table
.left_join(emails::table)
.select((users::gh_id, users::gh_login, emails::email.nullable()))
.filter(users::is_admin.eq(true))
.get_results::<(i32, String, Option<String>)>(conn)?;
let database_admin_ids = database_admins
.iter()
.map(|(gh_id, _, _)| *gh_id)
.collect::<HashSet<_>>();

let database_admin_ids = database_admins
let format_database_admins = |github_ids: &HashSet<i32>| {
database_admins
.iter()
.map(|(gh_id, _, _)| *gh_id)
.collect::<HashSet<_>>();

let format_database_admins = |github_ids: &HashSet<i32>| {
database_admins
.iter()
.filter(|(gh_id, _, _)| github_ids.contains(gh_id))
.map(|(gh_id, login, _)| format!("{} (github_id: {})", login, gh_id))
.collect::<Vec<_>>()
};

// New admins from the team repo that don't have admin access yet.

let new_admin_ids = repo_admin_ids
.difference(&database_admin_ids)
.copied()
.collect::<HashSet<_>>();

let added_admin_ids = if new_admin_ids.is_empty() {
Vec::new()
} else {
let new_admins = format_repo_admins(&new_admin_ids).join(", ");
debug!("Granting admin access: {new_admins}");

diesel::update(users::table)
.filter(users::gh_id.eq_any(&new_admin_ids))
.set(users::is_admin.eq(true))
.returning(users::gh_id)
.get_results::<i32>(conn)?
};

// New admins from the team repo that have been granted admin
// access now.

let added_admin_ids = HashSet::from_iter(added_admin_ids);
if !added_admin_ids.is_empty() {
let added_admins = format_repo_admins(&added_admin_ids).join(", ");
info!("Granted admin access: {added_admins}");
}
.filter(|(gh_id, _, _)| github_ids.contains(gh_id))
.map(|(gh_id, login, _)| format!("{} (github_id: {})", login, gh_id))
.collect::<Vec<_>>()
};

// New admins from the team repo that don't have a crates.io
// account yet.
// New admins from the team repo that don't have admin access yet.

let skipped_new_admin_ids = new_admin_ids
.difference(&added_admin_ids)
.copied()
.collect::<HashSet<_>>();
let new_admin_ids = repo_admin_ids
.difference(&database_admin_ids)
.copied()
.collect::<HashSet<_>>();

if !skipped_new_admin_ids.is_empty() {
let skipped_new_admins = format_repo_admins(&skipped_new_admin_ids).join(", ");
info!("Skipped missing admins: {skipped_new_admins}");
}
let added_admin_ids = if new_admin_ids.is_empty() {
Vec::new()

Check warning on line 73 in src/worker/jobs/sync_admins.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/jobs/sync_admins.rs#L73

Added line #L73 was not covered by tests
} else {
let new_admins = format_repo_admins(&new_admin_ids).join(", ");
debug!("Granting admin access: {new_admins}");

diesel::update(users::table)
.filter(users::gh_id.eq_any(&new_admin_ids))
.set(users::is_admin.eq(true))
.returning(users::gh_id)
.get_results::<i32>(&mut conn)
.await?
};

// New admins from the team repo that have been granted admin
// access now.

let added_admin_ids = HashSet::from_iter(added_admin_ids);
if !added_admin_ids.is_empty() {
let added_admins = format_repo_admins(&added_admin_ids).join(", ");
info!("Granted admin access: {added_admins}");
}

// Existing admins from the database that are no longer in the
// team repo.

let obsolete_admin_ids = database_admin_ids
.difference(&repo_admin_ids)
.copied()
.collect::<HashSet<_>>();

let removed_admin_ids = if obsolete_admin_ids.is_empty() {
Vec::new()
} else {
let obsolete_admins = format_database_admins(&obsolete_admin_ids).join(", ");
debug!("Revoking admin access: {obsolete_admins}");

diesel::update(users::table)
.filter(users::gh_id.eq_any(&obsolete_admin_ids))
.set(users::is_admin.eq(false))
.returning(users::gh_id)
.get_results::<i32>(conn)?
};

let removed_admin_ids = HashSet::from_iter(removed_admin_ids);
if !removed_admin_ids.is_empty() {
let removed_admins = format_database_admins(&removed_admin_ids).join(", ");
info!("Revoked admin access: {removed_admins}");
}
// New admins from the team repo that don't have a crates.io
// account yet.

if added_admin_ids.is_empty() && removed_admin_ids.is_empty() {
return Ok(());
}
let skipped_new_admin_ids = new_admin_ids
.difference(&added_admin_ids)
.copied()
.collect::<HashSet<_>>();

let added_admins = format_repo_admins(&added_admin_ids);
let removed_admins = format_database_admins(&removed_admin_ids);
if !skipped_new_admin_ids.is_empty() {
let skipped_new_admins = format_repo_admins(&skipped_new_admin_ids).join(", ");
info!("Skipped missing admins: {skipped_new_admins}");
}

Check warning on line 106 in src/worker/jobs/sync_admins.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/jobs/sync_admins.rs#L106

Added line #L106 was not covered by tests

// Existing admins from the database that are no longer in the
// team repo.

let obsolete_admin_ids = database_admin_ids
.difference(&repo_admin_ids)
.copied()
.collect::<HashSet<_>>();

let removed_admin_ids = if obsolete_admin_ids.is_empty() {
Vec::new()
} else {
let obsolete_admins = format_database_admins(&obsolete_admin_ids).join(", ");
debug!("Revoking admin access: {obsolete_admins}");

diesel::update(users::table)
.filter(users::gh_id.eq_any(&obsolete_admin_ids))
.set(users::is_admin.eq(false))
.returning(users::gh_id)
.get_results::<i32>(&mut conn)
.await?
};

let removed_admin_ids = HashSet::from_iter(removed_admin_ids);
if !removed_admin_ids.is_empty() {
let removed_admins = format_database_admins(&removed_admin_ids).join(", ");
info!("Revoked admin access: {removed_admins}");
}

let email = AdminAccountEmail::new(added_admins, removed_admins);
if added_admin_ids.is_empty() && removed_admin_ids.is_empty() {
return Ok(());
}

let added_admins = format_repo_admins(&added_admin_ids);
let removed_admins = format_database_admins(&removed_admin_ids);

let email = AdminAccountEmail::new(added_admins, removed_admins);

spawn_blocking(move || {
for database_admin in &database_admins {
let (_, _, email_address) = database_admin;
if let Some(email_address) = email_address {
Expand All @@ -159,9 +160,11 @@
}
}

Ok(())
Ok::<_, anyhow::Error>(())
})
.await
.await?;

Ok(())
}
}

Expand Down