Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions out/errors/actor.no_runners_available.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions out/errors/guard.invalid_regional_host.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions out/errors/guard.must_use_regional_host.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions out/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 29 additions & 8 deletions packages/common/config/src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,20 @@ impl Default for Topology {
Topology {
datacenter_label: 1,
datacenters: vec![Datacenter {
name: "local".into(),
name: "default".into(),
datacenter_label: 1,
is_leader: true,
api_peer_url: Url::parse(&format!(
public_url: Url::parse(&format!(
"http://127.0.0.1:{}",
crate::defaults::ports::API_PEER
crate::defaults::ports::GUARD
))
.unwrap(),
guard_url: Url::parse(&format!(
api_peer_url: Url::parse(&format!(
"http://127.0.0.1:{}",
crate::defaults::ports::GUARD
crate::defaults::ports::API_PEER
))
.unwrap(),
valid_hosts: None,
}],
}
}
Expand All @@ -72,8 +73,28 @@ pub struct Datacenter {
pub name: String,
pub datacenter_label: u16,
pub is_leader: bool,
/// Url of the api-peer service
/// Public origin that can be used to connect to this region.
pub public_url: Url,
/// URL of the api-peer service
pub api_peer_url: Url,
/// Url of the peer's guard server
pub guard_url: Url,
/// List of hosts that are valid to connect to this region with. This is used in regional
/// endpoints to validate that incoming requests to this datacenter are going to a
/// region-specific domain.
///
/// IMPORTANT: Do not use a global origin that routes to multiple different regions. This will
/// cause unpredictable behavior when requests are expected to go to a specific region.
#[serde(default)]
pub valid_hosts: Option<Vec<String>>,
}

impl Datacenter {
    /// Returns `true` if `host` is an acceptable hostname for reaching this datacenter.
    ///
    /// When `valid_hosts` is configured, `host` must match one of its entries. Otherwise `host`
    /// must match the host component of `public_url`.
    ///
    /// Hostnames are case-insensitive, so the comparison ignores ASCII case. `host` is otherwise
    /// compared exactly as given: nothing is trimmed and no port is stripped, so callers are
    /// expected to pass the bare hostname.
    pub fn is_valid_regional_host(&self, host: &str) -> bool {
        match &self.valid_hosts {
            // An explicit allow-list fully replaces the `public_url` fallback.
            Some(valid_hosts) => valid_hosts
                .iter()
                .any(|valid_host| valid_host.eq_ignore_ascii_case(host)),
            // No allow-list configured: only the public URL's own host is accepted.
            None => self
                .public_url
                .host_str()
                .is_some_and(|public_host| public_host.eq_ignore_ascii_case(host)),
        }
    }
}
3 changes: 2 additions & 1 deletion packages/common/test-deps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ impl TestDeps {
name: format!("dc-{dc_id}"),
datacenter_label: dc_id,
is_leader: dc_id == dc_ids[0], // First DC in list is leader
public_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?,
api_peer_url: Url::parse(&format!("http://127.0.0.1:{api_peer_port}"))?,
guard_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?,
valid_hosts: None,
});
ports.push((api_peer_port, guard_port));
}
Expand Down
3 changes: 2 additions & 1 deletion packages/common/types/src/datacenters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use utoipa::ToSchema;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct Datacenter {
pub datacenter_label: u16,
pub label: u16,
pub name: String,
pub url: String,
}
24 changes: 15 additions & 9 deletions packages/core/api-public/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,21 @@ async fn create_inner(
) -> Result<CreateResponse> {
ctx.skip_auth();

// Determine which datacenter to create the actor in
let target_dc_label = if let Some(dc_name) = &query.datacenter {
ctx.config()
.dc_for_name(dc_name)
.ok_or_else(|| crate::errors::Datacenter::NotFound.build())?
.datacenter_label
} else {
ctx.config().dc_label()
};
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

let target_dc_label = super::utils::find_dc_for_actor_creation(
&ctx,
namespace.namespace_id,
&query.namespace,
&body.runner_name_selector,
query.datacenter.as_ref().map(String::as_str),
)
.await?;

let query = rivet_api_types::actors::create::CreateQuery {
namespace: query.namespace,
Expand Down
18 changes: 8 additions & 10 deletions packages/core/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use utoipa::{IntoParams, ToSchema};

use crate::actors::utils;
use crate::ctx::ApiCtx;
use crate::errors;

#[derive(Debug, Deserialize, IntoParams)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -123,15 +122,14 @@ async fn get_or_create_inner(
}

// Actor doesn't exist for any key, create it
// Determine which datacenter to create the actor in
let target_dc_label = if let Some(dc_name) = &query.datacenter {
ctx.config()
.dc_for_name(dc_name)
.ok_or_else(|| errors::Datacenter::NotFound.build())?
.datacenter_label
} else {
ctx.config().dc_label()
};
let target_dc_label = super::utils::find_dc_for_actor_creation(
&ctx,
namespace.namespace_id,
&query.namespace,
&body.runner_name_selector,
query.datacenter.as_ref().map(String::as_str),
)
.await?;

let actor_id = Id::new_v1(target_dc_label);

Expand Down
36 changes: 36 additions & 0 deletions packages/core/api-public/src/actors/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,39 @@ pub fn extract_duplicate_key_error(err: &anyhow::Error) -> Option<Id> {

None
}

/// Resolve the label of the datacenter a new actor should be created in.
///
/// If `dc_name` is provided, that datacenter is looked up in the config and its label is
/// returned; an unknown name yields `Datacenter::NotFound`. Otherwise the
/// `find_dc_with_runner` op is queried for `namespace_id` and `runner_name`, and
/// `Actor::NoRunnersAvailable` is returned when it reports no datacenter.
///
/// `namespace_name` is only used to populate the `NoRunnersAvailable` error.
pub async fn find_dc_for_actor_creation(
    ctx: &ApiCtx,
    namespace_id: Id,
    namespace_name: &str,
    runner_name: &str,
    dc_name: Option<&str>,
) -> Result<u16> {
    // A datacenter explicitly requested by the caller short-circuits the runner lookup
    if let Some(requested_dc) = dc_name {
        let dc = ctx
            .config()
            .dc_for_name(requested_dc)
            .ok_or_else(|| crate::errors::Datacenter::NotFound.build())?;

        return Ok(dc.datacenter_label);
    }

    // No explicit DC: look up a datacenter that has a runner with the requested name
    let lookup = ctx
        .op(pegboard::ops::runner::find_dc_with_runner::Input {
            namespace_id,
            runner_name: runner_name.into(),
        })
        .await?;

    lookup.dc_label.ok_or_else(|| {
        pegboard::errors::Actor::NoRunnersAvailable {
            namespace: namespace_name.into(),
            runner_name: runner_name.into(),
        }
        .build()
    })
}
3 changes: 2 additions & 1 deletion packages/core/api-public/src/datacenters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ async fn list_inner(ctx: ApiCtx) -> Result<ListResponse> {
.datacenters
.iter()
.map(|dc| Datacenter {
datacenter_label: dc.datacenter_label,
label: dc.datacenter_label,
name: dc.name.clone(),
url: dc.public_url.to_string(),
})
.collect(),
pagination: Pagination { cursor: None },
Expand Down
13 changes: 13 additions & 0 deletions packages/core/guard/server/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,16 @@ pub struct ActorDestroyed {
pub struct ActorReadyTimeout {
pub actor_id: Id,
}

/// Error in the `guard` group with code `must_use_regional_host`.
///
/// Built by the runner routing path when the request's host fails
/// `Datacenter::is_valid_regional_host` for the current datacenter.
#[derive(RivetError, Serialize)]
#[error(
"guard",
"must_use_regional_host",
"Request must use a regional URL for this datacenter.",
"Invalid host {host} for datacenter {datacenter}. Please use one of the following hosts: {valid_hosts}"
)]
pub struct MustUseRegionalHost {
/// Host the request was received on.
pub host: String,
/// Name of the datacenter that rejected the request.
pub datacenter: String,
/// Human-readable list of acceptable hosts, already joined into a single string for display
/// in the error message.
pub valid_hosts: String,
}
8 changes: 4 additions & 4 deletions packages/core/guard/server/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ pub async fn route_request(
targets: vec![RouteTarget {
actor_id: Some(actor_id),
host: peer_dc
.guard_url
.public_url
.host()
.context("peer dc guard_url has no host")?
.context("peer dc public_url has no host")?
.to_string(),
port: peer_dc
.guard_url
.public_url
.port()
.context("peer dc guard_url has no port")?,
.context("peer dc public_url has no port")?,
path: path.to_owned(),
}],
timeout: RoutingTimeout {
Expand Down
26 changes: 25 additions & 1 deletion packages/core/guard/server/src/routing/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,38 @@ pub(crate) const WS_PROTOCOL_TOKEN: &str = "rivet_token.";
pub async fn route_request(
ctx: &StandaloneCtx,
target: &str,
_host: &str,
host: &str,
_path: &str,
headers: &hyper::HeaderMap,
) -> Result<Option<RoutingOutput>> {
if target != "runner" {
return Ok(None);
}

// Validate that the host is valid for the current datacenter
let current_dc = ctx.config().topology().current_dc()?;
if !current_dc.is_valid_regional_host(host) {
tracing::warn!(?host, datacenter = ?current_dc.name, "invalid host for current datacenter");

// Determine valid hosts for error message
let valid_hosts = if let Some(hosts) = &current_dc.valid_hosts {
hosts.join(", ")
} else {
current_dc
.public_url
.host_str()
.map(|h| h.to_string())
.unwrap_or_else(|| "unknown".to_string())
};

return Err(crate::errors::MustUseRegionalHost {
host: host.to_string(),
datacenter: current_dc.name.clone(),
valid_hosts,
}
.build());
}

let is_websocket = headers
.get("upgrade")
.and_then(|v| v.to_str().ok())
Expand Down
7 changes: 7 additions & 0 deletions packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use universaldb::options::StreamingMode;
use universaldb::utils::IsolationLevel::*;
use vbare::OwnedVersionedData;

const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint");
const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token");
const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots");
const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name");
Expand Down Expand Up @@ -266,6 +267,8 @@ async fn outbound_handler(
shutdown_rx: oneshot::Receiver<()>,
draining: Arc<AtomicBool>,
) -> Result<()> {
let current_dc = ctx.config().topology().current_dc()?;

let client = rivet_pools::reqwest::client_no_timeout().await?;
let headers = headers
.into_iter()
Expand All @@ -276,6 +279,10 @@ async fn outbound_handler(
v.parse::<HeaderValue>().ok()?,
))
})
.chain(std::iter::once((
X_RIVET_ENDPOINT,
HeaderValue::try_from(current_dc.public_url.to_string())?,
)))
.chain(std::iter::once((
X_RIVET_TOTAL_SLOTS,
HeaderValue::try_from(slots_per_runner)?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub async fn check_config_changes(
replica_id: dc.datacenter_label as u64,
status: status.into(),
api_peer_url: dc.api_peer_url.to_string(),
guard_url: dc.guard_url.to_string(),
guard_url: dc.public_url.to_string(),
}
})
.collect::<Vec<types::ReplicaConfig>>();
Expand Down Expand Up @@ -277,7 +277,7 @@ fn should_abort_reconfigure(
return Ok(true);
}

if url::Url::parse(&replica.guard_url)? != current_dc.guard_url {
if url::Url::parse(&replica.guard_url)? != current_dc.public_url {
tracing::info!(
"config changed during reconfigure (guard_url changed), aborting reconfigure"
);
Expand Down
10 changes: 10 additions & 0 deletions packages/services/pegboard/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ pub enum Actor {
"Actor key is already reserved in the datacenter '{datacenter_label}'. Either remove the datacenter constraint to automatically create this actor in the correct datacenter or provide the datacenter that matches."
)]
KeyReservedInDifferentDatacenter { datacenter_label: u16 },

#[error(
"no_runners_available",
"No runners are available in any datacenter. Validate the runner is listed in the Connect tab and that the runner's name matches the requested runner name.",
"No runners with name '{runner_name}' are available in any datacenter for the namespace '{namespace}'. Validate the runner is listed in the Connect tab and that the runner's name matches the requested runner name."
)]
NoRunnersAvailable {
namespace: String,
runner_name: String,
},
}

#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
Expand Down
Loading
Loading