diff --git a/packages/edge/services/pegboard/src/metrics.rs b/packages/edge/services/pegboard/src/metrics.rs index 1dfcdd9c91..24a7941054 100644 --- a/packages/edge/services/pegboard/src/metrics.rs +++ b/packages/edge/services/pegboard/src/metrics.rs @@ -8,9 +8,16 @@ lazy_static::lazy_static! { *REGISTRY ).unwrap(); - pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!( - "pegboard_client_cpu_allocated", - "Total millicores of cpu allocated on a client.", + pub static ref CLIENT_MEMORY_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!( + "pegboard_client_memory_total", + "Total MiB of memory available on a client.", + &["client_id", "flavor"], + *REGISTRY + ).unwrap(); + + pub static ref CLIENT_CPU_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!( + "pegboard_client_cpu_total", + "Total millicores of cpu available on a client.", &["client_id", "flavor"], *REGISTRY ).unwrap(); @@ -22,6 +29,13 @@ lazy_static::lazy_static! { *REGISTRY ).unwrap(); + pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!( + "pegboard_client_cpu_allocated", + "Total millicores of cpu allocated on a client.", + &["client_id", "flavor"], + *REGISTRY + ).unwrap(); + pub static ref ACTOR_ALLOCATE_DURATION: HistogramVec = register_histogram_vec_with_registry!( "pegboard_actor_allocate_duration", "Total duration to reserve resources for an actor.", @@ -38,16 +52,16 @@ lazy_static::lazy_static! 
{ *REGISTRY, ).unwrap(); - pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!( - "pegboard_env_cpu_usage", - "Total millicores used by an environment.", + pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!( + "pegboard_env_memory_usage", + "Total MiB of memory used by an environment.", &["env_id", "flavor"], *REGISTRY, ).unwrap(); - pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!( - "pegboard_env_memory_usage", - "Total MiB of memory used by an environment.", + pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!( + "pegboard_env_cpu_usage", + "Total millicores used by an environment.", &["env_id", "flavor"], *REGISTRY, ).unwrap(); diff --git a/packages/edge/services/pegboard/src/workflows/actor/mod.rs b/packages/edge/services/pegboard/src/workflows/actor/mod.rs index 1bf6875e32..023628198a 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/mod.rs @@ -26,6 +26,9 @@ const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30); const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30); /// How long to wait after stopped and not receiving an exit state before setting actor as lost. const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5); +/// How long an actor goes without retries before its retry count is reset to 0, effectively resetting its +/// backoff to 0. +const RETRY_RESET_DURATION_MS: i64 = util::duration::minutes(10); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Input { @@ -124,9 +127,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul .send() .await?; - let Some((client_id, client_workflow_id)) = - runtime::spawn_actor(ctx, input, &initial_actor_setup, 0).await? - else { + let Some(res) = runtime::spawn_actor(ctx, input, &initial_actor_setup, 0).await? 
else { ctx.msg(Failed { message: "Failed to allocate (no availability).".into(), }) @@ -147,7 +148,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul let state_res = ctx .loope( - runtime::State::new(client_id, client_workflow_id, input.image_id), + runtime::State::new(res.client_id, res.client_workflow_id, input.image_id), |ctx, state| { let input = input.clone(); @@ -229,7 +230,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul ctx.activity(runtime::UpdateFdbInput { actor_id: input.actor_id, - client_id, + client_id: state.client_id, state: sig.state.clone(), }) .await?; diff --git a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs index 3fcec4197b..019384efb9 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs @@ -14,6 +14,7 @@ use util::serde::AsHashableExt; use super::{ destroy::{self, KillCtx}, setup, Destroy, Input, ACTOR_START_THRESHOLD_MS, BASE_RETRY_TIMEOUT_MS, + RETRY_RESET_DURATION_MS, }; use crate::{ keys, metrics, @@ -26,11 +27,16 @@ use crate::{ #[derive(Deserialize, Serialize)] pub struct State { pub generation: u32, + pub client_id: Uuid, pub client_workflow_id: Uuid, pub image_id: Option, + pub drain_timeout_ts: Option, pub gc_timeout_ts: Option, + + #[serde(default)] + reschedule_state: RescheduleState, } impl State { @@ -42,6 +48,7 @@ impl State { image_id: Some(image_id), drain_timeout_ts: None, gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS), + reschedule_state: RescheduleState::default(), } } } @@ -51,6 +58,12 @@ pub struct StateRes { pub kill: Option, } +#[derive(Serialize, Deserialize, Clone, Default)] +struct RescheduleState { + last_retry_ts: i64, + retry_count: usize, +} + #[derive(Debug, Serialize, Deserialize, Hash)] struct UpdateClientInput { client_id: Uuid, @@ -224,9 
+237,9 @@ struct AllocateActorInputV2 { } #[derive(Debug, Serialize, Deserialize)] -struct AllocateActorOutputV2 { - client_id: Uuid, - client_workflow_id: Uuid, +pub struct AllocateActorOutputV2 { + pub client_id: Uuid, + pub client_workflow_id: Uuid, } #[activity(AllocateActorV2)] @@ -611,7 +624,7 @@ pub async fn spawn_actor( input: &Input, actor_setup: &setup::ActorSetupCtx, generation: u32, -) -> GlobalResult> { +) -> GlobalResult> { let res = match ctx.check_version(2).await? { 1 => { ctx.activity(AllocateActorInputV1 { @@ -744,7 +757,7 @@ pub async fn spawn_actor( .send() .await?; - Ok(Some((res.client_id, res.client_workflow_id))) + Ok(Some(res)) } pub async fn reschedule_actor( @@ -769,7 +782,7 @@ pub async fn reschedule_actor( // Waits for the actor to be ready (or destroyed) and automatically retries if failed to allocate. let res = ctx - .loope(RescheduleState::default(), |ctx, state| { + .loope(state.reschedule_state.clone(), |ctx, state| { let input = input.clone(); let actor_setup = actor_setup.clone(); @@ -778,14 +791,13 @@ pub async fn reschedule_actor( let mut backoff = util::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, state.retry_count); - // If the last retry ts is more than 2 * backoff ago, reset retry count to 0 + // If the last retry ts is more than RETRY_RESET_DURATION_MS, reset retry count to 0 let now = util::timestamp::now(); - state.retry_count = - if state.last_retry_ts < now - i64::try_from(2 * backoff.current_duration())? { - 0 - } else { - state.retry_count + 1 - }; + state.retry_count = if state.last_retry_ts < now - RETRY_RESET_DURATION_MS { + 0 + } else { + state.retry_count + 1 + }; state.last_retry_ts = now; // Don't sleep for first retry @@ -797,14 +809,14 @@ pub async fn reschedule_actor( .listen_with_timeout::(Instant::from(next) - Instant::now()) .await? 
{ - tracing::debug!("destroying before actor start"); + tracing::debug!("destroying before actor reschedule"); return Ok(Loop::Break(Err(sig))); } } if let Some(res) = spawn_actor(ctx, &input, &actor_setup, next_generation).await? { - Ok(Loop::Break(Ok(res))) + Ok(Loop::Break(Ok((state.clone(), res)))) } else { tracing::debug!(actor_id=?input.actor_id, "failed to reschedule actor, retrying"); @@ -817,10 +829,13 @@ pub async fn reschedule_actor( // Update loop state match res { - Ok((client_id, client_workflow_id)) => { + Ok((reschedule_state, res)) => { state.generation = next_generation; - state.client_id = client_id; - state.client_workflow_id = client_workflow_id; + state.client_id = res.client_id; + state.client_workflow_id = res.client_workflow_id; + + // Save reschedule state in global state + state.reschedule_state = reschedule_state; // Reset gc timeout once allocated state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS); @@ -831,12 +846,6 @@ pub async fn reschedule_actor( } } -#[derive(Serialize, Deserialize, Default)] -struct RescheduleState { - last_retry_ts: i64, - retry_count: usize, -} - #[derive(Debug, Serialize, Deserialize, Hash)] struct ClearPortsAndResourcesInput { actor_id: Uuid, diff --git a/packages/edge/services/pegboard/src/workflows/client/mod.rs b/packages/edge/services/pegboard/src/workflows/client/mod.rs index 79c59357ab..c5e31c5c5f 100644 --- a/packages/edge/services/pegboard/src/workflows/client/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/client/mod.rs @@ -938,9 +938,19 @@ struct UpdateMetricsInput { #[activity(UpdateMetrics)] async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> GlobalResult<()> { - let (memory, cpu) = if input.clear { - (0, 0) - } else { + if input.clear { + metrics::CLIENT_MEMORY_ALLOCATED + .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .set(0); + + metrics::CLIENT_CPU_ALLOCATED + 
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .set(0); + + return Ok(()); + } + + let (total_mem, remaining_mem, total_cpu, remaining_cpu) = ctx.fdb() .await? .run(|tx, _mc| async move { @@ -983,21 +993,33 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; Ok(( - total_mem.saturating_sub(remaining_mem), - total_cpu.saturating_sub(remaining_cpu), + total_mem, + remaining_mem, + total_cpu, + remaining_cpu, )) }) .custom_instrument(tracing::info_span!("client_update_metrics_tx")) - .await? - }; + .await?; - metrics::CLIENT_CPU_ALLOCATED + metrics::CLIENT_MEMORY_TOTAL + .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .set(total_mem.try_into()?); + + metrics::CLIENT_CPU_TOTAL .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) - .set(cpu.try_into()?); + .set(total_cpu.try_into()?); + + let allocated_mem = total_mem.saturating_sub(remaining_mem); + let allocated_cpu = total_cpu.saturating_sub(remaining_cpu); metrics::CLIENT_MEMORY_ALLOCATED .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) - .set(memory.try_into()?); + .set(allocated_mem.try_into()?); + + metrics::CLIENT_CPU_ALLOCATED + .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .set(allocated_cpu.try_into()?); Ok(()) }