From 8cd9d0af31250625789e939de14cc6b3c919c7e0 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Wed, 23 Apr 2025 23:54:35 +0000 Subject: [PATCH] fix: add future/fdb metrics --- .../dev-full/grafana/dashboards/futures.json | 177 +++++++++ .../core/src/db/crdb_nats/mod.rs | 10 +- .../core/src/db/fdb_sqlite_nats/debug.rs | 7 +- .../core/src/db/fdb_sqlite_nats/mod.rs | 104 ++--- .../common/chirp-workflow/core/src/prelude.rs | 2 +- .../common/chirp-workflow/core/src/worker.rs | 62 ++- packages/common/fdb-util/src/lib.rs | 90 ++++- packages/common/fdb-util/src/metrics.rs | 19 +- packages/common/operation/core/src/prelude.rs | 2 +- packages/common/pools/Cargo.toml | 1 + packages/common/pools/src/db/sqlite/mod.rs | 14 +- packages/common/util/core/Cargo.toml | 2 + packages/common/util/core/src/future.rs | 63 ++- .../install/install_scripts/components/mod.rs | 6 + .../install_scripts/files/process_exporter.sh | 58 +++ .../server/install/install_scripts/mod.rs | 9 + .../edge/api/intercom/src/route/pegboard.rs | 1 + .../src/route/game_guard/actor.rs | 368 ------------------ .../src/route/game_guard/mod.rs | 4 +- .../infra/guard/server/src/routing/actor.rs | 2 + .../edge/services/pegboard/src/keys/mod.rs | 5 +- .../src/ops/actor/allocate_ingress_ports.rs | 1 + .../services/pegboard/src/ops/actor/get.rs | 1 + .../pegboard/src/ops/actor/list_for_env.rs | 1 + .../src/ops/client/update_allocation_idx.rs | 1 + .../pegboard/src/workflows/actor/destroy.rs | 1 + .../pegboard/src/workflows/actor/runtime.rs | 4 + .../pegboard/src/workflows/actor/setup.rs | 1 + .../pegboard/src/workflows/client/mod.rs | 4 + 29 files changed, 532 insertions(+), 488 deletions(-) create mode 100644 docker/dev-full/grafana/dashboards/futures.json create mode 100644 packages/core/services/cluster/src/workflows/server/install/install_scripts/files/process_exporter.sh delete mode 100644 packages/edge/api/traefik-provider/src/route/game_guard/actor.rs diff --git a/docker/dev-full/grafana/dashboards/futures.json b/docker/dev-full/grafana/dashboards/futures.json new file mode 100644 index 0000000000..dc16092490 --- /dev/null +++ b/docker/dev-full/grafana/dashboards/futures.json @@ -0,0 +1,177 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "custom": { + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "scaleDistribution": { + "type": "linear" + } + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "interval": "15s", + "options": { + "calculate": false, + "calculation": { + "xBuckets": { + "mode": "size" + } + }, + "cellGap": 0, + "color": { + "exponent": 0.5, + "fill": "dark-orange", + "mode": "scheme", + "reverse": false, + "scale": "exponential", + "scheme": "RdBu", + "steps": 64 + }, + "exemplars": { + "color": "rgba(255,0,255,0.7)" + }, + "filterValues": { + "le": 1e-9 + }, + "legend": { + "show": true + }, + "rowsFrame": { + "layout": "auto" + }, + "tooltip": { + "mode": "single", + "showColorScale": false, + "yHistogram": true + }, + "yAxis": { + "axisPlacement": "left", + "max": "60", + "min": 0, + "reverse": false, + "unit": "s" + } + 
}, + "pluginVersion": "11.5.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum(increase(rivet_instrumented_future_duration_bucket{name=~\"[[name]]\", location=~\"[[location]]\"} [$__rate_interval])) by (le)", + "format": "heatmap", + "legendFormat": "{{le}}", + "range": true, + "refId": "A" + } + ], + "title": "Instrumented Future Duration", + "type": "heatmap" + } + ], + "preload": false, + "refresh": "30s", + "schemaVersion": 40, + "tags": [], + "templating": { + "list": [ + { + "current": { + "text": [ + "All" + ], + "value": [ + "$__all" + ] + }, + "definition": "label_values(rivet_instrumented_future_duration_count,name)", + "includeAll": true, + "label": "Name", + "multi": true, + "name": "name", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(rivet_instrumented_future_duration_count,name)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "type": "query" + }, + { + "current": { + "text": [ + "All" + ], + "value": [ + "$__all" + ] + }, + "definition": "label_values(rivet_instrumented_future_duration_count,location)", + "includeAll": true, + "label": "Location", + "multi": true, + "name": "location", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(rivet_instrumented_future_duration_count,location)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "type": "query" + } + ] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Futures", + "version": 0, + "weekStart": "" +} \ No newline at end of file diff --git a/packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs b/packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs index 15993bb771..406c19b24d 100644 --- a/packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs @@ -164,6 +164,11 @@ impl Database for DatabaseCrdbNats { #[tracing::instrument(skip_all)] async fn update_worker_ping(&self, worker_instance_id: Uuid) -> WorkflowResult<()> { + // Always update ping + metrics::WORKER_LAST_PING + .with_label_values(&[&worker_instance_id.to_string()]) + .set(rivet_util::timestamp::now()); + sql_execute!( [self] " @@ -263,11 +268,6 @@ impl Database for DatabaseCrdbNats { #[tracing::instrument(skip_all)] async fn publish_metrics(&self, worker_instance_id: Uuid) -> WorkflowResult<()> { - // Always update ping - metrics::WORKER_LAST_PING - .with_label_values(&[&worker_instance_id.to_string()]) - .set(rivet_util::timestamp::now()); - let acquired_lock = sql_fetch_optional!( [self, (i64,)] " diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs index e4a1f01d53..6e1adcd6e6 100644 --- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs @@ -335,7 +335,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats { .fdb()? 
.run(|tx, _mc| { let workflow_ids = workflow_ids.clone(); - async move { self.get_workflows_inner(workflow_ids, &tx).await }.in_current_span() + async move { self.get_workflows_inner(workflow_ids, &tx).await } }) .await .map_err(Into::into) @@ -469,7 +469,6 @@ impl DatabaseDebug for DatabaseFdbSqliteNats { Ok(workflows) } - .in_current_span() }) .instrument(tracing::info_span!("find_workflows_tx")) .await @@ -725,7 +724,6 @@ impl DatabaseDebug for DatabaseFdbSqliteNats { Ok(()) } - .in_current_span() }) .instrument(tracing::info_span!("wake_workflows_tx")) .await?; @@ -990,7 +988,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats { .fdb()? .run(|tx, _mc| { let signal_ids = signal_ids.clone(); - async move { self.get_signals_inner(signal_ids, &tx).await }.in_current_span() + async move { self.get_signals_inner(signal_ids, &tx).await } }) .instrument(tracing::info_span!("get_signals_tx")) .await @@ -1111,7 +1109,6 @@ impl DatabaseDebug for DatabaseFdbSqliteNats { Ok(signals) } - .in_current_span() }) .instrument(tracing::info_span!("find_signals_tx")) .await diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs index 107ec0d1a7..8ce570002c 100644 --- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs @@ -14,11 +14,11 @@ use fdb_util::{end_of_key_range, keys::*, FormalChunkedKey, FormalKey, SERIALIZA use foundationdb::{ self as fdb, options::{ConflictRangeType, StreamingMode}, - tuple::Subspace, }; use futures_util::{stream::BoxStream, StreamExt, TryStreamExt}; use indoc::indoc; use rivet_pools::prelude::*; +use rivet_util::future::CustomInstrumentExt; use serde_json::json; use sqlx::Acquire; use tokio::sync::mpsc; @@ -58,7 +58,7 @@ const WORKER_WAKE_SUBJECT: &str = "chirp.workflow.fdb_sqlite_nats.worker.wake"; pub struct DatabaseFdbSqliteNats { pools: rivet_pools::Pools, - subspace: Subspace, + subspace: fdb_util::Subspace, flush_tx: mpsc::UnboundedSender, } @@ -209,7 +209,7 @@ impl Database for DatabaseFdbSqliteNats { Ok(Arc::new(DatabaseFdbSqliteNats { pools, - subspace: Subspace::all().subspace(&(RIVET, CHIRP_WORKFLOW, FDB_SQLITE_NATS)), + subspace: fdb_util::Subspace::new(&(RIVET, CHIRP_WORKFLOW, FDB_SQLITE_NATS)), flush_tx, })) } @@ -346,9 +346,8 @@ impl Database for DatabaseFdbSqliteNats { Ok((lost_worker_instance_ids, expired_workflow_count)) } - .in_current_span() }) - .instrument(tracing::info_span!("clear_expired_leases_tx")) + .custom_instrument(tracing::info_span!("clear_expired_leases_tx")) .await?; if expired_workflow_count != 0 { @@ -366,11 +365,6 @@ impl Database for DatabaseFdbSqliteNats { #[tracing::instrument(skip_all)] async fn publish_metrics(&self, worker_instance_id: Uuid) -> WorkflowResult<()> { - // Always update ping - metrics::WORKER_LAST_PING - .with_label_values(&[&worker_instance_id.to_string()]) - .set(rivet_util::timestamp::now()); - // Attempt to be the only worker publishing metrics by writing to the lock key let acquired_lock = self .pools @@ -406,9 +400,8 @@ impl Database for DatabaseFdbSqliteNats { Ok(lock_expired) } - .in_current_span() }) - .instrument(tracing::info_span!("acquire_lock_tx")) + .custom_instrument(tracing::info_span!("acquire_lock_tx")) .await?; if acquired_lock { @@ -614,9 +607,8 @@ impl Database for DatabaseFdbSqliteNats { pending_signal_count, )) } - .in_current_span() }) - .instrument(tracing::info_span!("publish_metrics_tx")) + 
.custom_instrument(tracing::info_span!("publish_metrics_tx")) .await?; for (workflow_name, counts) in other_workflow_counts { @@ -646,16 +638,13 @@ impl Database for DatabaseFdbSqliteNats { // Clear lock self.pools .fdb()? - .run(|tx, _mc| { - async move { - let metrics_lock_key = keys::worker_instance::MetricsLockKey::new(); - tx.clear(&self.subspace.pack(&metrics_lock_key)); + .run(|tx, _mc| async move { + let metrics_lock_key = keys::worker_instance::MetricsLockKey::new(); + tx.clear(&self.subspace.pack(&metrics_lock_key)); - Ok(()) - } - .in_current_span() + Ok(()) }) - .instrument(tracing::info_span!("clear_lock_tx")) + .custom_instrument(tracing::info_span!("clear_lock_tx")) .await?; } @@ -664,6 +653,10 @@ impl Database for DatabaseFdbSqliteNats { #[tracing::instrument(skip_all)] async fn update_worker_ping(&self, worker_instance_id: Uuid) -> WorkflowResult<()> { + metrics::WORKER_LAST_PING + .with_label_values(&[&worker_instance_id.to_string()]) + .set(rivet_util::timestamp::now()); + self.pools .fdb()? .run(|tx, _mc| { @@ -680,9 +673,8 @@ impl Database for DatabaseFdbSqliteNats { Ok(()) } - .in_current_span() }) - .instrument(tracing::info_span!("update_worker_ping_tx")) + .custom_instrument(tracing::info_span!("update_worker_ping_tx")) .await?; Ok(()) @@ -844,9 +836,8 @@ impl Database for DatabaseFdbSqliteNats { Ok(()) } - .in_current_span() }) - .instrument(tracing::info_span!("dispatch_workflow_tx")) + .custom_instrument(tracing::info_span!("dispatch_workflow_tx")) .await?; self.wake_worker(); @@ -913,9 +904,8 @@ impl Database for DatabaseFdbSqliteNats { })) } } - .in_current_span() }) - .instrument(tracing::info_span!("get_workflow_tx")) + .custom_instrument(tracing::info_span!("get_workflow_tx")) .await .map_err(Into::into) } @@ -1001,9 +991,8 @@ impl Database for DatabaseFdbSqliteNats { } } } - .in_current_span() }) - .instrument(tracing::info_span!("find_workflow_tx")) + .custom_instrument(tracing::info_span!("find_workflow_tx")) .await?; let dt = start_instant.elapsed().as_secs_f64(); @@ -1151,7 +1140,6 @@ impl Database for DatabaseFdbSqliteNats { Ok(Some((workflow_id, workflow_name, wake_deadline_ts))) } } - .in_current_span() }) // TODO: How to get rid of this buffer? .buffer_unordered(1024) @@ -1253,7 +1241,6 @@ impl Database for DatabaseFdbSqliteNats { wake_deadline_ts, }) } - .in_current_span() }) // TODO: How to get rid of this buffer? 
.buffer_unordered(512) @@ -1261,9 +1248,8 @@ impl Database for DatabaseFdbSqliteNats { .instrument(tracing::trace_span!("map_to_partial_workflow")) .await } - .in_current_span() }) - .instrument(tracing::info_span!("pull_workflows_tx")) + .custom_instrument(tracing::info_span!("pull_workflows_tx")) .await?; let worker_instance_id_str = worker_instance_id.to_string(); @@ -1503,7 +1489,6 @@ impl Database for DatabaseFdbSqliteNats { events: sqlite::build_history(events)?, }) } - .in_current_span() }) .buffer_unordered(512) .try_collect() @@ -1689,9 +1674,8 @@ impl Database for DatabaseFdbSqliteNats { Ok(()) } - .in_current_span() }) - .instrument(tracing::info_span!("complete_workflows_tx")) + .custom_instrument(tracing::info_span!("complete_workflows_tx")) .await?; self.wake_worker(); @@ -1838,9 +1822,8 @@ impl Database for DatabaseFdbSqliteNats { Ok(()) } - .in_current_span() }) - .instrument(tracing::info_span!("commit_workflow_tx")) + .custom_instrument(tracing::info_span!("commit_workflow_tx")) .await?; // Wake worker again if the deadline is before the next tick @@ -1918,24 +1901,21 @@ impl Database for DatabaseFdbSqliteNats { // Fetch the next entry from all streams at the same time let mut results = futures_util::future::try_join_all( - streams.into_iter().map(|mut stream| { - async move { - if let Some(entry) = stream.try_next().await? { - Result::<_, fdb::FdbBindingError>::Ok(Some(( - entry.key().to_vec(), - self.subspace - .unpack::( - &entry.key(), - ) - .map_err(|x| { - fdb::FdbBindingError::CustomError(x.into()) - })?, - ))) - } else { - Ok(None) - } + streams.into_iter().map(|mut stream| async move { + if let Some(entry) = stream.try_next().await? { + Result::<_, fdb::FdbBindingError>::Ok(Some(( + entry.key().to_vec(), + self.subspace + .unpack::( + &entry.key(), + ) + .map_err(|x| { + fdb::FdbBindingError::CustomError(x.into()) + })?, + ))) + } else { + Ok(None) } - .in_current_span() }), ) .instrument(tracing::trace_span!("map_signals")) @@ -2075,9 +2055,8 @@ impl Database for DatabaseFdbSqliteNats { Ok(None) } } - .in_current_span() }) - .instrument(tracing::info_span!("pull_next_signal_tx")) + .custom_instrument(tracing::info_span!("pull_next_signal_tx")) .await?; if signal.is_some() { @@ -2166,9 +2145,8 @@ impl Database for DatabaseFdbSqliteNats { })) } } - .in_current_span() }) - .instrument(tracing::info_span!("get_sub_workflow_tx")) + .custom_instrument(tracing::info_span!("get_sub_workflow_tx")) .await .map_err(Into::into) } @@ -2294,9 +2272,8 @@ impl Database for DatabaseFdbSqliteNats { Ok(()) } - .in_current_span() }) - .instrument(tracing::info_span!("publish_signal_tx")) + .custom_instrument(tracing::info_span!("publish_signal_tx")) .await?; self.wake_worker(); @@ -2595,9 +2572,8 @@ impl Database for DatabaseFdbSqliteNats { Ok(()) } - .in_current_span() }) - .instrument(tracing::info_span!("update_workflow_tags_tx")) + .custom_instrument(tracing::info_span!("update_workflow_tags_tx")) .await?; Ok(()) diff --git a/packages/common/chirp-workflow/core/src/prelude.rs b/packages/common/chirp-workflow/core/src/prelude.rs index 587f88b52d..c8ac93d8d4 100644 --- a/packages/common/chirp-workflow/core/src/prelude.rs +++ b/packages/common/chirp-workflow/core/src/prelude.rs @@ -6,7 +6,7 @@ pub use global_error::{ext::*, prelude::*}; pub use rivet_cache; #[doc(hidden)] pub use rivet_pools::{self, prelude::*}; -pub use rivet_util::timestamp::DateTimeExt; +pub use rivet_util::{future::CustomInstrumentExt, timestamp::DateTimeExt}; pub mod util { pub use global_error::macros::*; 
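For reference, a minimal sketch of how the newly re-exported `CustomInstrumentExt` is used at a call site; the `do_work` future and `do_work_tx` span name are invented for illustration. Per the `future.rs` change in this patch, `custom_instrument` behaves like `tracing::Instrument::instrument`, and when the future completes it additionally observes the elapsed wall-clock time in the `instrumented_future_duration` histogram (surfaced as `rivet_instrumented_future_duration` in the new Grafana dashboard), labeled with the span's name and its `file:line` location.

    use rivet_util::future::CustomInstrumentExt;

    // Stand-in future used only for this sketch.
    async fn do_work() -> u64 {
        42
    }

    async fn caller() {
        // Runs inside the span like `.instrument(...)`; on completion the
        // elapsed time is recorded to the duration histogram with
        // `location` ("file:line" of the span) and `name` labels.
        let value = do_work()
            .custom_instrument(tracing::info_span!("do_work_tx"))
            .await;
        assert_eq!(value, 42);
    }
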
diff --git a/packages/common/chirp-workflow/core/src/worker.rs b/packages/common/chirp-workflow/core/src/worker.rs
index 7b6e58a9c0..2c9fe02bd1 100644
--- a/packages/common/chirp-workflow/core/src/worker.rs
+++ b/packages/common/chirp-workflow/core/src/worker.rs
@@ -23,8 +23,10 @@ use crate::{
 	utils,
 };
 
-/// How often to run internal tasks like updating ping, gc, and publishing metrics.
-const INTERNAL_INTERVAL: Duration = Duration::from_secs(20);
+/// How often to run gc and update ping.
+const PING_INTERVAL: Duration = Duration::from_secs(20);
+/// How often to publish metrics.
+const METRICS_INTERVAL: Duration = Duration::from_secs(20);
 
 /// Time to allow running workflows to shutdown after receiving a SIGINT or SIGTERM.
 const SHUTDOWN_DURATION: Duration = Duration::from_secs(30);
@@ -72,18 +74,22 @@ impl Worker {
 		let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval());
 		tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 
-		let mut internal_interval = tokio::time::interval(INTERNAL_INTERVAL);
-		internal_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 
 		let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM hook failed");
 
+		let mut gc_handle = self.gc();
+		let mut metrics_handle = self.publish_metrics();
+
 		loop {
 			tokio::select! {
 				_ = tick_interval.tick() => {},
-				_ = internal_interval.tick() => {
-					self.gc();
-					self.publish_metrics();
-					continue;
+				res = &mut gc_handle => {
+					tracing::error!(?res, "gc task unexpectedly stopped");
+					break;
+				}
+				res = &mut metrics_handle => {
+					tracing::error!(?res, "metrics task unexpectedly stopped");
+					break;
 				},
 				res = wake_sub.next() => {
 					if res.is_none() {
@@ -257,36 +263,50 @@ impl Worker {
 		Ok(())
 	}
 
-	fn gc(&self) {
+	fn gc(&self) -> JoinHandle<()> {
 		let db = self.db.clone();
 		let worker_instance_id = self.worker_instance_id;
 
 		tokio::task::spawn(
 			async move {
-				if let Err(err) = db.update_worker_ping(worker_instance_id).await {
-					tracing::error!(?err, "unhandled update ping error");
-				}
+				let mut ping_interval = tokio::time::interval(PING_INTERVAL);
+				ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+				loop {
+					ping_interval.tick().await;
+
+					if let Err(err) = db.update_worker_ping(worker_instance_id).await {
+						tracing::error!(?err, "unhandled update ping error");
+					}
 
-				if let Err(err) = db.clear_expired_leases(worker_instance_id).await {
-					tracing::error!(?err, "unhandled gc error");
+					if let Err(err) = db.clear_expired_leases(worker_instance_id).await {
+						tracing::error!(?err, "unhandled gc error");
+					}
 				}
 			}
-			.in_current_span(),
-		);
+			.instrument(tracing::info_span!("worker_gc_task")),
+		)
 	}
 
-	fn publish_metrics(&self) {
+	fn publish_metrics(&self) -> JoinHandle<()> {
 		let db = self.db.clone();
 		let worker_instance_id = self.worker_instance_id;
 
 		tokio::task::spawn(
 			async move {
-				if let Err(err) = db.publish_metrics(worker_instance_id).await {
-					tracing::error!(?err, "unhandled metrics error");
+				let mut metrics_interval = tokio::time::interval(METRICS_INTERVAL);
+				metrics_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+				loop {
+					metrics_interval.tick().await;
+
+					if let Err(err) = db.publish_metrics(worker_instance_id).await {
+						tracing::error!(?err, "unhandled metrics error");
+					}
 				}
 			}
-			.in_current_span(),
-		);
+			.instrument(tracing::info_span!("worker_metrics_task")),
+		)
 	}
 }
diff --git a/packages/common/fdb-util/src/lib.rs b/packages/common/fdb-util/src/lib.rs
index eb40b852d7..385f813253 100644
---
a/packages/common/fdb-util/src/lib.rs +++ b/packages/common/fdb-util/src/lib.rs @@ -1,11 +1,19 @@ use std::{ + borrow::Cow, + ops::Deref, path::{Path, PathBuf}, result::Result::Ok, time::{Duration, Instant}, }; use anyhow::*; -use foundationdb::{self as fdb, future::FdbValue, options::DatabaseOption}; +use foundationdb::{ + self as fdb, + future::FdbValue, + options::DatabaseOption, + tuple::{self, PackResult, TuplePack, TupleUnpack}, + KeySelector, RangeOption, +}; pub mod keys; mod metrics; @@ -40,6 +48,78 @@ pub trait FormalChunkedKey { fn split(&self, value: Self::Value) -> Result>>; } +/// Wrapper type around `foundationdb::tuple::Subspace` that records metrics. +pub struct Subspace { + inner: tuple::Subspace, +} + +impl Subspace { + /// Creates a subspace with the given tuple. + pub fn new(t: &T) -> Self { + Self { + inner: tuple::Subspace::all().subspace(t), + } + } + + /// Returns a new Subspace whose prefix extends this Subspace with a given tuple encodable. + pub fn subspace(&self, t: &T) -> Self { + Self { + inner: self.inner.subspace(t), + } + } + + /// Returns the key encoding the specified Tuple with the prefix of this Subspace + /// prepended. + pub fn pack(&self, t: &T) -> Vec { + metrics::KEY_PACK_COUNT + .with_label_values(&[std::any::type_name::()]) + .inc(); + + self.inner.pack(t) + } + + /// Returns the key encoding the specified Tuple with the prefix of this Subspace + /// prepended, with a versionstamp. + pub fn pack_with_versionstamp(&self, t: &T) -> Vec { + metrics::KEY_PACK_COUNT + .with_label_values(&[std::any::type_name::()]) + .inc(); + + self.inner.pack_with_versionstamp(t) + } + + /// `unpack` returns the Tuple encoded by the given key with the prefix of this Subspace + /// removed. `unpack` will return an error if the key is not in this Subspace or does not + /// encode a well-formed Tuple. + pub fn unpack<'de, T: TupleUnpack<'de>>(&self, key: &'de [u8]) -> PackResult { + metrics::KEY_UNPACK_COUNT + .with_label_values(&[std::any::type_name::()]) + .inc(); + + self.inner.unpack(key) + } +} + +impl Deref for Subspace { + type Target = tuple::Subspace; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<'a> From<&'a Subspace> for RangeOption<'static> { + fn from(subspace: &Subspace) -> Self { + let (begin, end) = subspace.range(); + + Self { + begin: KeySelector::first_greater_or_equal(Cow::Owned(begin)), + end: KeySelector::first_greater_or_equal(Cow::Owned(end)), + ..Self::default() + } + } +} + pub fn handle(fdb_cluster_path: &Path) -> Result { let db = fdb::Database::from_path( &fdb_cluster_path @@ -81,17 +161,15 @@ async fn fdb_health_check(fdb_cluster_path: PathBuf) { { Ok(res) => { let dt = start_instant.elapsed().as_secs_f64(); - metrics::FDB_PING_DURATION - .with_label_values(&[]) - .observe(dt); - metrics::FDB_MISSED_PING.with_label_values(&[]).set(0); + metrics::PING_DURATION.with_label_values(&[]).observe(dt); + metrics::MISSED_PING.with_label_values(&[]).set(0); if let Err(err) = res { tracing::error!(?err, "error checking fdb ping"); } } Err(_) => { - metrics::FDB_MISSED_PING.with_label_values(&[]).set(1); + metrics::MISSED_PING.with_label_values(&[]).set(1); tracing::error!("fdb missed ping") } diff --git a/packages/common/fdb-util/src/metrics.rs b/packages/common/fdb-util/src/metrics.rs index cd0e6f8daf..3adb4ca313 100644 --- a/packages/common/fdb-util/src/metrics.rs +++ b/packages/common/fdb-util/src/metrics.rs @@ -1,18 +1,33 @@ use rivet_metrics::{prometheus::*, MICRO_BUCKETS, REGISTRY}; lazy_static::lazy_static! 
{ - pub static ref FDB_PING_DURATION: HistogramVec = register_histogram_vec_with_registry!( + pub static ref PING_DURATION: HistogramVec = register_histogram_vec_with_registry!( "fdb_ping_duration", "Total duration to retrieve a single value from FDB.", &[], MICRO_BUCKETS.to_vec(), *REGISTRY, ).unwrap(); - pub static ref FDB_MISSED_PING: IntGaugeVec = register_int_gauge_vec_with_registry!( + pub static ref MISSED_PING: IntGaugeVec = register_int_gauge_vec_with_registry!( "fdb_missed_ping", "1 if FDB missed the last ping.", &[], *REGISTRY, ) .unwrap(); + + pub static ref KEY_PACK_COUNT: IntCounterVec = register_int_counter_vec_with_registry!( + "fdb_key_pack_count", + "How many times a key has been packed.", + &["type"], + *REGISTRY, + ) + .unwrap(); + pub static ref KEY_UNPACK_COUNT: IntCounterVec = register_int_counter_vec_with_registry!( + "fdb_key_unpack_count", + "How many times a key has been unpacked.", + &["type"], + *REGISTRY, + ) + .unwrap(); } diff --git a/packages/common/operation/core/src/prelude.rs b/packages/common/operation/core/src/prelude.rs index ec7157472b..3b9e8e5628 100644 --- a/packages/common/operation/core/src/prelude.rs +++ b/packages/common/operation/core/src/prelude.rs @@ -3,7 +3,7 @@ pub use chirp_client::prelude::*; pub use chirp_perf::PerfCtx; pub use formatted_error; pub use global_error::{ext::*, prelude::*}; -pub use rivet_util::timestamp::DateTimeExt; +pub use rivet_util::{future::CustomInstrumentExt, timestamp::DateTimeExt}; // The code under `global_error::macros` used to be under `rivet_util`, // but it was merged, so we have to merge the exports. diff --git a/packages/common/pools/Cargo.toml b/packages/common/pools/Cargo.toml index fdf83873af..cd61bbbfb6 100644 --- a/packages/common/pools/Cargo.toml +++ b/packages/common/pools/Cargo.toml @@ -24,6 +24,7 @@ lz4_flex = "0.11.3" rivet-config.workspace = true rivet-metrics.workspace = true service-discovery.workspace = true +rivet-util.workspace = true tempfile = "3.13.0" thiserror = "1.0" tokio.workspace = true diff --git a/packages/common/pools/src/db/sqlite/mod.rs b/packages/common/pools/src/db/sqlite/mod.rs index 4a73cd70ce..7aad078455 100644 --- a/packages/common/pools/src/db/sqlite/mod.rs +++ b/packages/common/pools/src/db/sqlite/mod.rs @@ -8,9 +8,9 @@ use std::{ use dirs; use fdb_util::{prelude::*, SERIALIZABLE}; -use foundationdb::{self as fdb, options::StreamingMode, tuple::Subspace, FdbBindingError}; +use foundationdb::{self as fdb, options::StreamingMode, FdbBindingError}; use uuid::Uuid; - +use rivet_util::future::CustomInstrumentExt; use futures_util::{StreamExt, TryStreamExt}; use global_error::{bail, ext::AssertionError, unwrap, GlobalResult}; use sqlx::{ @@ -163,7 +163,7 @@ pub struct SqlitePoolManager { shutdown: broadcast::Sender<()>, fdb: Option, storage: SqliteStorage, - subspace: Subspace, + subspace: fdb_util::Subspace, } // MARK: Public methods @@ -200,7 +200,7 @@ impl SqlitePoolManager { shutdown, fdb: fdb.clone(), storage, - subspace: Subspace::all().subspace(&(RIVET, SQLITE)), + subspace: fdb_util::Subspace::new(&(RIVET, SQLITE)), }); tokio::task::spawn(manager.clone().manager_gc_loop(shutdown_rx)); @@ -504,9 +504,8 @@ impl SqlitePoolManager { Ok(()) } - .in_current_span() }) - .instrument(tracing::info_span!("snapshot_sqlite_write_tx")) + .custom_instrument(tracing::info_span!("snapshot_sqlite_write_tx")) .await?; let dt = start_instant.elapsed().as_secs_f64(); @@ -703,9 +702,8 @@ impl SqlitePoolManager { Ok((buf, chunk_count)) } - .in_current_span() }) - 
.instrument(tracing::info_span!("read_from_fdb_tx")) + .custom_instrument(tracing::info_span!("read_from_fdb_tx")) .await?; if chunks > 0 { diff --git a/packages/common/util/core/Cargo.toml b/packages/common/util/core/Cargo.toml index 43b347fe88..758a87877a 100644 --- a/packages/common/util/core/Cargo.toml +++ b/packages/common/util/core/Cargo.toml @@ -23,11 +23,13 @@ rand = "0.8" regex = "1.4" reqwest = { version = "0.12", default-features = false } rivet-config.workspace = true +rivet-metrics.workspace = true rivet-util-macros.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } thiserror = "1.0" tokio.workspace = true +tracing.workspace = true types-proto.workspace = true url = "2.5.4" uuid = { version = "1", features = ["v4", "serde"] } diff --git a/packages/common/util/core/src/future.rs b/packages/common/util/core/src/future.rs index ba5fc5e2da..fd034cdab9 100644 --- a/packages/common/util/core/src/future.rs +++ b/packages/common/util/core/src/future.rs @@ -1,6 +1,12 @@ -use std::future::Future; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Instant, +}; use futures_util::future; +use tracing::{instrument::Instrumented, Instrument}; /// Attempts to create a new future to select over a list of futures. /// Non-panicking version of [futures_util::future::select_all](https://docs.rs/futures/0.3.15/futures/future/fn.select_all.html). @@ -19,3 +25,58 @@ where std::future::pending().await } } + +pub trait CustomInstrumentExt: Sized { + fn custom_instrument(self, span: tracing::Span) -> CustomInstrumented { + CustomInstrumented { + inner: self.instrument(span), + start: Instant::now(), + } + } +} + +impl CustomInstrumentExt for F {} + +pub struct CustomInstrumented { + inner: Instrumented, + start: Instant, +} + +impl Future for CustomInstrumented { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = unsafe { self.get_unchecked_mut() }; + let inner = unsafe { Pin::new_unchecked(&mut this.inner) }; + + let metadata = inner.span().metadata().clone(); + + match inner.poll(cx) { + Poll::Ready(val) => { + if let Some(metadata) = metadata { + if let (Some(file), Some(line)) = (metadata.file(), metadata.line()) { + metrics::INSTRUMENTED_FUTURE_DURATION + .with_label_values(&[&format!("{file}:{line}"), metadata.name()]) + .observe(this.start.elapsed().as_secs_f64()); + } + } + Poll::Ready(val) + } + Poll::Pending => Poll::Pending, + } + } +} + +mod metrics { + use rivet_metrics::{prometheus::*, MICRO_BUCKETS, REGISTRY}; + + lazy_static::lazy_static! 
{ + pub static ref INSTRUMENTED_FUTURE_DURATION: HistogramVec = register_histogram_vec_with_registry!( + "instrumented_future_duration", + "Duration of a future.", + &["location", "name"], + MICRO_BUCKETS.to_vec(), + *REGISTRY, + ).unwrap(); + } +} diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs index e9b3c1e9e8..5094495109 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs @@ -26,6 +26,12 @@ pub mod node_exporter { } } +pub mod process_exporter { + pub fn install() -> String { + include_str!("../files/process_exporter.sh").to_string() + } +} + pub mod sysctl { pub fn install() -> String { include_str!("../files/sysctl.sh").to_string() diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/process_exporter.sh b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/process_exporter.sh new file mode 100644 index 0000000000..e472d176a2 --- /dev/null +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/process_exporter.sh @@ -0,0 +1,58 @@ +# https://github.com/ncabatoff/process-exporter/releases +version="0.8.7" + +if ! id -u "process-exporter" &>/dev/null; then + useradd -r -s /bin/false process-exporter +fi + +# Download and install process-exporter +mkdir -p /opt/process-exporter-$version/ /etc/process-exporter +wget -O /tmp/process-exporter.tar.gz https://github.com/ncabatoff/process-exporter/releases/download/v$version/process-exporter-$version.linux-amd64.tar.gz +tar -zxvf /tmp/process-exporter.tar.gz -C /opt/process-exporter-$version/ --strip-components=1 +install -o process-exporter -g process-exporter /opt/process-exporter-$version/process-exporter /usr/bin/ + +# TODO: Verify hash + +# Check version +if [[ "$(process-exporter --version)" != *"$version"* ]]; then + echo "Version check failed." 
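+	# Bail out before wiring up the config and systemd service below; the
+	# substring match above tolerates the banner text around the version number.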
+ exit 1 +fi + +# Create config +cat << 'EOF' > /etc/process-exporter/config.yaml +process_names: + - name: "{{.Comm}}" + cmdline: + - '.+' +EOF + +# Create systemd service file +cat << 'EOF' > /etc/systemd/system/process-exporter.service +[Unit] +Description=Process Exporter +Requires=network-online.target +After=network-online.target + +[Service] +User=process-exporter +Group=process-exporter +Type=simple +ExecStart=/usr/bin/process-exporter --config.path /etc/process-exporter/config.yaml +Restart=always +RestartSec=2 + +# Medium CPU priority +Nice=-10 +# Standard service +CPUSchedulingPolicy=other + +[Install] +WantedBy=multi-user.target +EOF + +# Start and enable process-exporter service +systemctl daemon-reload +systemctl enable process-exporter +systemctl start process-exporter + diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs index 235220b63c..957b4af4c7 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs @@ -24,6 +24,7 @@ pub async fn gen_install( let mut script = vec![ components::common(), components::node_exporter::install(), + components::process_exporter::install(), components::sysctl::install(), components::traefik::install(), // NOTE: TLS certs expire in a year, prebakes expire in 6 months @@ -120,6 +121,14 @@ pub async fn gen_initialize( }, ); + prometheus_targets.insert( + "process_exporter".into(), + components::vector::PrometheusTarget { + endpoint: "http://127.0.0.1:9256/metrics".into(), + scrape_interval: 15, + }, + ); + // MARK: Specific pool components match pool_type { PoolType::Job => { diff --git a/packages/edge/api/intercom/src/route/pegboard.rs b/packages/edge/api/intercom/src/route/pegboard.rs index 8673dae8b3..6343ebf465 100644 --- a/packages/edge/api/intercom/src/route/pegboard.rs +++ b/packages/edge/api/intercom/src/route/pegboard.rs @@ -44,6 +44,7 @@ pub async fn prewarm_image( Ok(None) } }) + .custom_instrument(tracing::info_span!("prewarm_fetch_tx")) .await?; let Some(client_id) = client_id else { diff --git a/packages/edge/api/traefik-provider/src/route/game_guard/actor.rs b/packages/edge/api/traefik-provider/src/route/game_guard/actor.rs deleted file mode 100644 index f02ecbf45c..0000000000 --- a/packages/edge/api/traefik-provider/src/route/game_guard/actor.rs +++ /dev/null @@ -1,368 +0,0 @@ -use api_core_traefik_provider::types; -use api_helper::ctx::Ctx; -use cluster::types::GuardPublicHostname; -use fdb_util::{FormalKey, SNAPSHOT}; -use foundationdb::{self as fdb, options::StreamingMode}; -use futures_util::{StreamExt, TryStreamExt}; -use pegboard::types::{EndpointType, GameGuardProtocol}; -use rivet_operation::prelude::*; -use std::{ - collections::hash_map::DefaultHasher, - fmt::Write, - hash::{Hash, Hasher}, -}; - -use crate::auth::Auth; - -pub async fn build_actor( - ctx: &Ctx, - config: &mut types::TraefikConfigResponse, -) -> GlobalResult> { - let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id; - - let (dc_res, proxied_ports) = tokio::try_join!( - ctx.op(cluster::ops::datacenter::get::Input { - datacenter_ids: vec![dc_id], - }), - async move { - ctx.fdb() - .await? 
- .run(|tx, _mc| async move { - let proxied_ports_subspace = pegboard::keys::subspace() - .subspace(&pegboard::keys::actor::ProxiedPortsKey::subspace()); - - tx.get_ranges_keyvalues( - fdb::RangeOption { - mode: StreamingMode::WantAll, - ..(&proxied_ports_subspace).into() - }, - // NOTE: This is not SERIALIZABLE because we don't want to conflict with port updates - // and its not important if its slightly stale - SNAPSHOT, - ) - .map(|res| match res { - Ok(entry) => { - let proxied_ports_key = pegboard::keys::subspace() - .unpack::(entry.key()) - .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; - - Ok(futures_util::stream::iter( - proxied_ports_key - .deserialize(entry.value()) - .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?, - ) - .map(move |pp| Ok((proxied_ports_key.actor_id, pp)))) - } - Err(err) => Err(Into::::into(err)), - }) - .try_flatten() - .try_collect::>() - .await - }) - .await - .map_err(Into::into) - } - )?; - - let dc = unwrap!(dc_res.datacenters.first()); - let latest_actor_create_ts = proxied_ports.iter().map(|(_, pp)| pp.create_ts).max(); - - config.http.middlewares.insert( - "actor-rate-limit".to_owned(), - types::TraefikMiddlewareHttp::RateLimit { - average: 600, - period: "1m".into(), - burst: 1000, - source_criterion: types::InFlightReqSourceCriterion::IpStrategy(types::IpStrategy { - depth: 0, - exclude_ips: None, - }), - }, - ); - config.http.middlewares.insert( - "actor-in-flight".to_owned(), - types::TraefikMiddlewareHttp::InFlightReq { - // This number needs to be high to allow for parallel requests - amount: 50, - source_criterion: types::InFlightReqSourceCriterion::IpStrategy(types::IpStrategy { - depth: 0, - exclude_ips: None, - }), - }, - ); - - // TODO(RVT-4349, RVT-4172): Retry requests in case the actor's server has not started yet - config.http.middlewares.insert( - "actor-retry".to_owned(), - types::TraefikMiddlewareHttp::Retry { - attempts: 4, - initial_interval: "250ms".into(), - }, - ); - - // Process proxied ports - for (actor_id, proxied_port) in &proxied_ports { - if let Err(err) = actor_register_proxied_port(*actor_id, proxied_port, dc, config) { - tracing::error!(?err, "failed to register proxied port") - } - } - - Ok(latest_actor_create_ts) -} - -#[tracing::instrument] -fn actor_register_proxied_port( - actor_id: Uuid, - proxied_port: &pegboard::keys::actor::ProxiedPort, - dc: &cluster::types::Datacenter, - traefik_config: &mut types::TraefikConfigResponse, -) -> GlobalResult<()> { - let ingress_port = proxied_port.ingress_port_number; - let actor_id = actor_id; - let target_port_name = proxied_port.port_name.clone(); - let service_id = format!("actor:{actor_id}:{target_port_name}"); - - // Insert the relevant service - match proxied_port.protocol { - GameGuardProtocol::Http | GameGuardProtocol::Https => { - traefik_config.http.services.insert( - service_id.clone(), - types::TraefikService { - load_balancer: types::TraefikLoadBalancer { - servers: vec![types::TraefikServer { - url: Some(format!( - "http://{}:{}", - proxied_port.lan_hostname, proxied_port.source - )), - address: None, - }], - sticky: None, - }, - }, - ); - } - GameGuardProtocol::Tcp | GameGuardProtocol::TcpTls => { - traefik_config.tcp.services.insert( - service_id.clone(), - types::TraefikService { - load_balancer: types::TraefikLoadBalancer { - servers: vec![types::TraefikServer { - url: None, - address: Some(format!( - "{}:{}", - proxied_port.lan_hostname, proxied_port.source - )), - }], - sticky: None, - }, - }, - ); - } - GameGuardProtocol::Udp => { - 
traefik_config.udp.services.insert( - service_id.clone(), - types::TraefikService { - load_balancer: types::TraefikLoadBalancer { - servers: vec![types::TraefikServer { - url: None, - address: Some(format!( - "{}:{}", - proxied_port.lan_hostname, proxied_port.source - )), - }], - sticky: None, - }, - }, - ); - } - }; - - // Insert the relevant router - match proxied_port.protocol { - GameGuardProtocol::Http => { - add_http_port( - actor_id, - proxied_port, - traefik_config, - &service_id, - &dc.guard_public_hostname, - false, - )?; - } - GameGuardProtocol::Https => { - add_http_port( - actor_id, - proxied_port, - traefik_config, - &service_id, - &dc.guard_public_hostname, - true, - )?; - } - GameGuardProtocol::Tcp => { - traefik_config.tcp.routers.insert( - format!("actor:{}:{}:tcp", actor_id, target_port_name), - types::TraefikRouter { - entry_points: vec![format!("lb-{ingress_port}-tcp")], - rule: Some("HostSNI(`*`)".into()), - priority: None, - service: service_id, - middlewares: vec![], - tls: None, - }, - ); - } - GameGuardProtocol::TcpTls => { - traefik_config.tcp.routers.insert( - format!("actor:{}:{}:tcp-tls", actor_id, target_port_name), - types::TraefikRouter { - entry_points: vec![format!("lb-{ingress_port}-tcp")], - rule: Some("HostSNI(`*`)".into()), - priority: None, - service: service_id, - middlewares: vec![], - tls: Some(types::TraefikTls::build(build_tls_domains( - &dc.guard_public_hostname, - )?)), - }, - ); - } - GameGuardProtocol::Udp => { - traefik_config.udp.routers.insert( - format!("actor:{}:{}:udp", actor_id, target_port_name), - types::TraefikRouter { - entry_points: vec![format!("lb-{ingress_port}-udp")], - rule: None, - priority: None, - service: service_id, - middlewares: vec![], - tls: None, - }, - ); - } - } - - Ok(()) -} - -fn add_http_port( - actor_id: Uuid, - proxied_port: &pegboard::keys::actor::ProxiedPort, - traefik_config: &mut types::TraefikConfigResponse, - service_id: &str, - guard_public_hostname: &GuardPublicHostname, - is_https: bool, -) -> GlobalResult<()> { - // Choose endpoint types to expose routes for - let supported_endpoint_types = match guard_public_hostname { - GuardPublicHostname::DnsParent(_) => vec![EndpointType::Hostname, EndpointType::Path], - GuardPublicHostname::Static(_) => vec![EndpointType::Path], - }; - - // Add routes for each endpoint type - for endpoint_type in supported_endpoint_types { - let (hostname, path) = pegboard::util::build_actor_hostname_and_path( - actor_id, - &proxied_port.port_name, - if is_https { - GameGuardProtocol::Https - } else { - GameGuardProtocol::Http - }, - endpoint_type, - guard_public_hostname, - )?; - - let mut middlewares = vec![ - "actor-rate-limit".to_string(), - "actor-in-flight".to_string(), - "actor-retry".to_string(), - ]; - let rule = format_http_rule(&hostname, proxied_port.ingress_port_number, path.as_deref())?; - - // Create unique hash to prevent collision with other ports - let unique_key = (&actor_id, &proxied_port.port_name, &rule); - let mut hasher = DefaultHasher::new(); - unique_key.hash(&mut hasher); - let hash = hasher.finish(); - - // Strip path - if let Some(path) = path { - let mw_name = format!("actor:{}:{hash:x}:strip-path", actor_id); - traefik_config.http.middlewares.insert( - mw_name.clone(), - types::TraefikMiddlewareHttp::StripPrefix { - prefixes: vec![path], - }, - ); - middlewares.push(mw_name); - } - - // Build router - let proto = if is_https { "https" } else { "http" }; - - traefik_config.http.routers.insert( - format!("actor:{}:{hash:x}:{proto}", actor_id), - 
types::TraefikRouter { - entry_points: vec![format!("lb-{}", proxied_port.ingress_port_number)], - rule: Some(rule), - priority: None, - service: service_id.to_string(), - middlewares, - tls: if is_https { - Some(types::TraefikTls::build(build_tls_domains( - &guard_public_hostname, - )?)) - } else { - None - }, - }, - ); - } - - Ok(()) -} - -fn format_http_rule(hostname: &str, port: u16, path: Option<&str>) -> GlobalResult { - let mut rule = "(".to_string(); - - match (hostname, path) { - (hostname, Some(path)) => { - // Matches both the host without the port (i.e. default port like - // port 80 or 443) and host with the port. - // - // Matches both the path without trailing slash (e.g. `/foo`) and subpaths (e.g. `/foo/bar`), but not `/foobar`. - write!(&mut rule, "(Host(`{hostname}`) || Host(`{hostname}:{port}`)) && (Path(`{path}`) || PathPrefix(`{path}/`))")?; - } - (hostname, None) => { - write!(&mut rule, "Host(`{hostname}`)")?; - } - } - - rule.push(')'); - - Ok(rule) -} - -fn build_tls_domains( - guard_public_hostname: &GuardPublicHostname, -) -> GlobalResult> { - let (main, sans) = match guard_public_hostname { - GuardPublicHostname::DnsParent(parent) => (parent.clone(), vec![format!("*.{parent}")]), - // This will only work if there is an SSL cert provided for the exact name of the static - // DNS address. - // - // This will not work if passing an IP address. - GuardPublicHostname::Static(static_) => (static_.clone(), vec![static_.clone()]), - }; - - // Derive TLS config. Jobs can specify their own ingress rules, so we - // need to derive which domains to use for the job. - // - // A parent wildcard SSL mode will use the parent domain as the SSL - // name. - let mut domains = Vec::new(); - domains.push(types::TraefikTlsDomain { main, sans }); - - Ok(domains) -} diff --git a/packages/edge/api/traefik-provider/src/route/game_guard/mod.rs b/packages/edge/api/traefik-provider/src/route/game_guard/mod.rs index 54f703e182..273c291723 100644 --- a/packages/edge/api/traefik-provider/src/route/game_guard/mod.rs +++ b/packages/edge/api/traefik-provider/src/route/game_guard/mod.rs @@ -1,4 +1,3 @@ -use actor::build_actor; use api::build_api; use api_core_traefik_provider::types; use api_helper::{anchor::WatchIndexQuery, ctx::Ctx}; @@ -8,7 +7,6 @@ use serde::{Deserialize, Serialize}; use crate::auth::Auth; -pub mod actor; pub mod api; pub mod job; @@ -29,8 +27,8 @@ pub async fn config( // Fetch configs and catch any errors let mut config = types::TraefikConfigResponse::default(); + build_job(&ctx, &mut config).await?; - let latest_actor_create_ts = build_actor(&ctx, &mut config).await?; build_api(&ctx, &mut config).await?; diff --git a/packages/edge/infra/guard/server/src/routing/actor.rs b/packages/edge/infra/guard/server/src/routing/actor.rs index 75ef0783f8..8d0ab965ed 100644 --- a/packages/edge/infra/guard/server/src/routing/actor.rs +++ b/packages/edge/infra/guard/server/src/routing/actor.rs @@ -184,6 +184,7 @@ async fn find_actor( Ok(exists) }) + .custom_instrument(tracing::info_span!("actor_exists_tx")) .await?; if !actor_exists { @@ -275,6 +276,7 @@ async fn fetch_proxied_ports( Ok(None) } }) + .custom_instrument(tracing::info_span!("fetch_proxied_ports_tx")) .await .map_err(Into::into) } diff --git a/packages/edge/services/pegboard/src/keys/mod.rs b/packages/edge/services/pegboard/src/keys/mod.rs index cf4ac42a56..392523ef3c 100644 --- a/packages/edge/services/pegboard/src/keys/mod.rs +++ b/packages/edge/services/pegboard/src/keys/mod.rs @@ -1,5 +1,4 @@ use fdb_util::prelude::*; -use 
foundationdb as fdb; pub mod actor; pub mod client; @@ -7,6 +6,6 @@ pub mod datacenter; pub mod env; pub mod port; -pub fn subspace() -> fdb::tuple::Subspace { - fdb::tuple::Subspace::all().subspace(&(RIVET, PEGBOARD)) +pub fn subspace() -> fdb_util::Subspace { + fdb_util::Subspace::new(&(RIVET, PEGBOARD)) } diff --git a/packages/edge/services/pegboard/src/ops/actor/allocate_ingress_ports.rs b/packages/edge/services/pegboard/src/ops/actor/allocate_ingress_ports.rs index f250667856..4e5737af19 100644 --- a/packages/edge/services/pegboard/src/ops/actor/allocate_ingress_ports.rs +++ b/packages/edge/services/pegboard/src/ops/actor/allocate_ingress_ports.rs @@ -180,6 +180,7 @@ pub(crate) async fn pegboard_actor_allocate_ingress_ports( Ok(results) }) + .custom_instrument(tracing::info_span!("allocate_ingress_ports_tx")) .await?; Ok(Output { ports }) diff --git a/packages/edge/services/pegboard/src/ops/actor/get.rs b/packages/edge/services/pegboard/src/ops/actor/get.rs index befc0e7e36..af2a47e4a6 100644 --- a/packages/edge/services/pegboard/src/ops/actor/get.rs +++ b/packages/edge/services/pegboard/src/ops/actor/get.rs @@ -110,6 +110,7 @@ pub async fn pegboard_actor_get(ctx: &OperationCtx, input: &Input) -> GlobalResu .try_collect::>() .await }) + .custom_instrument(tracing::info_span!("actor_list_wf_tx")) .await?; let actor_data = futures_util::stream::iter(actors_with_wf_ids) diff --git a/packages/edge/services/pegboard/src/ops/actor/list_for_env.rs b/packages/edge/services/pegboard/src/ops/actor/list_for_env.rs index 3483a777eb..0e6823bb4f 100644 --- a/packages/edge/services/pegboard/src/ops/actor/list_for_env.rs +++ b/packages/edge/services/pegboard/src/ops/actor/list_for_env.rs @@ -91,6 +91,7 @@ pub async fn pegboard_actor_list_for_env( Ok(results) }) + .custom_instrument(tracing::info_span!("actor_list_tx")) .await?; Ok(Output { actors }) diff --git a/packages/edge/services/pegboard/src/ops/client/update_allocation_idx.rs b/packages/edge/services/pegboard/src/ops/client/update_allocation_idx.rs index 18a5448e97..7bc99c674f 100644 --- a/packages/edge/services/pegboard/src/ops/client/update_allocation_idx.rs +++ b/packages/edge/services/pegboard/src/ops/client/update_allocation_idx.rs @@ -128,6 +128,7 @@ pub async fn pegboard_client_update_allocation_idx( Ok(()) }) + .custom_instrument(tracing::info_span!("client_update_alloc_idx_tx")) .await .map_err(Into::into) } diff --git a/packages/edge/services/pegboard/src/workflows/actor/destroy.rs b/packages/edge/services/pegboard/src/workflows/actor/destroy.rs index c901c194ae..aaa02b9417 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/destroy.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/destroy.rs @@ -161,6 +161,7 @@ pub async fn update_fdb(ctx: &ActivityCtx, input: &UpdateFdbInput) -> GlobalResu .await } }) + .custom_instrument(tracing::info_span!("actor_destroy_tx")) .await?; Ok(()) diff --git a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs index 3d292ddf3c..ba91c4a393 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs @@ -365,6 +365,7 @@ async fn allocate_actor_v2( })); } }) + .custom_instrument(tracing::info_span!("actor_allocate_tx")) .await?; let dt = start_instant.elapsed().as_secs_f64(); @@ -400,6 +401,7 @@ pub async fn update_fdb(ctx: &ActivityCtx, input: &UpdateFdbInput) -> GlobalResu Ok(()) }) + 
.custom_instrument(tracing::info_span!("actor_clear_tx")) .await?; } } @@ -595,6 +597,7 @@ pub async fn insert_ports_fdb(ctx: &ActivityCtx, input: &InsertPortsFdbInput) -> Ok(()) } }) + .custom_instrument(tracing::info_span!("actor_insert_proxied_ports_tx")) .await?; Ok(()) @@ -899,6 +902,7 @@ async fn clear_ports_and_resources( .await } }) + .custom_instrument(tracing::info_span!("actor_clear_ports_and_resources_tx")) .await?; Ok(()) diff --git a/packages/edge/services/pegboard/src/workflows/actor/setup.rs b/packages/edge/services/pegboard/src/workflows/actor/setup.rs index bf13a16d74..e91299ea0d 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/setup.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/setup.rs @@ -447,6 +447,7 @@ async fn insert_fdb(ctx: &ActivityCtx, input: &InsertFdbInput) -> GlobalResult<( Ok(()) }) + .custom_instrument(tracing::info_span!("actor_insert_tx")) .await?; Ok(()) diff --git a/packages/edge/services/pegboard/src/workflows/client/mod.rs b/packages/edge/services/pegboard/src/workflows/client/mod.rs index 03092667bf..d5aecfa1d1 100644 --- a/packages/edge/services/pegboard/src/workflows/client/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/client/mod.rs @@ -441,6 +441,7 @@ async fn insert_fdb(ctx: &ActivityCtx, input: &InsertFdbInput) -> GlobalResult<( Ok(()) }) + .custom_instrument(tracing::info_span!("client_insert_tx")) .await?; Ok(()) @@ -872,6 +873,7 @@ async fn fetch_remaining_actors( .try_collect::>() .await }) + .custom_instrument(tracing::info_span!("client_fetch_remaining_actors_tx")) .await?; Ok(actor_ids) @@ -901,6 +903,7 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> GlobalRe Ok(last_ping_ts) }) + .custom_instrument(tracing::info_span!("client_check_expired_tx")) .await?; Ok(last_ping_ts < util::timestamp::now() - CLIENT_LOST_THRESHOLD_MS) @@ -962,6 +965,7 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global total_cpu.saturating_sub(remaining_cpu), )) }) + .custom_instrument(tracing::info_span!("client_update_metrics_tx")) .await?; metrics::CLIENT_CPU_ALLOCATED