Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions packages/common/cache/build/src/req_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,20 @@ impl RequestConfig {
let payload = serde_json::to_vec(&message)?;

if let Err(err) = ups
.publish(CACHE_PURGE_TOPIC, &payload, universalpubsub::PublishOpts::broadcast())
.publish(
CACHE_PURGE_TOPIC,
&payload,
universalpubsub::PublishOpts::broadcast(),
)
.await
{
tracing::error!(?err, "failed to publish cache purge message");
} else {
tracing::debug!(base_key, keys_count = cache_keys.len(), "published cache purge message");
tracing::debug!(
base_key,
keys_count = cache_keys.len(),
"published cache purge message"
);
}
}

Expand Down
18 changes: 13 additions & 5 deletions packages/common/config/src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl Default for Topology {
crate::defaults::ports::API_PEER
))
.unwrap(),
proxy_url: None,
valid_hosts: None,
}],
}
Expand All @@ -77,6 +78,9 @@ pub struct Datacenter {
pub public_url: Url,
/// URL of the api-peer service
pub peer_url: Url,
/// URL of the guard service that other datacenters can access privately. Goes to the same place as
/// public_url.
pub proxy_url: Option<Url>,
/// List of hosts that are valid to connect to this region with. This is used in regional
/// endpoints to validate that incoming requests to this datacenter are going to a
/// region-specific domain.
Expand All @@ -98,14 +102,18 @@ impl Datacenter {
}
}

pub fn public_url_host(&self) -> Result<&str> {
self.public_url.host_str().context("no host")
/// Returns the URL to use when proxying to this datacenter.
///
/// Uses the `proxy_url` field when it is set and falls back to `public_url` otherwise.
pub fn proxy_url(&self) -> &Url {
self.proxy_url.as_ref().unwrap_or(&self.public_url)
}

pub fn public_url_port(&self) -> Result<u16> {
self.public_url
/// Returns the host component of the URL produced by `proxy_url()`.
///
/// # Errors
///
/// Fails with the context message "no host" when that URL has no host component.
pub fn proxy_url_host(&self) -> Result<&str> {
self.proxy_url().host_str().context("no host")
}

pub fn proxy_url_port(&self) -> Result<u16> {
self.proxy_url()
.port()
.or_else(|| match self.public_url.scheme() {
.or_else(|| match self.proxy_url().scheme() {
"http" => Some(80),
"https" => Some(443),
_ => None,
Expand Down
1 change: 1 addition & 0 deletions packages/common/test-deps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl TestDeps {
is_leader: dc_id == dc_ids[0], // First DC in list is leader
public_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?,
peer_url: Url::parse(&format!("http://127.0.0.1:{api_peer_port}"))?,
proxy_url: None,
valid_hosts: None,
});
ports.push((api_peer_port, guard_port));
Expand Down
42 changes: 26 additions & 16 deletions packages/core/api-public/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use axum::{Json, extract::Extension, response::IntoResponse};
use anyhow::{bail, Result};

Check warning on line 1 in packages/core/api-public/src/health.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/health.rs
use axum::{extract::Extension, response::IntoResponse, Json};
use futures_util::StreamExt;
use rivet_api_builder::ApiError;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -87,7 +87,7 @@
}
} else {
// Remote datacenter - HTTP request
match send_health_check(&ctx, &dc).await {
match send_health_checks(&ctx, &dc).await {
Ok(response) => DatacenterHealth {
datacenter_label: dc.datacenter_label,
datacenter_name: dc.name.clone(),
Expand Down Expand Up @@ -128,30 +128,40 @@
}

#[tracing::instrument(skip_all)]
async fn send_health_check(
async fn send_health_checks(
ctx: &ApiCtx,
dc: &rivet_config::config::topology::Datacenter,
) -> Result<HealthResponse> {
let client = rivet_pools::reqwest::client().await?;
let url = dc.peer_url.join("/health")?;
let peer_url = dc.peer_url.join("/health")?;
let proxy_url = dc.proxy_url().join("/health")?;

tracing::debug!(
?dc.datacenter_label,
?url,
"sending health check to remote datacenter"
?peer_url,
?proxy_url,
"sending health checks to remote datacenter"
);

let res = client
.get(url)
.timeout(std::time::Duration::from_secs(5))
.send()
.await?;
let (peer_res, proxy_res) = tokio::try_join!(
client
.get(peer_url)
.timeout(std::time::Duration::from_secs(5))
.send(),
client
.get(proxy_url)
.timeout(std::time::Duration::from_secs(5))
.send()
)?;

if !peer_res.status().is_success() {
bail!("Peer health check returned status: {}", peer_res.status())
}

if res.status().is_success() {
let response = res.json::<HealthResponse>().await?;
if proxy_res.status().is_success() {
let response = proxy_res.json::<HealthResponse>().await?;
Ok(response)
} else {
anyhow::bail!("Health check returned status: {}", res.status())
bail!("Proxy health check returned status: {}", proxy_res.status())
}
}

48 changes: 30 additions & 18 deletions packages/core/guard/core/src/proxy_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Result, anyhow, bail};
use anyhow::{Context, Result, bail};
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
use http_body_util::{BodyExt, Full};
Expand Down Expand Up @@ -943,11 +943,16 @@ impl ProxyService {
}
Err(err) => {
if !err.is_connect() || attempts >= max_attempts {
tracing::error!(?err, "Request error after {} attempts", attempts);
return Err(errors::UpstreamError(
"Failed to connect to runner. Make sure your runners are healthy and do not have any crash logs."
.to_string(),
)
tracing::error!(
?err,
?target,
"Request error after {} attempts",
attempts
);

return Err(errors::UpstreamError(format!(
"Failed to connect to runner: {err}. Make sure your runners are healthy."
))
.build());
} else {
// Request connect error, might retry
Expand Down Expand Up @@ -1058,30 +1063,37 @@ impl ProxyService {
req_parts: &hyper::http::request::Parts,
target: &RouteTarget,
) -> Result<hyper::http::request::Builder> {
// Build the target URI using the url crate to properly handle IPv6 addresses
let mut url = Url::parse("http://example.com")?;
let scheme = if target.port == 443 { "https" } else { "http" };

// Wrap IPv6 addresses in brackets if not already wrapped
// Bracket raw IPv6 hosts
let host = if target.host.contains(':') && !target.host.starts_with('[') {
format!("[{}]", target.host)
} else {
target.host.clone()
};

url.set_host(Some(&host))
.map_err(|_| anyhow!("Failed to set host: {}", host))?;
url.set_port(Some(target.port))
.map_err(|_| anyhow!("Failed to set port"))?;
url.set_path(&target.path);
let uri = url.to_string();
// Ensure path starts with a leading slash
let path = if target.path.starts_with('/') {
target.path.clone()
} else {
format!("/{}", target.path)
};

let url = Url::parse(&format!("{scheme}://{host}:{}{}", target.port, path))
.context("invalid scheme/host/port when building URL")?;

// Build the proxied request
let mut builder = hyper::Request::builder()
.method(req_parts.method.clone())
.uri(&uri);
.uri(url.to_string());

// Add proxy headers
let headers = builder.headers_mut().unwrap();
add_proxy_headers_with_addr(headers, &req_parts.headers, self.remote_addr)?;
{
let headers = builder
.headers_mut()
.expect("request builder unexpectedly in error state");
add_proxy_headers_with_addr(headers, &req_parts.headers, self.remote_addr)?;
}

Ok(builder)
}
Expand Down
18 changes: 11 additions & 7 deletions packages/core/guard/server/src/cache/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ pub fn build_cache_key(target: &str, path: &str, headers: &hyper::HeaderMap) ->
ensure!(target == "actor", "wrong target");

// Find actor to route to
let actor_id_str = headers.get(X_RIVET_ACTOR).ok_or_else(|| {
crate::errors::MissingHeader {
header: X_RIVET_ACTOR.to_string(),
}
.build()
})?;
let actor_id = Id::parse(actor_id_str.to_str()?)?;
let actor_id_str = headers
.get(X_RIVET_ACTOR)
.ok_or_else(|| {
crate::errors::MissingHeader {
header: X_RIVET_ACTOR.to_string(),
}
.build()
})?
.to_str()
.context("invalid x-rivet-actor header")?;
let actor_id = Id::parse(actor_id_str).context("invalid x-rivet-actor header")?;

// Create a hash using target, actor_id, and path
let mut hasher = DefaultHasher::new();
Expand Down
14 changes: 8 additions & 6 deletions packages/core/guard/server/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ pub async fn route_request(
// For HTTP, use the x-rivet-actor header
headers
.get(X_RIVET_ACTOR)
.and_then(|x| x.to_str().ok())
.map(|x| x.to_str())
.transpose()
.context("invalid x-rivet-actor header")?
.ok_or_else(|| {
crate::errors::MissingHeader {
header: X_RIVET_ACTOR.to_string(),
Expand All @@ -61,7 +63,7 @@ pub async fn route_request(
};

// Find actor to route to
let actor_id = Id::parse(actor_id_str)?;
let actor_id = Id::parse(actor_id_str).context("invalid x-rivet-actor header")?;

// Route to peer dc where the actor lives
if actor_id.label() != ctx.config().dc_label() {
Expand All @@ -76,12 +78,12 @@ pub async fn route_request(
targets: vec![RouteTarget {
actor_id: Some(actor_id),
host: peer_dc
.public_url_host()
.context("bad peer dc public url host")?
.proxy_url_host()
.context("bad peer dc proxy url host")?
.to_string(),
port: peer_dc
.public_url_port()
.context("bad peer dc public url port")?,
.proxy_url_port()
.context("bad peer dc proxy url port")?,
path: path.to_owned(),
}],
timeout: RoutingTimeout {
Expand Down
12 changes: 5 additions & 7 deletions packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-s
const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name");
const X_RIVET_NAMESPACE_ID: HeaderName = HeaderName::from_static("x-rivet-namespace-id");

const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(10);
const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5);

struct OutboundConnection {
handle: JoinHandle<()>,
Expand Down Expand Up @@ -387,15 +387,14 @@ async fn outbound_handler(
_ = shutdown_rx => {}
}

// Stop runner
draining.store(true, Ordering::SeqCst);

ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
.send()
.await?;

if let Some(runner_id) = runner_id {
stop_runner(ctx, runner_id).await?;
drain_runner(ctx, runner_id).await?;
}

// Continue waiting on req while draining
Expand All @@ -421,7 +420,7 @@ async fn outbound_handler(
let runner_id_local =
Id::parse(&init.runner_id).context("invalid runner id")?;
runner_id = Some(runner_id_local);
stop_runner(ctx, runner_id_local).await?;
drain_runner(ctx, runner_id_local).await?;
}
}
}
Expand All @@ -440,7 +439,6 @@ async fn outbound_handler(
_ = tokio::time::sleep(DRAIN_GRACE_PERIOD) => {
tracing::debug!("reached drain grace period before runner shut down")
}

}

// Close connection
Expand All @@ -456,7 +454,7 @@ async fn outbound_handler(
Ok(())
}

async fn stop_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
let res = ctx
.signal(pegboard::workflows::runner::Forward {
inner: protocol::ToServer::ToServerStopping,
Expand Down Expand Up @@ -484,7 +482,7 @@ async fn stop_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {

/// Send a stop message to the client.
///
/// This will close the runner's WebSocket..
/// This will close the runner's WebSocket.
async fn publish_to_client_stop(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
let receiver_subject =
pegboard::pubsub_subjects::RunnerReceiverSubject::new(runner_id).to_string();
Expand Down
16 changes: 6 additions & 10 deletions packages/infra/engine/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,12 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
|config, pools| Box::pin(pegboard_serverless::start(config, pools)),
),
// Core services
Service::new(
"tracing_reconfigure",
ServiceKind::Core,
|config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)),
),
Service::new(
"cache_purge",
ServiceKind::Core,
|config, pools| Box::pin(rivet_cache_purge::start(config, pools)),
),
Service::new("tracing_reconfigure", ServiceKind::Core, |config, pools| {
Box::pin(rivet_tracing_reconfigure::start(config, pools))
}),
Service::new("cache_purge", ServiceKind::Core, |config, pools| {
Box::pin(rivet_cache_purge::start(config, pools))
}),
];

Ok(RunConfigData { services })
Expand Down
2 changes: 1 addition & 1 deletion packages/services/cache-purge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
while let Ok(NextOutput::Message(msg)) = sub.next().await {
match serde_json::from_slice::<CachePurgeMessage>(&msg.payload) {
Ok(purge_msg) => {
tracing::info!(
tracing::debug!(
base_key = ?purge_msg.base_key,
keys_count = purge_msg.keys.len(),
"received cache purge request"
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/runner/src/mod.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading