diff --git a/packages/common/runtime/src/lib.rs b/packages/common/runtime/src/lib.rs index 12566ae0c9..3db8dcc664 100644 --- a/packages/common/runtime/src/lib.rs +++ b/packages/common/runtime/src/lib.rs @@ -67,12 +67,17 @@ fn build_tokio_runtime_builder() -> tokio::runtime::Builder { metrics::TOKIO_THREAD_COUNT.dec(); }); + rt_builder.on_task_spawn(move |_| { + metrics::TOKIO_TASK_TOTAL.inc(); + }); + if env::var("TOKIO_RUNTIME_METRICS").is_ok() { rt_builder.on_before_task_poll(|_| { let metrics = tokio::runtime::Handle::current().metrics(); let buckets = metrics.poll_time_histogram_num_buckets(); metrics::TOKIO_GLOBAL_QUEUE_DEPTH.set(metrics.global_queue_depth() as i64); + metrics::TOKIO_ACTIVE_TASK_COUNT.set(metrics.num_alive_tasks() as i64); for worker in 0..metrics.num_workers() { metrics::TOKIO_WORKER_OVERFLOW_COUNT diff --git a/packages/common/runtime/src/metrics.rs b/packages/common/runtime/src/metrics.rs index 29739acad6..142b65a95a 100644 --- a/packages/common/runtime/src/metrics.rs +++ b/packages/common/runtime/src/metrics.rs @@ -14,6 +14,18 @@ lazy_static::lazy_static! { "Number of pending tasks in the global queue.", *REGISTRY ).unwrap(); + pub static ref TOKIO_TASK_TOTAL: IntCounter = + register_int_counter_with_registry!( + "tokio_task_total", + "Total number of spawned tasks.", + *REGISTRY + ).unwrap(); + pub static ref TOKIO_ACTIVE_TASK_COUNT: IntGauge = + register_int_gauge_with_registry!( + "tokio_active_task_count", + "Total number of active (running or sleeping) tasks.", + *REGISTRY + ).unwrap(); pub static ref TOKIO_WORKER_OVERFLOW_COUNT: IntGaugeVec = register_int_gauge_vec_with_registry!( "tokio_worker_overflow_count", "Number of times the given worker thread saturated its local queue.", diff --git a/packages/edge/infra/guard/core/src/metrics.rs b/packages/edge/infra/guard/core/src/metrics.rs index 267bc8d9dc..9e9e8a0b17 100644 --- a/packages/edge/infra/guard/core/src/metrics.rs +++ b/packages/edge/infra/guard/core/src/metrics.rs @@ -1,5 +1,5 @@ use lazy_static::lazy_static; -use rivet_metrics::{prometheus::*, REGISTRY}; +use rivet_metrics::{prometheus::*, REGISTRY, BUCKETS}; lazy_static! { // MARK: Internal @@ -36,6 +36,7 @@ lazy_static! { pub static ref TCP_CONNECTION_DURATION: Histogram = register_histogram_with_registry!( "guard_tcp_connection_duration", "TCP connection duration in seconds", + BUCKETS.to_vec(), *REGISTRY, ) .unwrap(); @@ -44,36 +45,36 @@ lazy_static! { pub static ref RESOLVE_ROUTE_DURATION: Histogram = register_histogram_with_registry!( "guard_resolve_route_duration", "Time to resolve request route in seconds", + BUCKETS.to_vec(), *REGISTRY, ) .unwrap(); // MARK: Proxy requests - pub static ref PROXY_REQUEST_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + pub static ref PROXY_REQUEST_TOTAL: IntCounter = register_int_counter_with_registry!( "guard_proxy_request_total", "Total number of requests to actor", - &["actor_id", "server_id", "method", "path"], *REGISTRY, ) .unwrap(); - pub static ref PROXY_REQUEST_PENDING: IntGaugeVec = register_int_gauge_vec_with_registry!( + pub static ref PROXY_REQUEST_PENDING: IntGauge = register_int_gauge_with_registry!( "guard_proxy_request_pending", "Number of pending requests to actor", - &["actor_id", "server_id", "method", "path"], *REGISTRY, ) .unwrap(); pub static ref PROXY_REQUEST_DURATION: HistogramVec = register_histogram_vec_with_registry!( "guard_proxy_request_duration", "Request duration in seconds", - &["actor_id", "server_id", "status"], + &["status"], + BUCKETS.to_vec(), *REGISTRY, ) .unwrap(); pub static ref PROXY_REQUEST_ERROR: IntCounterVec = register_int_counter_vec_with_registry!( "guard_proxy_request_errors_total", "Total number of errors when proxying requests to actor", - &["actor_id", "server_id", "error_type"], + &["error_type"], *REGISTRY, ) .unwrap(); diff --git a/packages/edge/infra/guard/core/src/proxy_service.rs b/packages/edge/infra/guard/core/src/proxy_service.rs index c503cc292c..1b56c184ca 100644 --- a/packages/edge/infra/guard/core/src/proxy_service.rs +++ b/packages/edge/infra/guard/core/src/proxy_service.rs @@ -613,8 +613,6 @@ impl ProxyService { .path_and_query() .map(|x| x.to_string()) .unwrap_or_else(|| req.uri().path().to_string()); - let method = req.method().clone(); - let method_str = method.as_str(); let start_time = Instant::now(); @@ -642,11 +640,6 @@ impl ProxyService { }; let actor_id = target.actor_id; - let server_id = target.server_id; - - // Convert UUIDs to strings for metrics, handling Optional fields - let actor_id_str = actor_id.map_or_else(|| "none".to_string(), |id| id.to_string()); - let server_id_str = server_id.map_or_else(|| "none".to_string(), |id| id.to_string()); // Extract IP address from remote_addr let client_ip = self.remote_addr.ip(); @@ -666,13 +659,8 @@ impl ProxyService { .map_err(Into::into) } else { // Increment metrics - metrics::PROXY_REQUEST_PENDING - .with_label_values(&[&actor_id_str, &server_id_str, method_str, &path]) - .inc(); - - metrics::PROXY_REQUEST_TOTAL - .with_label_values(&[&actor_id_str, &server_id_str, method_str, &path]) - .inc(); + metrics::PROXY_REQUEST_PENDING.inc(); + metrics::PROXY_REQUEST_TOTAL.inc(); // Prepare to release in-flight counter when done let state_clone = self.state.clone(); @@ -684,29 +672,35 @@ impl ProxyService { // Branch for WebSocket vs HTTP handling // Both paths will handle their own metrics and error handling - if hyper_tungstenite::is_upgrade_request(&req) { + let res = if hyper_tungstenite::is_upgrade_request(&req) { // WebSocket upgrade self.handle_websocket_upgrade(req, target).await } else { // Regular HTTP request self.handle_http_request(req, target).await - } + }; + + // Record metrics + let duration_secs = start_time.elapsed().as_secs_f64(); + metrics::PROXY_REQUEST_DURATION + .with_label_values(&[&status]) + .observe(duration_secs); + + metrics::PROXY_REQUEST_PENDING.dec(); + + res }; let status = match &res { Ok(resp) => resp.status().as_u16().to_string(), - Err(_) => "error".to_string(), - }; - - // Record metrics - let duration = start_time.elapsed(); - metrics::PROXY_REQUEST_DURATION - .with_label_values(&[&actor_id_str, &server_id_str, &status]) - .observe(duration.as_secs_f64()); + Err(err) => { + metrics::PROXY_REQUEST_ERROR + .with_label_values(&[&err.to_string()]) + .inc(); - metrics::PROXY_REQUEST_PENDING - .with_label_values(&[&actor_id_str, &server_id_str, method_str, &path]) - .dec(); + "error".to_string() + } + }; res }