diff --git a/packages/common/logs/src/lib.rs b/packages/common/logs/src/lib.rs index 809a9e3357..bba5a41e96 100644 --- a/packages/common/logs/src/lib.rs +++ b/packages/common/logs/src/lib.rs @@ -25,10 +25,12 @@ impl Logs { } impl Logs { - pub async fn start(self) -> Result> { + pub async fn start(mut self) -> Result> { // Create logs dir if it does not exist fs::create_dir_all(&self.path).await?; + self.rotate().await?; + Ok(tokio::spawn(self.run())) } @@ -112,10 +114,12 @@ impl Logs { } impl Logs { - pub fn start_sync(self) -> Result> { + pub fn start_sync(mut self) -> Result> { // Create logs dir if it does not exist std::fs::create_dir_all(&self.path)?; + self.rotate_sync()?; + Ok(std::thread::spawn(|| self.run_sync())) } diff --git a/packages/common/server-cli/src/commands/start.rs b/packages/common/server-cli/src/commands/start.rs index 72b160af53..d4b677b635 100644 --- a/packages/common/server-cli/src/commands/start.rs +++ b/packages/common/server-cli/src/commands/start.rs @@ -54,7 +54,6 @@ impl Opts { .and_then(|x| x.rivet.edge.as_ref()) .and_then(|x| x.redirect_logs_dir.as_ref()) { - std::fs::create_dir_all(logs_dir)?; rivet_logs::Logs::new(logs_dir.clone(), LOGS_RETENTION) .start() .await?; diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs index 3c4b5c19c1..e95e6e1617 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs @@ -229,6 +229,14 @@ pub async fn gen_initialize( )?); script.push(components::rivet::guard::fetch_tls(server_token)?); script.push(components::rivet::guard::configure(config)?); + + prometheus_targets.insert( + "guard".into(), + components::vector::PrometheusTarget { + endpoint: "http://127.0.0.1:8091".into(), + scrape_interval: 15, + }, + ); } } diff --git a/packages/edge/infra/guard/core/src/metrics.rs b/packages/edge/infra/guard/core/src/metrics.rs index 236c5f6a27..267bc8d9dc 100644 --- a/packages/edge/infra/guard/core/src/metrics.rs +++ b/packages/edge/infra/guard/core/src/metrics.rs @@ -1,29 +1,80 @@ use lazy_static::lazy_static; -use rivet_metrics::prometheus::*; +use rivet_metrics::{prometheus::*, REGISTRY}; lazy_static! { - pub static ref ACTOR_REQUEST_TOTAL: IntCounterVec = register_int_counter_vec!( - "actor_request_total", + // MARK: Internal + pub static ref ROUTE_CACHE_SIZE: IntGauge = register_int_gauge_with_registry!( + "guard_route_cache_size", + "Number of entries in the route cache", + *REGISTRY, + ).unwrap(); + pub static ref RATE_LIMITER_COUNT: IntGauge = register_int_gauge_with_registry!( + "guard_rate_limiter_count", + "Number of active rate limiters", + *REGISTRY, + ).unwrap(); + pub static ref IN_FLIGHT_COUNTER_COUNT: IntGauge = register_int_gauge_with_registry!( + "guard_in_flight_counter_count", + "Number of active in-flight counters", + *REGISTRY, + ) + .unwrap(); + + // MARK: TCP + pub static ref TCP_CONNECTION_TOTAL: IntCounter = register_int_counter_with_registry!( + "guard_tcp_connection_total", + "Total number of TCP connections ever", + *REGISTRY, + ) + .unwrap(); + pub static ref TCP_CONNECTION_PENDING: IntGauge = register_int_gauge_with_registry!( + "guard_tcp_connection_pending", + "Total number of open TCP connections", + *REGISTRY, + ) + .unwrap(); + pub static ref TCP_CONNECTION_DURATION: Histogram = register_histogram_with_registry!( + "guard_tcp_connection_duration", + "TCP connection duration in seconds", + *REGISTRY, + ) + .unwrap(); + + // MARK: Pre-proxy + pub static ref RESOLVE_ROUTE_DURATION: Histogram = register_histogram_with_registry!( + "guard_resolve_route_duration", + "Time to resolve request route in seconds", + *REGISTRY, + ) + .unwrap(); + + // MARK: Proxy requests + pub static ref PROXY_REQUEST_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "guard_proxy_request_total", "Total number of requests to actor", - &["actor_id", "server_id", "method", "path"] + &["actor_id", "server_id", "method", "path"], + *REGISTRY, ) .unwrap(); - pub static ref ACTOR_REQUEST_PENDING: IntGaugeVec = register_int_gauge_vec!( - "actor_request_pending", + pub static ref PROXY_REQUEST_PENDING: IntGaugeVec = register_int_gauge_vec_with_registry!( + "guard_proxy_request_pending", "Number of pending requests to actor", - &["actor_id", "server_id", "method", "path"] + &["actor_id", "server_id", "method", "path"], + *REGISTRY, ) .unwrap(); - pub static ref ACTOR_REQUEST_DURATION: HistogramVec = register_histogram_vec!( - "actor_request_duration_seconds", + pub static ref PROXY_REQUEST_DURATION: HistogramVec = register_histogram_vec_with_registry!( + "guard_proxy_request_duration", "Request duration in seconds", - &["actor_id", "server_id", "status"] + &["actor_id", "server_id", "status"], + *REGISTRY, ) .unwrap(); - pub static ref ACTOR_REQUEST_ERRORS: IntCounterVec = register_int_counter_vec!( - "actor_request_errors_total", + pub static ref PROXY_REQUEST_ERROR: IntCounterVec = register_int_counter_vec_with_registry!( + "guard_proxy_request_errors_total", "Total number of errors when proxying requests to actor", - &["actor_id", "server_id", "error_type"] + &["actor_id", "server_id", "error_type"], + *REGISTRY, ) .unwrap(); } diff --git a/packages/edge/infra/guard/core/src/proxy_service.rs b/packages/edge/infra/guard/core/src/proxy_service.rs index d709a317cd..d599ecc609 100644 --- a/packages/edge/infra/guard/core/src/proxy_service.rs +++ b/packages/edge/infra/guard/core/src/proxy_service.rs @@ -175,6 +175,8 @@ impl RouteCache { #[tracing::instrument(skip_all)] async fn insert(&self, hostname: String, path: String, result: RouteConfig) { self.cache.upsert_async((hostname, path), result).await; + + metrics::ROUTE_CACHE_SIZE.set(self.cache.len() as i64); } #[tracing::instrument(skip_all)] @@ -182,6 +184,8 @@ impl RouteCache { self.cache .remove_async(&(hostname.to_owned(), path.to_owned())) .await; + + metrics::ROUTE_CACHE_SIZE.set(self.cache.len() as i64); } } @@ -253,8 +257,8 @@ pub struct ProxyState { routing_fn: RoutingFn, middleware_fn: MiddlewareFn, route_cache: RouteCache, - rate_limiters: SccHashMap, - in_flight_counters: SccHashMap, + rate_limiters: SccHashMap<(Uuid, std::net::IpAddr), RateLimiter>, + in_flight_counters: SccHashMap<(Uuid, std::net::IpAddr), InFlightCounter>, port_type: PortType, } @@ -447,79 +451,96 @@ impl ProxyState { } #[tracing::instrument(skip_all)] - async fn check_rate_limit(&self, actor_id: &Option) -> GlobalResult { - match actor_id { - Some(id) => { - let middleware_config = self.get_middleware_config(id).await?; - - let entry = self - .rate_limiters - .entry_async(*id) - .instrument(tracing::info_span!("entry_async")) - .await; - if let scc::hash_map::Entry::Occupied(mut entry) = entry { - // Key exists, get and mutate existing RateLimiter - let write_guard = entry.get_mut(); - Ok(write_guard.try_acquire()) - } else { - // Key doesn't exist, insert a new RateLimiter - let mut limiter = RateLimiter::new( - middleware_config.rate_limit.requests, - middleware_config.rate_limit.period, - ); - let result = limiter.try_acquire(); - entry.insert_entry(limiter); - Ok(result) - } - } - None => { - // No actor ID means no rate limiting - Ok(true) - } + async fn check_rate_limit( + &self, + ip_addr: std::net::IpAddr, + actor_id: &Option, + ) -> GlobalResult { + let Some(actor_id) = *actor_id else { + // No rate limiting when actor_id is None + return Ok(true); + }; + + // Get actor-specific middleware config + let middleware_config = self.get_middleware_config(&actor_id).await?; + + let cache_key = (actor_id, ip_addr); + let entry = self + .rate_limiters + .entry_async(cache_key) + .instrument(tracing::info_span!("entry_async")) + .await; + if let scc::hash_map::Entry::Occupied(mut entry) = entry { + // Key exists, get and mutate existing RateLimiter + let write_guard = entry.get_mut(); + Ok(write_guard.try_acquire()) + } else { + // Key doesn't exist, insert a new RateLimiter + let mut limiter = RateLimiter::new( + middleware_config.rate_limit.requests, + middleware_config.rate_limit.period, + ); + let result = limiter.try_acquire(); + entry.insert_entry(limiter); + + metrics::RATE_LIMITER_COUNT.set(self.rate_limiters.len() as i64); + + Ok(result) } } #[tracing::instrument(skip_all)] - async fn acquire_in_flight(&self, actor_id: &Option) -> GlobalResult { - match actor_id { - Some(id) => { - let middleware_config = self.get_middleware_config(id).await?; - - let entry = self - .in_flight_counters - .entry_async(*id) - .instrument(tracing::info_span!("entry_async")) - .await; - if let scc::hash_map::Entry::Occupied(mut entry) = entry { - // Key exists, get and mutate existing InFlightCounter - let write_guard = entry.get_mut(); - Ok(write_guard.try_acquire()) - } else { - // Key doesn't exist, insert a new InFlightCounter - let mut counter = InFlightCounter::new(middleware_config.max_in_flight.amount); - let result = counter.try_acquire(); - entry.insert_entry(counter); - Ok(result) - } - } - None => { - // No actor ID means no in-flight limiting - Ok(true) - } + async fn acquire_in_flight( + &self, + ip_addr: std::net::IpAddr, + actor_id: &Option, + ) -> GlobalResult { + let Some(actor_id) = *actor_id else { + // No in-flight limiting when actor_id is None + return Ok(true); + }; + + // Get actor-specific middleware config + let middleware_config = self.get_middleware_config(&actor_id).await?; + + let cache_key = (actor_id, ip_addr); + let entry = self + .in_flight_counters + .entry_async(cache_key) + .instrument(tracing::info_span!("entry_async")) + .await; + if let scc::hash_map::Entry::Occupied(mut entry) = entry { + // Key exists, get and mutate existing InFlightCounter + let write_guard = entry.get_mut(); + Ok(write_guard.try_acquire()) + } else { + // Key doesn't exist, insert a new InFlightCounter + let mut counter = InFlightCounter::new(middleware_config.max_in_flight.amount); + let result = counter.try_acquire(); + entry.insert_entry(counter); + + metrics::IN_FLIGHT_COUNTER_COUNT.set(self.in_flight_counters.len() as i64); + + Ok(result) } } #[tracing::instrument(skip_all)] - async fn release_in_flight(&self, actor_id: &Option) { - if let Some(id) = actor_id { - if let Some(mut counter) = self - .in_flight_counters - .get_async(id) - .instrument(tracing::info_span!("get_async")) - .await - { - counter.release(); - } + async fn release_in_flight(&self, ip_addr: std::net::IpAddr, actor_id: &Option) { + // Skip if actor_id is None (no in-flight tracking) + let actor_id = match actor_id { + Some(id) => *id, + None => return, // No in-flight tracking when actor_id is None + }; + + let cache_key = (actor_id, ip_addr); + if let Some(mut counter) = self + .in_flight_counters + .get_async(&cache_key) + .instrument(tracing::info_span!("get_async")) + .await + { + counter.release(); } } } @@ -580,13 +601,20 @@ impl ProxyService { .map(|x| x.to_string()) .unwrap_or_else(|| req.uri().path().to_string()); let method = req.method().clone(); + let method_str = method.as_str(); - // Resolve target - let target = match self + let start_time = Instant::now(); + + let target_res = self .state .resolve_route(host, &path, self.state.port_type.clone(), false) - .await - { + .await; + + let duration_secs = start_time.elapsed().as_secs_f64(); + metrics::RESOLVE_ROUTE_DURATION.observe(duration_secs); + + // Resolve target + let target = match target_res { Ok(ResolveRouteOutput::Target(target)) => target, Ok(ResolveRouteOutput::Response(response)) => { // Return the custom response @@ -607,61 +635,76 @@ impl ProxyService { let actor_id_str = actor_id.map_or_else(|| "none".to_string(), |id| id.to_string()); let server_id_str = server_id.map_or_else(|| "none".to_string(), |id| id.to_string()); - // Apply rate limiting - if !self.state.check_rate_limit(&actor_id).await? { - metrics::ACTOR_REQUEST_ERRORS - .with_label_values(&[&actor_id_str, &server_id_str, "429"]) - .inc(); + // Extract IP address from remote_addr + let client_ip = self.remote_addr.ip(); - return Ok(Response::builder() + // Apply rate limiting + let res = if !self.state.check_rate_limit(client_ip, &actor_id).await? { + Response::builder() .status(StatusCode::TOO_MANY_REQUESTS) - .body(Full::::new(Bytes::new()))?); + .body(Full::::new(Bytes::new())) + .map_err(Into::into) } - // Check in-flight limit - if !self.state.acquire_in_flight(&actor_id).await? { - metrics::ACTOR_REQUEST_ERRORS - .with_label_values(&[&actor_id_str, &server_id_str, "429"]) - .inc(); - - return Ok(Response::builder() + else if !self.state.acquire_in_flight(client_ip, &actor_id).await? { + Response::builder() .status(StatusCode::TOO_MANY_REQUESTS) - .body(Full::::new(Bytes::new()))?); - } - - // Let's save method before consuming req - let method_str = method.as_str(); - - // Increment metrics - metrics::ACTOR_REQUEST_PENDING - .with_label_values(&[&actor_id_str, &server_id_str, method_str, &path]) - .inc(); + .body(Full::::new(Bytes::new())) + .map_err(Into::into) + } else { + // Increment metrics + metrics::PROXY_REQUEST_PENDING + .with_label_values(&[&actor_id_str, &server_id_str, method_str, &path]) + .inc(); + + metrics::PROXY_REQUEST_TOTAL + .with_label_values(&[&actor_id_str, &server_id_str, method_str, &path]) + .inc(); + + // Prepare to release in-flight counter when done + let state_clone = self.state.clone(); + crate::defer! { + tokio::spawn(async move { + state_clone.release_in_flight(client_ip, &actor_id).await; + }.instrument(tracing::info_span!("release_in_flight_task"))); + } + + // Branch for WebSocket vs HTTP handling + // Both paths will handle their own metrics and error handling + if hyper_tungstenite::is_upgrade_request(&req) { + // WebSocket upgrade + self.handle_websocket_upgrade(req, target).await + } else { + // Regular HTTP request + self.handle_http_request(req, target).await + } + }; - metrics::ACTOR_REQUEST_TOTAL - .with_label_values(&[&actor_id_str, &server_id_str, method_str, &path]) - .inc(); + let status = match &res { + Ok(resp) => resp.status().as_u16().to_string(), + Err(_) => "error".to_string(), + }; - // Create timer for duration metric - let start_time = Instant::now(); + // Record metrics + let duration = start_time.elapsed(); + metrics::PROXY_REQUEST_DURATION + .with_label_values(&[ + &actor_id_str, + &server_id_str, + &status, + ]) + .observe(duration.as_secs_f64()); - // Prepare to release in-flight counter when done - let state_clone = self.state.clone(); - let actor_id_clone = actor_id; - crate::defer! { - tokio::spawn(async move { - state_clone.release_in_flight(&actor_id_clone).await; - }.instrument(tracing::info_span!("release_in_flight_task"))); - } + metrics::PROXY_REQUEST_PENDING + .with_label_values(&[ + &actor_id_str, + &server_id_str, + method_str, + &path, + ]) + .dec(); - // Branch for WebSocket vs HTTP handling - // Both paths will handle their own metrics and error handling - if hyper_tungstenite::is_upgrade_request(&req) { - // WebSocket upgrade - self.handle_websocket_upgrade(req, target).await - } else { - // Regular HTTP request - self.handle_http_request(req, target, start_time).await - } + res } #[tracing::instrument(skip_all)] @@ -669,7 +712,6 @@ impl ProxyService { &self, req: Request, mut target: RouteTarget, - start_time: Instant, ) -> GlobalResult>> { // Get middleware config for this actor if it exists let middleware_config = match &target.actor_id { @@ -723,13 +765,6 @@ impl ProxyService { let initial_interval = middleware_config.retry.initial_interval; let timeout_duration = Duration::from_secs(middleware_config.timeout.request_timeout); - // Execute request with retry - let actor_id = target.actor_id; - let server_id = target.server_id; - // Get string representations for metrics - let actor_id_str = actor_id.map_or_else(|| "none".to_string(), |id| id.to_string()); - let server_id_str = server_id.map_or_else(|| "none".to_string(), |id| id.to_string()); - // Use a value-returning loop to handle both errors and successful responses let (status_code, last_error) = 'retry: { let mut attempts = 0; @@ -744,14 +779,6 @@ impl ProxyService { Err(e) => { error!("Failed to build HTTP request: {}", e); - metrics::ACTOR_REQUEST_ERRORS - .with_label_values(&[ - &actor_id_str, - &server_id_str, - "request_build_error", - ]) - .inc(); - let error = Some(global_error::ext::AssertionError::Panic { message: format!("Failed to build HTTP request: {}", e), location: global_error::location!(), @@ -766,10 +793,6 @@ impl ProxyService { Err(e) => { error!("Failed to parse URI: {}", e); - metrics::ACTOR_REQUEST_ERRORS - .with_label_values(&[&actor_id_str, &server_id_str, "uri_parse_error"]) - .inc(); - let error = Some(global_error::ext::AssertionError::Panic { message: format!("URI parse error: {}", e), location: global_error::location!(), @@ -783,6 +806,7 @@ impl ProxyService { Ok(req) => req, Err(e) => { warn!("Failed to build request body: {}", e); + let error = Some(global_error::ext::AssertionError::Panic { message: format!("Request build error: {}", e), location: global_error::location!(), @@ -794,25 +818,6 @@ impl ProxyService { // Send the request with timeout match timeout(timeout_duration, self.client.request(proxied_req)).await { Ok(Ok(resp)) => { - // Record metrics - let duration = start_time.elapsed(); - metrics::ACTOR_REQUEST_DURATION - .with_label_values(&[ - &actor_id_str, - &server_id_str, - &resp.status().as_u16().to_string(), - ]) - .observe(duration.as_secs_f64()); - - metrics::ACTOR_REQUEST_PENDING - .with_label_values(&[ - &actor_id_str, - &server_id_str, - req_parts.method.as_str(), - req_parts.uri.path(), - ]) - .dec(); - // Convert the hyper::body::Incoming to http_body_util::Full let (parts, body) = resp.into_parts(); @@ -874,10 +879,6 @@ impl ProxyService { timeout_duration.as_secs() ); - metrics::ACTOR_REQUEST_ERRORS - .with_label_values(&[&actor_id_str, &server_id_str, "timeout"]) - .inc(); - let error = Some(global_error::ext::AssertionError::Panic { message: format!("Request timed out"), location: global_error::location!(), @@ -898,22 +899,6 @@ impl ProxyService { // Log the error error!("Request failed: {:?}", last_error); - // Only increment the error metric if not already done (for timeout) - if status_code == StatusCode::BAD_GATEWAY { - metrics::ACTOR_REQUEST_ERRORS - .with_label_values(&[&actor_id_str, &server_id_str, "502"]) - .inc(); - } - - metrics::ACTOR_REQUEST_PENDING - .with_label_values(&[ - &actor_id_str, - &server_id_str, - req_parts.method.as_str(), - req_parts.uri.path(), - ]) - .dec(); - Ok(Response::builder() .status(status_code) .body(Full::::new(Bytes::new()))?) @@ -975,9 +960,6 @@ impl ProxyService { let actor_id_str = actor_id.map_or_else(|| "none".to_string(), |id| id.to_string()); let server_id_str = server_id.map_or_else(|| "none".to_string(), |id| id.to_string()); - // Start timing the request (metrics already incremented in handle_request) - let start_time = Instant::now(); - // Parsed for retries later let req_host = req .headers() @@ -1059,10 +1041,6 @@ impl ProxyService { // Clone needed values for the spawned task let state = self.state.clone(); - let actor_id_str_clone = actor_id_str.clone(); - let server_id_str_clone = server_id_str.clone(); - let path = target.path.clone(); - let method = "GET".to_string(); // WebSockets are always GET // Spawn a new task to handle the WebSocket bidirectional communication info!("Spawning task to handle WebSocket communication"); @@ -1090,15 +1068,6 @@ impl ProxyService { Ok(Err(e)) => { error!("Failed to get client WebSocket: {}", e); error!("Error details: {:?}", e); - // Decrement pending metric - metrics::ACTOR_REQUEST_PENDING - .with_label_values(&[ - &actor_id_str_clone, - &server_id_str_clone, - &method, - &path, - ]) - .dec(); return; } Err(_) => { @@ -1106,15 +1075,6 @@ impl ProxyService { "Timeout waiting for client WebSocket to be ready after {}s", timeout_duration.as_secs() ); - // Decrement pending metric - metrics::ACTOR_REQUEST_PENDING - .with_label_values(&[ - &actor_id_str_clone, - &server_id_str_clone, - &method, - &path, - ]) - .dec(); return; } }; @@ -1163,21 +1123,6 @@ impl ProxyService { if attempts >= max_attempts { error!("All {} WebSocket connection attempts failed", max_attempts); - // Increment error metric - metrics::ACTOR_REQUEST_ERRORS - .with_label_values(&[&actor_id_str_clone, &server_id_str_clone, "502"]) - .inc(); - - // Decrement pending metric - metrics::ACTOR_REQUEST_PENDING - .with_label_values(&[ - &actor_id_str_clone, - &server_id_str_clone, - &method, - &path, - ]) - .dec(); - // Send a close message to the client since we can't connect to upstream info!("Sending close message to client due to upstream connection failure"); let (mut client_sink, _) = client_ws.split(); @@ -1612,23 +1557,6 @@ impl ProxyService { info!("Starting bidirectional message forwarding"); tokio::join!(client_to_upstream, upstream_to_client); info!("Bidirectional message forwarding completed"); - - // Record duration when the WebSocket connection is closed - let duration = start_time.elapsed(); - info!("WebSocket connection duration: {:?}", duration); - metrics::ACTOR_REQUEST_DURATION - .with_label_values(&[ - &actor_id_str_clone, - &server_id_str_clone, - "101", // WebSocket connections always start with 101 status - ]) - .observe(duration.as_secs_f64()); - - // Decrement pending metric at the end - info!("Decrementing pending metric"); - metrics::ACTOR_REQUEST_PENDING - .with_label_values(&[&actor_id_str_clone, &server_id_str_clone, &method, &path]) - .dec(); } .instrument(tracing::info_span!("handle_ws_task")), ); @@ -1666,6 +1594,7 @@ impl ProxyService { .unwrap_or_else(|| req.uri().path().to_string()); let method = req.method().clone(); let request_id = Uuid::new_v4(); + let user_agent = req .headers() .get(hyper::header::USER_AGENT) @@ -1690,9 +1619,6 @@ impl ProxyService { // Process the request let result = self.handle_request(req).await; - // Log response information - let duration_ms = start_time.elapsed().as_millis(); - match &result { Ok(response) => { let status = response.status().as_u16(); @@ -1711,7 +1637,6 @@ impl ProxyService { host = %host, remote_addr = %self.remote_addr, status = %status, - duration_ms = %duration_ms, content_length = %content_length, "Request completed" ); @@ -1724,7 +1649,6 @@ impl ProxyService { path = %path, host = %host, remote_addr = %self.remote_addr, - duration_ms = %duration_ms, error = %e, "Request failed" ); diff --git a/packages/edge/infra/guard/core/src/server.rs b/packages/edge/infra/guard/core/src/server.rs index 0dffaecb9b..2a4414bf82 100644 --- a/packages/edge/infra/guard/core/src/server.rs +++ b/packages/edge/infra/guard/core/src/server.rs @@ -1,4 +1,5 @@ use crate::cert_resolver::{create_tls_config, CertResolverFn}; +use crate::metrics; use crate::proxy_service::{MiddlewareFn, ProxyServiceFactory, RoutingFn}; use global_error::*; use hyper::service::service_fn; @@ -6,7 +7,7 @@ use std::fmt; use std::net::SocketAddr; use std::pin::pin; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::signal::unix::signal; use tokio::signal::unix::SignalKind; use tokio_rustls::TlsAcceptor; @@ -105,6 +106,10 @@ pub async fn run_server( graceful: &hyper_util::server::graceful::GracefulShutdown, port_type_str: String, ) { + let connection_start = Instant::now(); + metrics::TCP_CONNECTION_PENDING.inc(); + metrics::TCP_CONNECTION_TOTAL.inc(); + let io = hyper_util::rt::TokioIo::new(tcp_stream); // Create a proxy service instance for this connection @@ -131,6 +136,10 @@ pub async fn run_server( error!("{} connection error: {}", port_type_str, err); } info!("{} connection dropped: {}", port_type_str, remote_addr); + + let connection_duration = connection_start.elapsed().as_secs_f64(); + metrics::TCP_CONNECTION_DURATION.observe(connection_duration); + metrics::TCP_CONNECTION_PENDING.dec(); } .instrument(tracing::info_span!(parent: None, "process_connection_task")), ); @@ -179,6 +188,10 @@ pub async fn run_server( // Accept TLS connection in a separate task to avoid ownership issues tokio::spawn(async move { + let connection_start = Instant::now(); + metrics::TCP_CONNECTION_PENDING.inc(); + metrics::TCP_CONNECTION_TOTAL.inc(); + match acceptor_clone .accept(tcp_stream) .instrument(tracing::info_span!("accept")) @@ -194,6 +207,7 @@ pub async fn run_server( // Using service_fn to convert our function into a hyper service let service = service_fn(move |req| { let service_clone = proxy_service.clone(); + async move { service_clone.process(req).await.map_err(|err| GlobalErrorWrapper{err}) } @@ -213,6 +227,10 @@ pub async fn run_server( error!("TLS handshake failed for {}: {}", remote_addr, e); } } + + let connection_duration = connection_start.elapsed().as_secs_f64(); + metrics::TCP_CONNECTION_DURATION.observe(connection_duration); + metrics::TCP_CONNECTION_PENDING.dec(); }.instrument(tracing::info_span!(parent: None, "process_tls_connection_task"))); } else { // Fallback to non-TLS handling (useful for testing) diff --git a/packages/edge/infra/guard/server/Cargo.toml b/packages/edge/infra/guard/server/Cargo.toml index 920d5267cb..917b4769df 100644 --- a/packages/edge/infra/guard/server/Cargo.toml +++ b/packages/edge/infra/guard/server/Cargo.toml @@ -25,6 +25,7 @@ rivet-config.workspace = true rivet-logs.workspace = true rivet-runtime.workspace = true rivet-pools.workspace = true +rivet-metrics.workspace = true futures = "0.3.30" global-error.workspace = true cluster.workspace = true diff --git a/packages/edge/infra/guard/server/src/main.rs b/packages/edge/infra/guard/server/src/main.rs index f3d8ef6355..9b6d61b8f3 100644 --- a/packages/edge/infra/guard/server/src/main.rs +++ b/packages/edge/infra/guard/server/src/main.rs @@ -46,7 +46,6 @@ async fn main_inner() -> GlobalResult<()> { .and_then(|x| x.rivet.edge.as_ref()) .and_then(|x| x.redirect_logs_dir.as_ref()) { - tokio::fs::create_dir_all(logs_dir).await?; unwrap!( rivet_logs::Logs::new(logs_dir.clone(), LOGS_RETENTION) .start() @@ -54,6 +53,9 @@ async fn main_inner() -> GlobalResult<()> { ); } + // Start metrics server + let metrics_task = tokio::spawn(rivet_metrics::run_standalone(config.clone())); + let pools = rivet_pools::Pools::new(config.clone()).await?; // Create context @@ -86,11 +88,14 @@ async fn main_inner() -> GlobalResult<()> { // Start the server tracing::info!("starting proxy server"); tokio::select! { - result = rivet_guard_core::run_server(config, routing_fn, middleware_fn, cert_resolver) => { - if let Err(e) = result { - tracing::error!("Server error: {}", e); + res = rivet_guard_core::run_server(config, routing_fn, middleware_fn, cert_resolver) => { + if let Err(err) = res { + tracing::error!(?err, "Server error"); } } + res = metrics_task => { + tracing::error!(?res, "Metrics task stopped"); + } _ = signal::ctrl_c() => { tracing::info!("received Ctrl+C, shutting down"); } diff --git a/packages/edge/infra/guard/server/src/tls.rs b/packages/edge/infra/guard/server/src/tls.rs index 331fad1b88..b8d5d48bbd 100644 --- a/packages/edge/infra/guard/server/src/tls.rs +++ b/packages/edge/infra/guard/server/src/tls.rs @@ -161,12 +161,10 @@ pub async fn create_cert_resolver( ); None } - Err(e) => { - bail!( - "Failed to build dynamic hostname actor routing regex: {}", - e - ); - } + Err(e) => bail!( + "Failed to build dynamic hostname actor routing regex: {}", + e + ), }; let actor_hostname_regex_static = match build_actor_hostname_and_path_regex(EndpointType::Path, guard_hostname) {