From 8e6f70b57c4f3f0e4d781ba0b59747d8ec9e40d1 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 17 Oct 2025 02:18:16 -0700 Subject: [PATCH] fix(guard): fix dc proxy reqs --- packages/common/cache/build/src/req_config.rs | 12 ++++- packages/common/config/src/config/topology.rs | 18 +++++-- packages/common/test-deps/src/lib.rs | 1 + packages/core/api-public/src/health.rs | 42 +++++++++------- packages/core/guard/core/src/proxy_service.rs | 48 ++++++++++++------- packages/core/guard/server/src/cache/actor.rs | 18 ++++--- .../server/src/routing/pegboard_gateway.rs | 14 +++--- packages/core/pegboard-serverless/src/lib.rs | 12 ++--- packages/infra/engine/src/run_config.rs | 16 +++---- packages/services/cache-purge/src/lib.rs | 2 +- sdks/typescript/runner/src/mod.ts | 2 +- 11 files changed, 112 insertions(+), 73 deletions(-) diff --git a/packages/common/cache/build/src/req_config.rs b/packages/common/cache/build/src/req_config.rs index 4383f7dfb0..b4282396c4 100644 --- a/packages/common/cache/build/src/req_config.rs +++ b/packages/common/cache/build/src/req_config.rs @@ -303,12 +303,20 @@ impl RequestConfig { let payload = serde_json::to_vec(&message)?; if let Err(err) = ups - .publish(CACHE_PURGE_TOPIC, &payload, universalpubsub::PublishOpts::broadcast()) + .publish( + CACHE_PURGE_TOPIC, + &payload, + universalpubsub::PublishOpts::broadcast(), + ) .await { tracing::error!(?err, "failed to publish cache purge message"); } else { - tracing::debug!(base_key, keys_count = cache_keys.len(), "published cache purge message"); + tracing::debug!( + base_key, + keys_count = cache_keys.len(), + "published cache purge message" + ); } } diff --git a/packages/common/config/src/config/topology.rs b/packages/common/config/src/config/topology.rs index cf1d9665c9..b96e3d7c6d 100644 --- a/packages/common/config/src/config/topology.rs +++ b/packages/common/config/src/config/topology.rs @@ -61,6 +61,7 @@ impl Default for Topology { crate::defaults::ports::API_PEER )) .unwrap(), + proxy_url: None, valid_hosts: None, }], } @@ -77,6 +78,9 @@ pub struct Datacenter { pub public_url: Url, /// URL of the api-peer service pub peer_url: Url, + /// URL of the guard service that other datacenters can access privately. Goes to the same place as + /// public_url. + pub proxy_url: Option<Url>, /// List of hosts that are valid to connect to this region with. This is used in regional /// endpoints to validate that incoming requests to this datacenter are going to a /// region-specific domain.
@@ -98,14 +102,18 @@ impl Datacenter { } } - pub fn public_url_host(&self) -> Result<&str> { - self.public_url.host_str().context("no host") + pub fn proxy_url(&self) -> &Url { + self.proxy_url.as_ref().unwrap_or(&self.public_url) } - pub fn public_url_port(&self) -> Result { - self.public_url + pub fn proxy_url_host(&self) -> Result<&str> { + self.proxy_url().host_str().context("no host") + } + + pub fn proxy_url_port(&self) -> Result { + self.proxy_url() .port() - .or_else(|| match self.public_url.scheme() { + .or_else(|| match self.proxy_url().scheme() { "http" => Some(80), "https" => Some(443), _ => None, diff --git a/packages/common/test-deps/src/lib.rs b/packages/common/test-deps/src/lib.rs index 381f53d27a..842ab2488a 100644 --- a/packages/common/test-deps/src/lib.rs +++ b/packages/common/test-deps/src/lib.rs @@ -50,6 +50,7 @@ impl TestDeps { is_leader: dc_id == dc_ids[0], // First DC in list is leader public_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?, peer_url: Url::parse(&format!("http://127.0.0.1:{api_peer_port}"))?, + proxy_url: None, valid_hosts: None, }); ports.push((api_peer_port, guard_port)); diff --git a/packages/core/api-public/src/health.rs b/packages/core/api-public/src/health.rs index ba97d98321..fb92e1c9ed 100644 --- a/packages/core/api-public/src/health.rs +++ b/packages/core/api-public/src/health.rs @@ -1,5 +1,5 @@ -use anyhow::Result; -use axum::{Json, extract::Extension, response::IntoResponse}; +use anyhow::{bail, Result}; +use axum::{extract::Extension, response::IntoResponse, Json}; use futures_util::StreamExt; use rivet_api_builder::ApiError; use serde::{Deserialize, Serialize}; @@ -87,7 +87,7 @@ async fn fanout_inner(ctx: ApiCtx) -> Result { } } else { // Remote datacenter - HTTP request - match send_health_check(&ctx, &dc).await { + match send_health_checks(&ctx, &dc).await { Ok(response) => DatacenterHealth { datacenter_label: dc.datacenter_label, datacenter_name: dc.name.clone(), @@ -128,30 +128,40 @@ async fn fanout_inner(ctx: ApiCtx) -> Result { } #[tracing::instrument(skip_all)] -async fn send_health_check( +async fn send_health_checks( ctx: &ApiCtx, dc: &rivet_config::config::topology::Datacenter, ) -> Result { let client = rivet_pools::reqwest::client().await?; - let url = dc.peer_url.join("/health")?; + let peer_url = dc.peer_url.join("/health")?; + let proxy_url = dc.proxy_url().join("/health")?; tracing::debug!( ?dc.datacenter_label, - ?url, - "sending health check to remote datacenter" + ?peer_url, + ?proxy_url, + "sending health checks to remote datacenter" ); - let res = client - .get(url) - .timeout(std::time::Duration::from_secs(5)) - .send() - .await?; + let (peer_res, proxy_res) = tokio::try_join!( + client + .get(peer_url) + .timeout(std::time::Duration::from_secs(5)) + .send(), + client + .get(proxy_url) + .timeout(std::time::Duration::from_secs(5)) + .send() + )?; + + if !peer_res.status().is_success() { + bail!("Peer health check returned status: {}", peer_res.status()) + } - if res.status().is_success() { - let response = res.json::().await?; + if proxy_res.status().is_success() { + let response = proxy_res.json::().await?; Ok(response) } else { - anyhow::bail!("Health check returned status: {}", res.status()) + bail!("Proxy health check returned status: {}", proxy_res.status()) } } - diff --git a/packages/core/guard/core/src/proxy_service.rs b/packages/core/guard/core/src/proxy_service.rs index b8530f0aa4..6e56eaafd8 100644 --- a/packages/core/guard/core/src/proxy_service.rs +++ 
b/packages/core/guard/core/src/proxy_service.rs @@ -1,4 +1,4 @@ -use anyhow::{Result, anyhow, bail}; +use anyhow::{Context, Result, bail}; use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; use http_body_util::{BodyExt, Full}; @@ -943,11 +943,16 @@ impl ProxyService { } Err(err) => { if !err.is_connect() || attempts >= max_attempts { - tracing::error!(?err, "Request error after {} attempts", attempts); - return Err(errors::UpstreamError( - "Failed to connect to runner. Make sure your runners are healthy and do not have any crash logs." - .to_string(), - ) + tracing::error!( + ?err, + ?target, + "Request error after {} attempts", + attempts + ); + + return Err(errors::UpstreamError(format!( + "Failed to connect to runner: {err}. Make sure your runners are healthy." + )) .build()); } else { // Request connect error, might retry @@ -1058,30 +1063,37 @@ impl ProxyService { req_parts: &hyper::http::request::Parts, target: &RouteTarget, ) -> Result { - // Build the target URI using the url crate to properly handle IPv6 addresses - let mut url = Url::parse("http://example.com")?; + let scheme = if target.port == 443 { "https" } else { "http" }; - // Wrap IPv6 addresses in brackets if not already wrapped + // Bracket raw IPv6 hosts let host = if target.host.contains(':') && !target.host.starts_with('[') { format!("[{}]", target.host) } else { target.host.clone() }; - url.set_host(Some(&host)) - .map_err(|_| anyhow!("Failed to set host: {}", host))?; - url.set_port(Some(target.port)) - .map_err(|_| anyhow!("Failed to set port"))?; - url.set_path(&target.path); - let uri = url.to_string(); + // Ensure path starts with a leading slash + let path = if target.path.starts_with('/') { + target.path.clone() + } else { + format!("/{}", target.path) + }; + + let url = Url::parse(&format!("{scheme}://{host}:{}{}", target.port, path)) + .context("invalid scheme/host/port when building URL")?; + // Build the proxied request let mut builder = hyper::Request::builder() .method(req_parts.method.clone()) - .uri(&uri); + .uri(url.to_string()); // Add proxy headers - let headers = builder.headers_mut().unwrap(); - add_proxy_headers_with_addr(headers, &req_parts.headers, self.remote_addr)?; + { + let headers = builder + .headers_mut() + .expect("request builder unexpectedly in error state"); + add_proxy_headers_with_addr(headers, &req_parts.headers, self.remote_addr)?; + } Ok(builder) } diff --git a/packages/core/guard/server/src/cache/actor.rs b/packages/core/guard/server/src/cache/actor.rs index 7b30025890..12f55a4509 100644 --- a/packages/core/guard/server/src/cache/actor.rs +++ b/packages/core/guard/server/src/cache/actor.rs @@ -14,13 +14,17 @@ pub fn build_cache_key(target: &str, path: &str, headers: &hyper::HeaderMap) -> ensure!(target == "actor", "wrong target"); // Find actor to route to - let actor_id_str = headers.get(X_RIVET_ACTOR).ok_or_else(|| { - crate::errors::MissingHeader { - header: X_RIVET_ACTOR.to_string(), - } - .build() - })?; - let actor_id = Id::parse(actor_id_str.to_str()?)?; + let actor_id_str = headers + .get(X_RIVET_ACTOR) + .ok_or_else(|| { + crate::errors::MissingHeader { + header: X_RIVET_ACTOR.to_string(), + } + .build() + })? 
+ .to_str() + .context("invalid x-rivet-actor header")?; + let actor_id = Id::parse(actor_id_str).context("invalid x-rivet-actor header")?; // Create a hash using target, actor_id, and path let mut hasher = DefaultHasher::new(); diff --git a/packages/core/guard/server/src/routing/pegboard_gateway.rs b/packages/core/guard/server/src/routing/pegboard_gateway.rs index 16199c8d6b..30e34edb1a 100644 --- a/packages/core/guard/server/src/routing/pegboard_gateway.rs +++ b/packages/core/guard/server/src/routing/pegboard_gateway.rs @@ -51,7 +51,9 @@ pub async fn route_request( // For HTTP, use the x-rivet-actor header headers .get(X_RIVET_ACTOR) - .and_then(|x| x.to_str().ok()) + .map(|x| x.to_str()) + .transpose() + .context("invalid x-rivet-actor header")? .ok_or_else(|| { crate::errors::MissingHeader { header: X_RIVET_ACTOR.to_string(), @@ -61,7 +63,7 @@ pub async fn route_request( }; // Find actor to route to - let actor_id = Id::parse(actor_id_str)?; + let actor_id = Id::parse(actor_id_str).context("invalid x-rivet-actor header")?; // Route to peer dc where the actor lives if actor_id.label() != ctx.config().dc_label() { @@ -76,12 +78,12 @@ pub async fn route_request( targets: vec![RouteTarget { actor_id: Some(actor_id), host: peer_dc - .public_url_host() - .context("bad peer dc public url host")? + .proxy_url_host() + .context("bad peer dc proxy url host")? .to_string(), port: peer_dc - .public_url_port() - .context("bad peer dc public url port")?, + .proxy_url_port() + .context("bad peer dc proxy url port")?, path: path.to_owned(), }], timeout: RoutingTimeout { diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 6ba8a5cd5f..afeda30da2 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -28,7 +28,7 @@ const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-s const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); const X_RIVET_NAMESPACE_ID: HeaderName = HeaderName::from_static("x-rivet-namespace-id"); -const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(10); +const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5); struct OutboundConnection { handle: JoinHandle<()>, @@ -387,7 +387,6 @@ async fn outbound_handler( _ = shutdown_rx => {} } - // Stop runner draining.store(true, Ordering::SeqCst); ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) @@ -395,7 +394,7 @@ async fn outbound_handler( .await?; if let Some(runner_id) = runner_id { - stop_runner(ctx, runner_id).await?; + drain_runner(ctx, runner_id).await?; } // Continue waiting on req while draining @@ -421,7 +420,7 @@ async fn outbound_handler( let runner_id_local = Id::parse(&init.runner_id).context("invalid runner id")?; runner_id = Some(runner_id_local); - stop_runner(ctx, runner_id_local).await?; + drain_runner(ctx, runner_id_local).await?; } } } @@ -440,7 +439,6 @@ async fn outbound_handler( _ = tokio::time::sleep(DRAIN_GRACE_PERIOD) => { tracing::debug!("reached drain grace period before runner shut down") } - } // Close connection @@ -456,7 +454,7 @@ async fn outbound_handler( Ok(()) } -async fn stop_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { +async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { let res = ctx .signal(pegboard::workflows::runner::Forward { inner: protocol::ToServer::ToServerStopping, @@ -484,7 +482,7 @@ async fn stop_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { /// Send a 
stop message to the client. /// -/// This will close the runner's WebSocket.. +/// This will close the runner's WebSocket. async fn publish_to_client_stop(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { let receiver_subject = pegboard::pubsub_subjects::RunnerReceiverSubject::new(runner_id).to_string(); diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index 32a561b3e1..07d23d8c48 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -24,16 +24,12 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { |config, pools| Box::pin(pegboard_serverless::start(config, pools)), ), // Core services - Service::new( - "tracing_reconfigure", - ServiceKind::Core, - |config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)), - ), - Service::new( - "cache_purge", - ServiceKind::Core, - |config, pools| Box::pin(rivet_cache_purge::start(config, pools)), - ), + Service::new("tracing_reconfigure", ServiceKind::Core, |config, pools| { + Box::pin(rivet_tracing_reconfigure::start(config, pools)) + }), + Service::new("cache_purge", ServiceKind::Core, |config, pools| { + Box::pin(rivet_cache_purge::start(config, pools)) + }), ]; Ok(RunConfigData { services }) diff --git a/packages/services/cache-purge/src/lib.rs b/packages/services/cache-purge/src/lib.rs index 0e59f8387d..3ae1a61df4 100644 --- a/packages/services/cache-purge/src/lib.rs +++ b/packages/services/cache-purge/src/lib.rs @@ -20,7 +20,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R while let Ok(NextOutput::Message(msg)) = sub.next().await { match serde_json::from_slice::(&msg.payload) { Ok(purge_msg) => { - tracing::info!( + tracing::debug!( base_key = ?purge_msg.base_key, keys_count = purge_msg.keys.len(), "received cache purge request" diff --git a/sdks/typescript/runner/src/mod.ts b/sdks/typescript/runner/src/mod.ts index dd58de7ab6..4aef7d5bd3 100644 --- a/sdks/typescript/runner/src/mod.ts +++ b/sdks/typescript/runner/src/mod.ts @@ -202,7 +202,7 @@ export class Runner { ): ActorInstance | undefined { const actor = this.#actors.get(actorId); if (!actor) { - logger()?.error({ msg: "actor not found", actorId }); + logger()?.error({ msg: "actor not found for removal", actorId }); return undefined; } if (generation !== undefined && actor.generation !== generation) {
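
Reviewer note (illustrative only, not part of the patch): below is a minimal Rust sketch of the proxy_url fallback introduced in topology.rs. It assumes a simplified stand-in Datacenter struct containing only the two URL fields plus the url crate; the hostnames and ports are made up for the example.

use url::Url;

// Simplified stand-in for the patched Datacenter struct; only the fields
// relevant to the fallback are included here.
struct Datacenter {
	public_url: Url,
	proxy_url: Option<Url>,
}

impl Datacenter {
	// Mirrors the patched accessor: prefer the private proxy URL and fall
	// back to public_url when none is configured.
	fn proxy_url(&self) -> &Url {
		self.proxy_url.as_ref().unwrap_or(&self.public_url)
	}
}

fn main() -> Result<(), url::ParseError> {
	let with_proxy = Datacenter {
		public_url: Url::parse("https://dc-a.example.com")?,
		proxy_url: Some(Url::parse("http://10.0.0.5:7080")?),
	};
	let without_proxy = Datacenter {
		public_url: Url::parse("https://dc-b.example.com")?,
		proxy_url: None,
	};

	// Cross-datacenter requests use the private URL when it is set...
	assert_eq!(with_proxy.proxy_url().as_str(), "http://10.0.0.5:7080/");
	// ...and transparently fall back to public_url otherwise.
	assert_eq!(without_proxy.proxy_url().as_str(), "https://dc-b.example.com/");
	Ok(())
}

This matches the routing change in pegboard_gateway.rs above, where cross-datacenter actor requests now resolve the target host and port via proxy_url_host()/proxy_url_port() rather than the public URL.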