diff --git a/svc/pkg/cluster/src/workflows/datacenter/scale.rs b/svc/pkg/cluster/src/workflows/datacenter/scale.rs index ce67ef9416..8c42c98712 100644 --- a/svc/pkg/cluster/src/workflows/datacenter/scale.rs +++ b/svc/pkg/cluster/src/workflows/datacenter/scale.rs @@ -9,7 +9,6 @@ // tainted server: a tainted server use std::{ - cmp::Ordering, collections::HashMap, convert::{TryFrom, TryInto}, iter::{DoubleEndedIterator, Iterator}, @@ -278,13 +277,6 @@ async fn inner( }; scale_servers(&ctx, tx, &mut actions, &servers, &pool_ctx).await?; - - match pool_ctx.pool_type { - PoolType::Job => { - cleanup_tainted_job_servers(&ctx, tx, &mut actions, &servers, &pool_ctx).await? - } - _ => cleanup_tainted_servers(&ctx, tx, &mut actions, &servers, &pool_ctx).await?, - } } destroy_drained_servers(&ctx, tx, &mut actions, &servers).await?; @@ -304,62 +296,92 @@ async fn scale_servers( .filter(|server| server.pool_type == pctx.pool_type) .filter(|server| !server.is_tainted); - let active_servers_in_pool = servers_in_pool + // Active servers may not be entirely installed. This is important as we cannot filter out servers that + // aren't installed or provisioned yet here. + let active_servers = servers_in_pool .clone() .filter(|server| matches!(server.drain_state, DrainState::None)); - let active_count = active_servers_in_pool.clone().count(); + let active_count = active_servers.clone().count(); tracing::info!(desired=%pctx.desired_count, active=%active_count, "comparing {:?}", pctx.pool_type); - match pctx.desired_count.cmp(&active_count) { - Ordering::Less => match pctx.pool_type { - PoolType::Job => { - scale_down_job_servers(ctx, tx, actions, pctx, active_servers_in_pool, active_count) - .await? 
+ // Scale up + if pctx.desired_count > active_count { + scale_up_servers(ctx, tx, actions, pctx, servers_in_pool, active_count).await?; + } + + // Scale down + match pctx.pool_type { + PoolType::Job => { + let (nomad_servers, without_nomad_servers) = active_servers + .clone() + .partition::, _>(|server| server.has_nomad_node); + + if pctx.desired_count < nomad_servers.len() { + scale_down_job_servers( + ctx, + tx, + actions, + pctx, + nomad_servers, + without_nomad_servers, + ) + .await?; } - PoolType::Gg => { - scale_down_gg_servers(ctx, tx, actions, pctx, active_servers_in_pool, active_count) - .await? + } + PoolType::Gg => { + let installed_servers = active_servers.filter(|server| server.is_installed); + let installed_count = installed_servers.clone().count(); + + if pctx.desired_count < installed_count { + scale_down_gg_servers(ctx, tx, actions, pctx, installed_servers, installed_count) + .await?; } - PoolType::Ats => { - scale_down_ats_servers(ctx, tx, actions, pctx, active_servers_in_pool, active_count) - .await? 
+ } + PoolType::Ats => { + let installed_servers = active_servers.filter(|server| server.is_installed); + let installed_count = installed_servers.clone().count(); + + if pctx.desired_count < installed_count { + scale_down_ats_servers(ctx, tx, actions, pctx, installed_servers, installed_count) + .await?; } - }, - Ordering::Greater => { - scale_up_servers(ctx, tx, actions, pctx, servers_in_pool, active_count).await?; } - Ordering::Equal => {} + } + + // Cleanup + match pctx.pool_type { + PoolType::Job => cleanup_tainted_job_servers(ctx, tx, actions, servers, pctx).await?, + _ => cleanup_tainted_servers(ctx, tx, actions, servers, pctx).await?, } Ok(()) } -async fn scale_down_job_servers<'a, I: Iterator>( +async fn scale_down_job_servers( ctx: &ActivityCtx, tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, actions: &mut Vec, pctx: &PoolCtx, - active_servers: I, - active_count: usize, + nomad_servers: Vec<&Server>, + without_nomad_servers: Vec<&Server>, ) -> GlobalResult<()> { tracing::info!( datacenter_id=?pctx.datacenter_id, desired=%pctx.desired_count, - active=%active_count, + nomad_servers=%nomad_servers.len(), "scaling down job" ); - let (nomad_servers, without_nomad_servers) = - active_servers.partition::, _>(|server| server.has_nomad_node); + let diff = nomad_servers.len().saturating_sub(pctx.desired_count); let destroy_count = match pctx.provider { // Never destroy servers when scaling down with Linode, always drain Provider::Linode => 0, #[allow(unreachable_patterns)] - _ => (active_count - pctx.desired_count).min(without_nomad_servers.len()), + _ => diff.min(without_nomad_servers.len()), }; - let drain_count = active_count - pctx.desired_count - destroy_count; + let drain_count = diff - destroy_count; // Destroy servers if destroy_count != 0 { @@ -394,23 +416,23 @@ async fn scale_down_gg_servers<'a, I: Iterator + DoubleEndedI tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, actions: &mut Vec, pctx: &PoolCtx, - active_servers: I, - active_count: usize, + 
installed_servers: I, + installed_count: usize, ) -> GlobalResult<()> { tracing::info!( datacenter_id=?pctx.datacenter_id, desired=%pctx.desired_count, - active=%active_count, + installed=%installed_count, "scaling down gg" ); - let drain_count = active_count - pctx.desired_count; + let drain_count = installed_count.saturating_sub(pctx.desired_count); // Drain servers if drain_count != 0 { tracing::info!(count=%drain_count, "draining gg servers"); - let drain_candidates = active_servers + let drain_candidates = installed_servers .rev() .take(drain_count) .map(|server| server.server_id); @@ -429,23 +451,23 @@ async fn scale_down_ats_servers< tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, actions: &mut Vec, pctx: &PoolCtx, - active_servers: I, - active_count: usize, + installed_servers: I, + installed_count: usize, ) -> GlobalResult<()> { tracing::info!( datacenter_id=?pctx.datacenter_id, desired=%pctx.desired_count, - active=%active_count, + installed=%installed_count, "scaling down ats" ); - let drain_count = active_count - pctx.desired_count; + let drain_count = installed_count.saturating_sub(pctx.desired_count); // Drain servers if drain_count != 0 { tracing::info!(count=%drain_count, "draining ats servers"); - let drain_candidates = active_servers + let drain_candidates = installed_servers .rev() .take(drain_count) .map(|server| server.server_id); diff --git a/svc/pkg/cluster/src/workflows/server/mod.rs b/svc/pkg/cluster/src/workflows/server/mod.rs index 0ab45b1c01..6908879f6f 100644 --- a/svc/pkg/cluster/src/workflows/server/mod.rs +++ b/svc/pkg/cluster/src/workflows/server/mod.rs @@ -213,6 +213,15 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob }) .await?; + // Scale to bring up a new server to take this server's place + ctx.tagged_signal( + &json!({ + "datacenter_id": input.datacenter_id, + }), + crate::workflows::datacenter::Scale {}, + ) + .await?; + bail!("failed all attempts to provision server"); }; diff --git 
a/svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs b/svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs index b210553f4b..ea65cde465 100644 --- a/svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs +++ b/svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs @@ -557,7 +557,7 @@ pub async fn run_from_env() -> GlobalResult<()> { )?; #[derive(Serialize, Hash)] - struct CreateDisksInput { + struct CreateBootDiskInput { api_token: Option, image: String, ssh_public_key: String, @@ -565,15 +565,9 @@ pub async fn run_from_env() -> GlobalResult<()> { disk_size: u64, } - #[derive(Serialize, Hash)] - struct CreateDisksOutput { - boot_id: u64, - swap_id: u64, - } - wf.activity( - "create_disks", - CreateDisksInput { + "create_boot_disk", + CreateBootDiskInput { api_token: dc.provider_api_token.clone(), image: "linode/debian11".to_string(), // Not the actual public key, but not required @@ -582,12 +576,42 @@ pub async fn run_from_env() -> GlobalResult<()> { // Not the actual server disk size, but not required disk_size: 0, }, - CreateDisksOutput { + // Not the actual boot id, but not required + 0, + )?; + + #[derive(Serialize, Hash)] + struct WaitDiskReadyInput { + api_token: Option, + linode_id: u64, + disk_id: u64, + } + + wf.activity( + "wait_disk_ready", + WaitDiskReadyInput { + api_token: dc.provider_api_token.clone(), + linode_id, // Not the actual boot id, but not required - boot_id: 0, - // Not the actual swap id, but not required - swap_id: 0, + disk_id: 0, }, + serde_json::Value::Null, + )?; + + #[derive(Serialize, Hash)] + struct CreateSwapDiskInput { + api_token: Option, + linode_id: u64, + } + + wf.activity( + "create_swap_disk", + CreateSwapDiskInput { + api_token: dc.provider_api_token.clone(), + linode_id, + }, + // Not the actual swap id, but not required + 0, )?; #[derive(Serialize, Hash)] @@ -595,7 +619,8 @@ pub async fn run_from_env() -> GlobalResult<()> { api_token: Option, vlan_ip: Option, linode_id: u64, - disks: CreateDisksOutput, +
boot_disk_id: u64, + swap_disk_id: u64, } wf.activity( @@ -613,12 +638,10 @@ pub async fn run_from_env() -> GlobalResult<()> { }) .transpose()?, linode_id, - disks: CreateDisksOutput { - // Not the actual boot id, but not required - boot_id: 0, - // Not the actual swap id, but not required - swap_id: 0, - }, + // Not the actual boot id, but not required + boot_disk_id: 0, + // Not the actual swap id, but not required + swap_disk_id: 0, }, serde_json::Value::Null, )?; diff --git a/svc/pkg/linode/src/util/api.rs b/svc/pkg/linode/src/util/api.rs index bd5b221438..5ad17c357f 100644 --- a/svc/pkg/linode/src/util/api.rs +++ b/svc/pkg/linode/src/util/api.rs @@ -107,18 +107,13 @@ pub struct CreateDiskResponse { pub id: u64, } -pub struct CreateDisksResponse { - pub boot_id: u64, - pub swap_id: u64, -} - -pub async fn create_disks( +pub async fn create_boot_disk( client: &Client, ssh_key: &str, linode_id: u64, image: &str, server_disk_size: u64, -) -> GlobalResult { +) -> GlobalResult { tracing::info!("creating boot disk"); let boot_disk_res = client @@ -134,8 +129,10 @@ pub async fn create_disks( ) .await?; - wait_disk_ready(client, linode_id, boot_disk_res.id).await?; + Ok(boot_disk_res.id) +} +pub async fn create_swap_disk(client: &Client, linode_id: u64) -> GlobalResult { tracing::info!("creating swap disk"); let swap_disk_res = client @@ -149,10 +146,7 @@ pub async fn create_disks( ) .await?; - Ok(CreateDisksResponse { - boot_id: boot_disk_res.id, - swap_id: swap_disk_res.id, - }) + Ok(swap_disk_res.id) } pub async fn create_instance_config( diff --git a/svc/pkg/linode/src/workflows/server/mod.rs b/svc/pkg/linode/src/workflows/server/mod.rs index c72b884214..db0ffcd17e 100644 --- a/svc/pkg/linode/src/workflows/server/mod.rs +++ b/svc/pkg/linode/src/workflows/server/mod.rs @@ -154,8 +154,8 @@ async fn provision( }) .await?; - let disks_res = ctx - .activity(CreateDisksInput { + let boot_disk_id = ctx + .activity(CreateBootDiskInput { api_token: input.api_token.clone(), 
image: input .custom_image @@ -166,13 +166,27 @@ async fn provision( disk_size: create_instance_res.server_disk_size, }) .await?; - let boot_disk_id = disks_res.boot_id; + + ctx.activity(WaitDiskReadyInput { + api_token: input.api_token.clone(), + linode_id: create_instance_res.linode_id, + disk_id: boot_disk_id, + }) + .await?; + + let swap_disk_id = ctx + .activity(CreateSwapDiskInput { + api_token: input.api_token.clone(), + linode_id: create_instance_res.linode_id, + }) + .await?; ctx.activity(CreateInstanceConfigInput { api_token: input.api_token.clone(), vlan_ip: input.vlan_ip, linode_id: create_instance_res.linode_id, - disks: disks_res, + boot_disk_id, + swap_disk_id, }) .await?; @@ -299,7 +313,7 @@ async fn wait_instance_ready( } #[derive(Debug, Serialize, Deserialize, Hash)] -struct CreateDisksInput { +struct CreateBootDiskInput { api_token: Option, image: String, ssh_public_key: String, @@ -307,33 +321,51 @@ struct CreateDisksInput { disk_size: u64, } -#[derive(Debug, Serialize, Deserialize, Hash)] -struct CreateDisksOutput { - boot_id: u64, - swap_id: u64, -} - -#[activity(CreateDisks)] -async fn create_disks( - ctx: &ActivityCtx, - input: &CreateDisksInput, -) -> GlobalResult { +#[activity(CreateBootDisk)] +async fn create_boot_disk(ctx: &ActivityCtx, input: &CreateBootDiskInput) -> GlobalResult { // Build HTTP client let client = client::Client::new(input.api_token.clone()).await?; - let create_disks_res = api::create_disks( + api::create_boot_disk( &client, &input.ssh_public_key, input.linode_id, &input.image, input.disk_size, ) - .await?; + .await +} - Ok(CreateDisksOutput { - boot_id: create_disks_res.boot_id, - swap_id: create_disks_res.swap_id, - }) +#[derive(Debug, Serialize, Deserialize, Hash)] +struct WaitDiskReadyInput { + api_token: Option, + linode_id: u64, + disk_id: u64, +} + +#[activity(WaitDiskReady)] +async fn wait_disk_ready( + ctx: &ActivityCtx, + input: &WaitDiskReadyInput, +) -> GlobalResult<()> { + // Build HTTP client + let 
client = client::Client::new(input.api_token.clone()).await?; + + api::wait_disk_ready(&client, input.linode_id, input.disk_id).await +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct CreateSwapDiskInput { + api_token: Option, + linode_id: u64, +} + +#[activity(CreateSwapDisk)] +async fn create_swap_disk(ctx: &ActivityCtx, input: &CreateSwapDiskInput) -> GlobalResult { + // Build HTTP client + let client = client::Client::new(input.api_token.clone()).await?; + + api::create_swap_disk(&client, input.linode_id).await } #[derive(Debug, Serialize, Deserialize, Hash)] @@ -341,7 +373,8 @@ struct CreateInstanceConfigInput { api_token: Option, vlan_ip: Option, linode_id: u64, - disks: CreateDisksOutput, + boot_disk_id: u64, + swap_disk_id: u64, } #[activity(CreateInstanceConfig)] @@ -356,8 +389,8 @@ async fn create_instance_config( &client, input.vlan_ip.as_ref(), input.linode_id, - input.disks.boot_id, - input.disks.swap_id, + input.boot_disk_id, + input.swap_disk_id, ) .await }