Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ rev = "b2c6f366ee68a7956fb69dd4f39357b3c184bd15"

[workspace.dependencies.rivet-term]
git = "https://github.com/rivet-gg/rivet-term"
rev = "b21d7a2"
rev = "55e328470b68c557fb9bc8298369f90182d35b6d"

[workspace.dependencies.redis]
git = "https://github.com/rivet-gg/redis-rs"
Expand Down
15 changes: 13 additions & 2 deletions packages/core/services/cluster/src/ops/server/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use chirp_workflow::prelude::*;

use crate::types::{PoolType, Server};
use crate::types::{PoolType, Server, ServerState};

#[derive(Debug)]
pub struct Input {
Expand All @@ -26,6 +26,7 @@ pub(crate) struct ServerRow {
vlan_ip: Option<IpAddr>,
public_ip: Option<IpAddr>,
cloud_destroy_ts: Option<i64>,
state: i64,
}

impl TryFrom<ServerRow> for Server {
Expand All @@ -40,6 +41,7 @@ impl TryFrom<ServerRow> for Server {
lan_ip: value.vlan_ip,
wan_ip: value.public_ip,
cloud_destroy_ts: value.cloud_destroy_ts,
state: unwrap!(ServerState::from_repr(value.state.try_into()?)),
})
}
}
Expand All @@ -56,7 +58,16 @@ pub async fn cluster_server_get(ctx: &OperationCtx, input: &Input) -> GlobalResu
provider_server_id,
vlan_ip,
public_ip,
cloud_destroy_ts
cloud_destroy_ts,
CASE
WHEN cloud_destroy_ts IS NOT NULL THEN 6 -- Destroyed
WHEN taint_ts IS NOT NULL AND drain_ts IS NOT NULL THEN 5 -- TaintedDraining
WHEN drain_ts IS NOT NULL THEN 4 -- Draining
WHEN taint_ts IS NOT NULL THEN 3 -- Tainted
WHEN install_complete_ts IS NOT NULL THEN 2 -- Running
WHEN provision_complete_ts IS NOT NULL THEN 1 -- Installing
ELSE 0 -- Provisioning
END AS state
FROM db_cluster.servers
WHERE server_id = ANY($1)
",
Expand Down
11 changes: 10 additions & 1 deletion packages/core/services/cluster/src/ops/server/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,16 @@ pub async fn cluster_server_list(ctx: &OperationCtx, input: &Input) -> GlobalRes
s.provider_server_id,
s.vlan_ip,
s.public_ip,
s.cloud_destroy_ts
s.cloud_destroy_ts,
CASE
WHEN s.cloud_destroy_ts IS NOT NULL THEN 6 -- Destroyed
WHEN s.taint_ts IS NOT NULL AND s.drain_ts IS NOT NULL THEN 5 -- TaintedDraining
WHEN s.drain_ts IS NOT NULL THEN 4 -- Draining
WHEN s.taint_ts IS NOT NULL THEN 3 -- Tainted
WHEN s.install_complete_ts IS NOT NULL THEN 2 -- Running
WHEN s.provision_complete_ts IS NOT NULL THEN 1 -- Installing
ELSE 0 -- Provisioning
END AS state
Comment on lines +34 to +42
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

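style: The state values (0–6) are magic numbers duplicated in the `get.rs` and `list.rs` queries. Define them in one place (for example, derived from the `ServerState` discriminants) so the two queries and the enum cannot drift apart.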

FROM db_cluster.servers AS s
JOIN db_cluster.datacenters AS d
ON s.datacenter_id = d.datacenter_id
Expand Down
12 changes: 12 additions & 0 deletions packages/core/services/cluster/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ pub struct Server {
pub lan_ip: Option<IpAddr>,
pub wan_ip: Option<IpAddr>,
pub cloud_destroy_ts: Option<i64>,
pub state: ServerState,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: The Server struct doesn't derive Serialize/Deserialize traits but contains a serializable state field. This may cause issues if Server needs to be serialized

}

/// Lifecycle state of a cluster server.
///
/// The discriminants must stay in sync with the integers produced by the SQL
/// `CASE ... END AS state` expressions in `ops/server/get.rs` and
/// `ops/server/list.rs`; the `state` column is converted back into this enum
/// with `ServerState::from_repr`. The `CASE` branches are evaluated from the
/// highest value down, so a later lifecycle timestamp takes priority over an
/// earlier one.
#[derive(Serialize, Deserialize, Hash, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromRepr)]
pub enum ServerState {
/// None of the timestamps checked by the query are set (the `ELSE` branch).
Provisioning = 0,
/// `provision_complete_ts` is set.
Installing = 1,
/// `install_complete_ts` is set.
Running = 2,
/// `taint_ts` is set and `drain_ts` is not.
Tainted = 3,
/// `drain_ts` is set and `taint_ts` is not.
Draining = 4,
/// Both `taint_ts` and `drain_ts` are set.
TaintedDraining = 5,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: TaintedDraining combines two states - consider if this should be a separate flag instead of a combined state for better state management

/// `cloud_destroy_ts` is set; checked first, so it overrides every other state.
Destroyed = 6,
}

#[derive(Debug, Default, Clone)]
Expand Down
5 changes: 5 additions & 0 deletions packages/edge/infra/edge-server/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
ServiceKind::Standalone,
|config, pools| Box::pin(edge_monolith_workflow_worker::start(config, pools)),
),
Service::new(
"pegboard_usage_metrics_publish",
ServiceKind::Singleton,
|config, pools| Box::pin(pegboard_usage_metrics_publish::start(config, pools)),
),
];

Ok(RunConfigData {
Expand Down
24 changes: 20 additions & 4 deletions packages/edge/services/pegboard/src/keys/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ impl ActorKey {
pub fn subspace(client_id: Uuid) -> ActorSubspaceKey {
ActorSubspaceKey::new(client_id)
}

/// Returns a subspace key that carries no client ID.
///
/// Such a key packs to just the `(CLIENT, ACTOR)` tuple, which is the prefix
/// shared by every per-client key returned from `subspace`.
pub fn entire_subspace() -> ActorSubspaceKey {
ActorSubspaceKey::entire()
}
}

impl FormalKey for ActorKey {
Expand Down Expand Up @@ -290,12 +294,16 @@ impl<'de> TupleUnpack<'de> for ActorKey {
}

/// Key for a range of actor entries under the `(CLIENT, ACTOR)` prefix.
pub struct ActorSubspaceKey {
client_id: Uuid,
// `Some(id)`: the packed key is `(CLIENT, ACTOR)` followed by `id`.
// `None`: only the `(CLIENT, ACTOR)` prefix is packed.
client_id: Option<Uuid>,
}
Comment on lines 296 to 298
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Missing Debug derive for ActorSubspaceKey. This could make debugging more difficult if issues arise.


impl ActorSubspaceKey {
/// Builds a key whose packed form includes `client_id` after the
/// `(CLIENT, ACTOR)` prefix.
fn new(client_id: Uuid) -> Self {
ActorSubspaceKey { client_id }
ActorSubspaceKey { client_id: Some(client_id) }
}

/// Builds a key with no client ID, so only the `(CLIENT, ACTOR)` prefix is
/// packed.
fn entire() -> Self {
ActorSubspaceKey { client_id: None }
}
}

Expand All @@ -305,8 +313,16 @@ impl TuplePack for ActorSubspaceKey {
w: &mut W,
tuple_depth: TupleDepth,
) -> std::io::Result<VersionstampOffset> {
let t = (CLIENT, ACTOR, self.client_id);
t.pack(w, tuple_depth)
let mut offset = VersionstampOffset::None { size: 0 };

let t = (CLIENT, ACTOR);
offset += t.pack(w, tuple_depth)?;

if let Some(client_id) = &self.client_id {
offset += client_id.pack(w, tuple_depth)?;
}

Ok(offset)
}
}

Expand Down
14 changes: 14 additions & 0 deletions packages/edge/services/pegboard/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,18 @@ lazy_static::lazy_static! {
BUCKETS.to_vec(),
*REGISTRY,
).unwrap();

pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_env_cpu_usage",
"Total percent of CPU (per core) used by an environment.",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Description could be more precise - 'Total percent of CPU' is ambiguous. Consider clarifying if this is average, peak, or instantaneous CPU usage.

&["env_id", "flavor"],
*REGISTRY,
).unwrap();

pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
"pegboard_env_memory_usage",
"Total MiB of memory used by an environment.",
&["env_id", "flavor"],
*REGISTRY,
).unwrap();
}
1 change: 0 additions & 1 deletion packages/edge/services/pegboard/src/ops/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod update_allocation_idx;
pub mod usage_get;
162 changes: 0 additions & 162 deletions packages/edge/services/pegboard/src/ops/client/usage_get.rs

This file was deleted.

16 changes: 11 additions & 5 deletions packages/edge/services/pegboard/src/workflows/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
config,
system,
}),
activity(UpdateMetricsInput { client_id, flavor }),
activity(UpdateMetricsInput { client_id, flavor, clear: false }),
))
.await?;
}
Expand All @@ -118,7 +118,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
client_id,
events: events.clone(),
}),
activity(UpdateMetricsInput { client_id, flavor }),
activity(UpdateMetricsInput { client_id, flavor, clear: false }),
))
.await?;

Expand Down Expand Up @@ -243,6 +243,8 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
})
.await?;

ctx.activity(UpdateMetricsInput { client_id: input.client_id, flavor: input.flavor, clear: true }).await?;

let actors = ctx
.activity(FetchRemainingActorsInput {
client_id: input.client_id,
Expand Down Expand Up @@ -673,7 +675,7 @@ pub async fn handle_commands(
activity(InsertCommandsInput {
commands: raw_commands.clone(),
}),
activity(UpdateMetricsInput { client_id, flavor }),
activity(UpdateMetricsInput { client_id, flavor, clear: false }),
))
.await?;

Expand Down Expand Up @@ -914,11 +916,14 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> GlobalRe
struct UpdateMetricsInput {
client_id: Uuid,
flavor: ClientFlavor,
/// When true, `update_metrics` skips the FDB transaction and uses `(0, 0)`
/// for `(memory, cpu)`; when false the values are read from FDB.
clear: bool,
}

#[activity(UpdateMetrics)]
async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> GlobalResult<()> {
let (memory, cpu) =
let (memory, cpu) = if input.clear {
(0, 0)
} else {
ctx.fdb()
.await?
.run(|tx, _mc| async move {
Expand Down Expand Up @@ -966,7 +971,8 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global
))
})
.custom_instrument(tracing::info_span!("client_update_metrics_tx"))
.await?;
.await?
};

metrics::CLIENT_CPU_ALLOCATED
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
Expand Down
Loading
Loading