Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions packages/edge/services/pegboard/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ lazy_static::lazy_static! {
*REGISTRY
).unwrap();

pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_client_cpu_allocated",
"Total millicores of cpu allocated on a client.",
pub static ref CLIENT_MEMORY_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_client_memory_total",
"Total MiB of memory available on a client.",
&["client_id", "flavor"],
*REGISTRY
).unwrap();

pub static ref CLIENT_CPU_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_client_cpu_total",
"Total millicores of cpu available on a client.",
&["client_id", "flavor"],
*REGISTRY
).unwrap();
Expand All @@ -22,6 +29,13 @@ lazy_static::lazy_static! {
*REGISTRY
).unwrap();

pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_client_cpu_allocated",
"Total millicores of cpu allocated on a client.",
&["client_id", "flavor"],
*REGISTRY
).unwrap();

pub static ref ACTOR_ALLOCATE_DURATION: HistogramVec = register_histogram_vec_with_registry!(
"pegboard_actor_allocate_duration",
"Total duration to reserve resources for an actor.",
Expand All @@ -38,16 +52,16 @@ lazy_static::lazy_static! {
*REGISTRY,
).unwrap();

pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_env_cpu_usage",
"Total millicores used by an environment.",
pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_env_memory_usage",
"Total MiB of memory used by an environment.",
&["env_id", "flavor"],
*REGISTRY,
).unwrap();

pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_env_memory_usage",
"Total MiB of memory used by an environment.",
pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_env_cpu_usage",
"Total millicores used by an environment.",
&["env_id", "flavor"],
*REGISTRY,
).unwrap();
Expand Down
11 changes: 6 additions & 5 deletions packages/edge/services/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30);
const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30);
/// How long to wait after stopped and not receiving an exit state before setting actor as lost.
const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5);
/// How long an actor goes without retries before its retry count is reset to 0, effectively resetting its
/// backoff to 0.
const RETRY_RESET_DURATION_MS: i64 = util::duration::minutes(10);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Input {
Expand Down Expand Up @@ -124,9 +127,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
.send()
.await?;

let Some((client_id, client_workflow_id)) =
runtime::spawn_actor(ctx, input, &initial_actor_setup, 0).await?
else {
let Some(res) = runtime::spawn_actor(ctx, input, &initial_actor_setup, 0).await? else {
ctx.msg(Failed {
message: "Failed to allocate (no availability).".into(),
})
Expand All @@ -147,7 +148,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul

let state_res = ctx
.loope(
runtime::State::new(client_id, client_workflow_id, input.image_id),
runtime::State::new(res.client_id, res.client_workflow_id, input.image_id),
|ctx, state| {
let input = input.clone();

Expand Down Expand Up @@ -229,7 +230,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul

ctx.activity(runtime::UpdateFdbInput {
actor_id: input.actor_id,
client_id,
client_id: state.client_id,
state: sig.state.clone(),
})
.await?;
Expand Down
57 changes: 33 additions & 24 deletions packages/edge/services/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use util::serde::AsHashableExt;
use super::{
destroy::{self, KillCtx},
setup, Destroy, Input, ACTOR_START_THRESHOLD_MS, BASE_RETRY_TIMEOUT_MS,
RETRY_RESET_DURATION_MS,
};
use crate::{
keys, metrics,
Expand All @@ -26,11 +27,16 @@ use crate::{
#[derive(Deserialize, Serialize)]
pub struct State {
pub generation: u32,

pub client_id: Uuid,
pub client_workflow_id: Uuid,
pub image_id: Option<Uuid>,

pub drain_timeout_ts: Option<i64>,
pub gc_timeout_ts: Option<i64>,

#[serde(default)]
reschedule_state: RescheduleState,
}

impl State {
Expand All @@ -42,6 +48,7 @@ impl State {
image_id: Some(image_id),
drain_timeout_ts: None,
gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS),
reschedule_state: RescheduleState::default(),
}
}
}
Expand All @@ -51,6 +58,12 @@ pub struct StateRes {
pub kill: Option<KillCtx>,
}

/// Backoff bookkeeping for actor rescheduling, persisted inside the workflow
/// `State` so retry history survives across reschedule loops.
#[derive(Serialize, Deserialize, Clone, Default)]
struct RescheduleState {
// Timestamp (ms) of the most recent reschedule attempt; compared against
// RETRY_RESET_DURATION_MS to decide when the retry count resets to 0.
last_retry_ts: i64,
// Consecutive failed-allocation retries; seeds `util::Backoff::new_at` so the
// backoff resumes where it left off instead of restarting from zero.
retry_count: usize,
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct UpdateClientInput {
client_id: Uuid,
Expand Down Expand Up @@ -224,9 +237,9 @@ struct AllocateActorInputV2 {
}

#[derive(Debug, Serialize, Deserialize)]
struct AllocateActorOutputV2 {
client_id: Uuid,
client_workflow_id: Uuid,
pub struct AllocateActorOutputV2 {
pub client_id: Uuid,
pub client_workflow_id: Uuid,
}

#[activity(AllocateActorV2)]
Expand Down Expand Up @@ -611,7 +624,7 @@ pub async fn spawn_actor(
input: &Input,
actor_setup: &setup::ActorSetupCtx,
generation: u32,
) -> GlobalResult<Option<(Uuid, Uuid)>> {
) -> GlobalResult<Option<AllocateActorOutputV2>> {
let res = match ctx.check_version(2).await? {
1 => {
ctx.activity(AllocateActorInputV1 {
Expand Down Expand Up @@ -744,7 +757,7 @@ pub async fn spawn_actor(
.send()
.await?;

Ok(Some((res.client_id, res.client_workflow_id)))
Ok(Some(res))
}

pub async fn reschedule_actor(
Expand All @@ -769,7 +782,7 @@ pub async fn reschedule_actor(

// Waits for the actor to be ready (or destroyed) and automatically retries if failed to allocate.
let res = ctx
.loope(RescheduleState::default(), |ctx, state| {
.loope(state.reschedule_state.clone(), |ctx, state| {
let input = input.clone();
let actor_setup = actor_setup.clone();

Expand All @@ -778,14 +791,13 @@ pub async fn reschedule_actor(
let mut backoff =
util::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, state.retry_count);

// If the last retry ts is more than 2 * backoff ago, reset retry count to 0
// If the last retry ts was more than RETRY_RESET_DURATION_MS ago, reset retry count to 0
let now = util::timestamp::now();
state.retry_count =
if state.last_retry_ts < now - i64::try_from(2 * backoff.current_duration())? {
0
} else {
state.retry_count + 1
};
state.retry_count = if state.last_retry_ts < now - RETRY_RESET_DURATION_MS {
0
} else {
state.retry_count + 1
};
state.last_retry_ts = now;
Comment on lines 795 to 801
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Potential race condition - state.last_retry_ts is updated after the check, which could lead to incorrect retry count calculations if multiple retries happen very close together


// Don't sleep for first retry
Expand All @@ -797,14 +809,14 @@ pub async fn reschedule_actor(
.listen_with_timeout::<Destroy>(Instant::from(next) - Instant::now())
.await?
{
tracing::debug!("destroying before actor start");
tracing::debug!("destroying before actor reschedule");

return Ok(Loop::Break(Err(sig)));
}
}

if let Some(res) = spawn_actor(ctx, &input, &actor_setup, next_generation).await? {
Ok(Loop::Break(Ok(res)))
Ok(Loop::Break(Ok((state.clone(), res))))
} else {
tracing::debug!(actor_id=?input.actor_id, "failed to reschedule actor, retrying");

Expand All @@ -817,10 +829,13 @@ pub async fn reschedule_actor(

// Update loop state
match res {
Ok((client_id, client_workflow_id)) => {
Ok((reschedule_state, res)) => {
state.generation = next_generation;
state.client_id = client_id;
state.client_workflow_id = client_workflow_id;
state.client_id = res.client_id;
state.client_workflow_id = res.client_workflow_id;

// Save reschedule state in global state
state.reschedule_state = reschedule_state;

// Reset gc timeout once allocated
state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS);
Expand All @@ -831,12 +846,6 @@ pub async fn reschedule_actor(
}
}

#[derive(Serialize, Deserialize, Default)]
struct RescheduleState {
last_retry_ts: i64,
retry_count: usize,
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct ClearPortsAndResourcesInput {
actor_id: Uuid,
Expand Down
42 changes: 32 additions & 10 deletions packages/edge/services/pegboard/src/workflows/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,9 +938,19 @@ struct UpdateMetricsInput {

#[activity(UpdateMetrics)]
async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> GlobalResult<()> {
let (memory, cpu) = if input.clear {
(0, 0)
} else {
if input.clear {
metrics::CLIENT_MEMORY_ALLOCATED
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
.set(0);

metrics::CLIENT_CPU_ALLOCATED
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
.set(0);

return Ok(());
}

let (total_mem, total_cpu, remaining_mem, remaining_cpu) =
ctx.fdb()
Comment on lines +953 to 954
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Tuple order in destructuring doesn't match database query results. total_mem and total_cpu are swapped compared to query result order.

Suggested change
let (total_mem, total_cpu, remaining_mem, remaining_cpu) =
ctx.fdb()
let (total_mem, remaining_mem, total_cpu, remaining_cpu) =
ctx.fdb()

.await?
.run(|tx, _mc| async move {
Expand Down Expand Up @@ -983,21 +993,33 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;

Ok((
total_mem.saturating_sub(remaining_mem),
total_cpu.saturating_sub(remaining_cpu),
total_mem,
remaining_mem,
total_cpu,
remaining_cpu,
Comment on lines +996 to +999
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Tuple return order doesn't match variable names above. Should be (total_mem, total_cpu, remaining_mem, remaining_cpu) to match destructuring.

Suggested change
total_mem,
remaining_mem,
total_cpu,
remaining_cpu,
total_mem,
total_cpu,
remaining_mem,
remaining_cpu,

))
})
.custom_instrument(tracing::info_span!("client_update_metrics_tx"))
.await?
};
.await?;

metrics::CLIENT_CPU_ALLOCATED
metrics::CLIENT_MEMORY_TOTAL
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
.set(total_mem.try_into()?);

metrics::CLIENT_CPU_TOTAL
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
.set(cpu.try_into()?);
.set(total_cpu.try_into()?);

let allocated_mem = total_mem.saturating_sub(remaining_mem);
let allocated_cpu = total_cpu.saturating_sub(remaining_cpu);

metrics::CLIENT_MEMORY_ALLOCATED
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
.set(memory.try_into()?);
.set(allocated_mem.try_into()?);

metrics::CLIENT_CPU_ALLOCATED
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
.set(allocated_cpu.try_into()?);

Ok(())
}
Expand Down
Loading