diff --git a/packages/common/service-discovery/Cargo.toml b/packages/common/service-discovery/Cargo.toml index 4dd5c1fcff..89fc64d9ea 100644 --- a/packages/common/service-discovery/Cargo.toml +++ b/packages/common/service-discovery/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] rand = "0.8" reqwest = { version = "0.12", features = ["json"] } +rivet-api.workspace = true serde = { version = "1.0", features = ["derive"] } tokio.workspace = true tracing = "0.1" diff --git a/packages/common/service-discovery/src/lib.rs b/packages/common/service-discovery/src/lib.rs index 85288bb38a..a63126c84c 100644 --- a/packages/common/service-discovery/src/lib.rs +++ b/packages/common/service-discovery/src/lib.rs @@ -1,17 +1,17 @@ -use std::{future::Future, net::Ipv4Addr, sync::Arc, time::Duration}; +use std::{future::Future, sync::Arc, time::Duration}; use rand::Rng; use reqwest::Client; -use serde::Deserialize; use tokio::{ sync::{Mutex, RwLock}, task::JoinHandle, }; use url::Url; +use rivet_api::models::{ProvisionServer, ProvisionDatacentersGetServersResponse}; pub struct ServiceDiscovery { fetch_endpoint: Url, - last: RwLock>, + last: RwLock>, handle: Mutex>>, } @@ -27,7 +27,7 @@ impl ServiceDiscovery { /// Starts a background tokio task that periodically fetches the endpoint and calls `cb`. pub fn start(self: &Arc, cb: F) where - F: Fn(Vec) -> Fut + Send + Sync + 'static, + F: Fn(Vec) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, E: std::fmt::Debug, { @@ -64,23 +64,23 @@ impl ServiceDiscovery { } /// Returns the last retrieved value without fetching. - pub async fn get(&self) -> Vec { + pub async fn get(&self) -> Vec { self.last.read().await.clone() } /// Manually fetches the endpoint. - pub async fn fetch(&self) -> Result, reqwest::Error> { + pub async fn fetch(&self) -> Result, reqwest::Error> { let client = Client::new(); Ok(self.fetch_inner(&client).await?.servers) } - async fn fetch_inner(&self, client: &Client) -> Result { + async fn fetch_inner(&self, client: &Client) -> Result { Ok(client .get(self.fetch_endpoint.clone()) .send() .await? .error_for_status()? - .json::() + .json::() .await?) } } @@ -93,13 +93,3 @@ impl Drop for ServiceDiscovery { } } } - -#[derive(Deserialize)] -pub struct ApiResponse { - pub servers: Vec, -} - -#[derive(Deserialize, Clone)] -pub struct ApiServer { - pub lan_ip: Option, -} diff --git a/packages/core/api/actor/src/route/builds.rs b/packages/core/api/actor/src/route/builds.rs index f31a578a56..7b95d8c6f6 100644 --- a/packages/core/api/actor/src/route/builds.rs +++ b/packages/core/api/actor/src/route/builds.rs @@ -536,11 +536,10 @@ pub async fn complete_build( } } - // TODO: Disabled until deploy - // // Error only if all prewarm requests failed - // if !results.is_empty() && results.iter().all(|res| res.is_err()) { - // return Err(unwrap!(unwrap!(results.into_iter().next()).err())); - // } + // Error only if all prewarm requests failed + if !results.is_empty() && results.iter().all(|res| res.is_err()) { + return Err(unwrap!(unwrap!(results.into_iter().next()).err())); + } } Ok(json!({})) diff --git a/packages/core/api/provision/src/route/datacenters.rs b/packages/core/api/provision/src/route/datacenters.rs index 2c467805fc..d5fac34aa4 100644 --- a/packages/core/api/provision/src/route/datacenters.rs +++ b/packages/core/api/provision/src/route/datacenters.rs @@ -56,18 +56,14 @@ pub async fn servers( _watch_index: WatchIndexQuery, query: ServerFilterQuery, ) -> GlobalResult { - // Find server based on public ip let servers_res = ctx - .op(cluster::ops::server::list::Input { - filter: cluster::types::Filter { - datacenter_ids: Some(vec![datacenter_id]), - pool_types: (!query.pools.is_empty()) - .then(|| query.pools.into_iter().map(ApiInto::api_into).collect()), - ..Default::default() - }, - include_destroyed: false, - exclude_draining: true, - exclude_no_vlan: true, + .op(cluster::ops::datacenter::server_discovery::Input { + datacenter_id, + pool_types: query + .pools + .into_iter() + .map(ApiInto::api_into) + .collect(), }) .await?; @@ -75,8 +71,6 @@ pub async fn servers( servers: servers_res .servers .into_iter() - // Filter out installing servers - .filter(|server| server.install_complete_ts.is_some()) .map(ApiInto::api_into) .collect(), }) diff --git a/packages/core/api/traefik-provider/src/route/tunnel.rs b/packages/core/api/traefik-provider/src/route/tunnel.rs index d7f57c0a4f..8927864102 100644 --- a/packages/core/api/traefik-provider/src/route/tunnel.rs +++ b/packages/core/api/traefik-provider/src/route/tunnel.rs @@ -42,24 +42,42 @@ pub async fn build_ip_allowlist( ctx: &Ctx, config: &mut types::TraefikConfigResponse, ) -> GlobalResult<()> { - let servers_res = ctx - .op(cluster::ops::server::list::Input { - filter: cluster::types::Filter { - pool_types: Some(vec![ - cluster::types::PoolType::Gg, - cluster::types::PoolType::Guard, - ]), - ..Default::default() - }, - include_destroyed: false, - exclude_draining: false, - exclude_no_vlan: false, + let servers = ctx + .cache() + .ttl(5000) + .fetch_one_json("cluster.guard_ip_allow_list", "", { + let ctx = (*ctx).clone(); + move |mut cache, key| { + let ctx = ctx.clone(); + async move { + let servers_res = ctx + .op(cluster::ops::server::list::Input { + filter: cluster::types::Filter { + pool_types: Some(vec![ + cluster::types::PoolType::Gg, + cluster::types::PoolType::Guard, + ]), + ..Default::default() + }, + include_destroyed: false, + // IMPORTANT: Returns installing servers + exclude_installing: false, + exclude_draining: true, + exclude_no_vlan: true, + }) + .await?; + + cache.resolve(&key, servers_res.servers); + + Ok(cache) + } + } }) .await?; - let public_ips = servers_res - .servers + let public_ips = servers .iter() + .flatten() .filter_map(|server| server.wan_ip) .map(|ip| ip.to_string()) .collect::>(); diff --git a/packages/core/services/build/src/ops/resolve_for_tags.rs b/packages/core/services/build/src/ops/resolve_for_tags.rs index 2ea49b6121..81daf697a5 100644 --- a/packages/core/services/build/src/ops/resolve_for_tags.rs +++ b/packages/core/services/build/src/ops/resolve_for_tags.rs @@ -29,7 +29,7 @@ pub async fn build_resolve_for_tags(ctx: &OperationCtx, input: &Input) -> Global .ttl(util::duration::seconds(15)) .fetch_one_json( "build", - format!("{}:{}", input.env_id, tags_str.as_str()), + (input.env_id, tags_str.as_str()), { let ctx = ctx.clone(); let tags_str = tags_str.clone(); diff --git a/packages/core/services/cluster/src/ops/datacenter/mod.rs b/packages/core/services/cluster/src/ops/datacenter/mod.rs index 762c2c33fd..5a8a194479 100644 --- a/packages/core/services/cluster/src/ops/datacenter/mod.rs +++ b/packages/core/services/cluster/src/ops/datacenter/mod.rs @@ -1,3 +1,4 @@ +pub mod server_discovery; pub mod get; pub mod list; pub mod location_get; diff --git a/packages/core/services/cluster/src/ops/datacenter/server_discovery.rs b/packages/core/services/cluster/src/ops/datacenter/server_discovery.rs new file mode 100644 index 0000000000..0e70a1a81b --- /dev/null +++ b/packages/core/services/cluster/src/ops/datacenter/server_discovery.rs @@ -0,0 +1,82 @@ +use std::{collections::HashMap, str::FromStr}; + +use chirp_workflow::prelude::*; + +use crate::types::{Filter, PoolType, Server}; + +#[derive(Debug)] +pub struct Input { + pub datacenter_id: Uuid, + pub pool_types: Vec, +} + +#[derive(Debug)] +pub struct Output { + pub servers: Vec, +} + +/// Wrapper around server::list with very short cache. +#[operation] +pub async fn cluster_datacenter_server_discovery(ctx: &OperationCtx, input: &Input) -> GlobalResult { + let cache_keys = if input.pool_types.is_empty() { + vec![(input.datacenter_id, "all".to_string())] + } else { + input + .pool_types + .iter() + .map(|pool| (input.datacenter_id, pool.to_string())) + .collect() + }; + + let servers = ctx + .cache() + .ttl(5000) + .fetch_all_json("cluster.datacenter.service_discovery", cache_keys, { + let ctx = ctx.clone(); + move |mut cache, keys| { + let ctx = ctx.clone(); + async move { + let pools = keys + .into_iter() + .filter(|(_, pool)| pool != "all") + .map(|(_, pool)| PoolType::from_str(&pool)) + .collect::>>()?; + + let servers_res = ctx + .op(crate::ops::server::list::Input { + filter: Filter { + datacenter_ids: Some(vec![input.datacenter_id]), + pool_types: (!pools.is_empty()).then(|| pools), + ..Default::default() + }, + include_destroyed: false, + exclude_installing: true, + exclude_draining: true, + exclude_no_vlan: true, + }) + .await?; + + let mut servers_by_pool_type = + HashMap::with_capacity(servers_res.servers.len()); + + for server in servers_res.servers { + servers_by_pool_type + .entry(server.pool_type) + .or_insert_with(Vec::new) + .push(server); + } + + for (pool_type, servers) in servers_by_pool_type { + cache.resolve(&(input.datacenter_id, pool_type.to_string()), servers); + } + + Ok(cache) + } + } + }) + .await?; + + Ok(Output { + servers: servers.into_iter().flatten().collect(), + }) +} diff --git a/packages/core/services/cluster/src/ops/server/destroy_with_filter.rs b/packages/core/services/cluster/src/ops/server/destroy_with_filter.rs index ed23eb84bc..e5088e5bb8 100644 --- a/packages/core/services/cluster/src/ops/server/destroy_with_filter.rs +++ b/packages/core/services/cluster/src/ops/server/destroy_with_filter.rs @@ -21,6 +21,7 @@ pub async fn cluster_server_destroy_with_filter( .op(crate::ops::server::list::Input { filter: input.filter.clone(), include_destroyed: false, + exclude_installing: false, exclude_draining: false, exclude_no_vlan: false, }) diff --git a/packages/core/services/cluster/src/ops/server/list.rs b/packages/core/services/cluster/src/ops/server/list.rs index b1103177f7..9cfdbe024a 100644 --- a/packages/core/services/cluster/src/ops/server/list.rs +++ b/packages/core/services/cluster/src/ops/server/list.rs @@ -9,6 +9,7 @@ use crate::types::{Filter, Server}; pub struct Input { pub filter: Filter, pub include_destroyed: bool, + pub exclude_installing: bool, pub exclude_draining: bool, pub exclude_no_vlan: bool, } @@ -47,15 +48,17 @@ pub async fn cluster_server_list(ctx: &OperationCtx, input: &Input) -> GlobalRes ON s.datacenter_id = d.datacenter_id WHERE ($1 OR s.cloud_destroy_ts IS NULL) AND - (NOT $2 OR s.drain_ts IS NULL) AND - (NOT $3 OR s.vlan_ip IS NOT NULL) AND - ($4 IS NULL OR s.server_id = ANY($4)) AND - ($5 IS NULL OR s.datacenter_id = ANY($5)) AND - ($6 IS NULL OR d.cluster_id = ANY($6)) AND - ($7 IS NULL OR s.pool_type = ANY($7)) AND - ($8 IS NULL OR s.public_ip = ANY($8)) + (NOT $2 OR s.install_complete_ts IS NOT NULL) AND + (NOT $3 OR s.drain_ts IS NULL) AND + (NOT $4 OR s.vlan_ip IS NOT NULL) AND + ($5 IS NULL OR s.server_id = ANY($5)) AND + ($6 IS NULL OR s.datacenter_id = ANY($6)) AND + ($7 IS NULL OR d.cluster_id = ANY($7)) AND + ($8 IS NULL OR s.pool_type = ANY($8)) AND + ($9 IS NULL OR s.public_ip = ANY($9)) ", input.include_destroyed, + input.exclude_installing, input.exclude_draining, input.exclude_no_vlan, &input.filter.server_ids, diff --git a/packages/core/services/cluster/src/ops/server/taint_with_filter.rs b/packages/core/services/cluster/src/ops/server/taint_with_filter.rs index 1ea55facfd..28e9e6922a 100644 --- a/packages/core/services/cluster/src/ops/server/taint_with_filter.rs +++ b/packages/core/services/cluster/src/ops/server/taint_with_filter.rs @@ -21,6 +21,7 @@ pub async fn cluster_server_taint_with_filter( .op(crate::ops::server::list::Input { filter: input.filter.clone(), include_destroyed: false, + exclude_installing: false, exclude_draining: false, exclude_no_vlan: false, }) diff --git a/packages/core/services/cluster/src/types.rs b/packages/core/services/cluster/src/types.rs index 457fc53543..659967d693 100644 --- a/packages/core/services/cluster/src/types.rs +++ b/packages/core/services/cluster/src/types.rs @@ -1,6 +1,7 @@ use std::{ convert::{TryFrom, TryInto}, net::{IpAddr, Ipv4Addr}, + str::FromStr, }; use chirp_workflow::prelude::*; @@ -107,6 +108,25 @@ impl std::fmt::Display for PoolType { } } +impl FromStr for PoolType { + type Err = GlobalError; + + fn from_str(s: &str) -> Result { + match s { + "job" => Ok(PoolType::Job), + "gg" => Ok(PoolType::Gg), + "ats" => Ok(PoolType::Ats), + "pegboard" => Ok(PoolType::Pegboard), + "pegboard-isolate" => Ok(PoolType::PegboardIsolate), + "fdb" => Ok(PoolType::Fdb), + "worker" => Ok(PoolType::Worker), + "nats" => Ok(PoolType::Nats), + "guard" => Ok(PoolType::Guard), + _ => bail!("Invalid PoolType string: {}", s), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Hash)] pub struct Hardware { pub provider_hardware: String, @@ -144,7 +164,7 @@ impl From for BuildDeliveryMet } } -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Server { pub server_id: Uuid, pub datacenter_id: Uuid, diff --git a/packages/edge/api/traefik-provider/Cargo.toml b/packages/edge/api/traefik-provider/Cargo.toml index 0909dd4f0c..cd56945414 100644 --- a/packages/edge/api/traefik-provider/Cargo.toml +++ b/packages/edge/api/traefik-provider/Cargo.toml @@ -43,7 +43,6 @@ util-job.workspace = true uuid = { version = "1", features = ["v4"] } api-core-traefik-provider.workspace = true -cluster.workspace = true pegboard.workspace = true [dependencies.sqlx] diff --git a/packages/edge/api/traefik-provider/src/route/game_guard/api.rs b/packages/edge/api/traefik-provider/src/route/game_guard/api.rs deleted file mode 100644 index dab9429533..0000000000 --- a/packages/edge/api/traefik-provider/src/route/game_guard/api.rs +++ /dev/null @@ -1,97 +0,0 @@ -use api_core_traefik_provider::types; -use api_helper::ctx::Ctx; -use cluster::types::{Filter, PoolType}; -use rivet_operation::prelude::*; - -use crate::auth::Auth; - -#[tracing::instrument(skip_all)] -pub async fn build_api( - ctx: &Ctx, - config: &mut types::TraefikConfigResponse, -) -> GlobalResult<()> { - let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id; - - let (dc_res, servers_res) = tokio::try_join!( - ctx.op(cluster::ops::datacenter::get::Input { - datacenter_ids: vec![dc_id], - }), - ctx.op(cluster::ops::server::list::Input { - filter: Filter { - datacenter_ids: Some(vec![dc_id]), - pool_types: Some(vec![PoolType::Worker]), - ..Default::default() - }, - include_destroyed: false, - exclude_draining: true, - exclude_no_vlan: true, - }), - )?; - let dc = unwrap!(dc_res.datacenters.first()); - - let mut middlewares = vec![]; - let service_id = "api".to_string(); - - let port = ctx.config().server()?.rivet.api_public.port(); - config.http.services.insert( - service_id.clone(), - types::TraefikService { - load_balancer: types::TraefikLoadBalancer { - servers: servers_res - .servers - .iter() - .map(|server| { - Ok(types::TraefikServer { - url: Some(format!("http://{}:{port}", unwrap!(server.lan_ip),)), - address: None, - }) - }) - .collect::>()?, - sticky: None, - }, - }, - ); - - config.http.middlewares.insert( - "api-compress".to_owned(), - types::TraefikMiddlewareHttp::Compress {}, - ); - middlewares.push("api-compress".to_string()); - - config.http.middlewares.insert( - "api-in-flight".to_owned(), - types::TraefikMiddlewareHttp::InFlightReq { - // This number needs to be high to allow for parallel requests - amount: 64, - // TODO: Different strat? - source_criterion: types::InFlightReqSourceCriterion::IpStrategy(types::IpStrategy { - depth: 0, - exclude_ips: None, - }), - }, - ); - middlewares.push("api-in-flight".to_string()); - - let url = ctx.config().server()?.rivet.edge_api_url(&dc.name_id)?; - let host = unwrap!(url.host()); - let rule = format!("Host(`{host}`)"); - - let tls_domain = types::TraefikTlsDomain { - main: host.to_string(), - sans: Vec::new(), - }; - - config.http.routers.insert( - "api-secure".to_string(), - types::TraefikRouter { - entry_points: vec!["lb-443".to_string()], - rule: Some(rule), - priority: None, - service: service_id.to_string(), - middlewares, - tls: Some(types::TraefikTls::build(vec![tls_domain])), - }, - ); - - Ok(()) -} diff --git a/packages/edge/api/traefik-provider/src/route/game_guard/mod.rs b/packages/edge/api/traefik-provider/src/route/game_guard/mod.rs index 273c291723..7385a9ef03 100644 --- a/packages/edge/api/traefik-provider/src/route/game_guard/mod.rs +++ b/packages/edge/api/traefik-provider/src/route/game_guard/mod.rs @@ -1,4 +1,3 @@ -use api::build_api; use api_core_traefik_provider::types; use api_helper::{anchor::WatchIndexQuery, ctx::Ctx}; use job::build_job; @@ -7,7 +6,6 @@ use serde::{Deserialize, Serialize}; use crate::auth::Auth; -pub mod api; pub mod job; #[derive(Debug, Serialize, Deserialize)] @@ -30,8 +28,6 @@ pub async fn config( build_job(&ctx, &mut config).await?; - build_api(&ctx, &mut config).await?; - tracing::debug!( http_services = ?config.http.services.len(), http_routers = config.http.routers.len(), diff --git a/packages/edge/infra/client/manager/src/image_download_handler.rs b/packages/edge/infra/client/manager/src/image_download_handler.rs index 63ba9174ae..52eb7927d2 100644 --- a/packages/edge/infra/client/manager/src/image_download_handler.rs +++ b/packages/edge/infra/client/manager/src/image_download_handler.rs @@ -230,7 +230,7 @@ impl ImageDownloadHandler { metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes); // Update state to signify download completed successfully - let foo = sqlx::query(indoc!( + sqlx::query(indoc!( " UPDATE images_cache SET diff --git a/packages/edge/infra/client/manager/src/pull_addr_handler.rs b/packages/edge/infra/client/manager/src/pull_addr_handler.rs index 60cba4eb5e..7e8b1285f7 100644 --- a/packages/edge/infra/client/manager/src/pull_addr_handler.rs +++ b/packages/edge/infra/client/manager/src/pull_addr_handler.rs @@ -1,8 +1,8 @@ -use std::time::{Duration, Instant}; +use std::{net::Ipv4Addr, time::{Duration, Instant}}; use anyhow::*; use pegboard_config::{Addresses, Client}; -use service_discovery::ApiResponse; +use serde::Deserialize; use tokio::sync::RwLock; /// Duration between pulling addresses again. @@ -63,3 +63,13 @@ impl PullAddrHandler { } } } + +#[derive(Deserialize)] +struct ApiResponse { + servers: Vec, +} + +#[derive(Deserialize, Clone)] +struct ApiServer { + lan_ip: Option, +} diff --git a/packages/edge/infra/guard/server/Cargo.toml b/packages/edge/infra/guard/server/Cargo.toml index 776643ae69..bd0605916f 100644 --- a/packages/edge/infra/guard/server/Cargo.toml +++ b/packages/edge/infra/guard/server/Cargo.toml @@ -39,9 +39,11 @@ route = { path = "../../../../core/services/route" } rustls = { version = "0.23.25" } rustls-pemfile = "2.0.0" serde_json = "1.0" +service-discovery.workspace = true tokio.workspace = true tracing.workspace = true types-proto = { path = "../../../../common/types-proto/core", package = "types-proto" } +url = "2.4" uuid = { version = "1.3", features = ["v4"] } [dev-dependencies] diff --git a/packages/edge/infra/guard/server/src/routing/api.rs b/packages/edge/infra/guard/server/src/routing/api.rs index f6473baf90..c34655c952 100644 --- a/packages/edge/infra/guard/server/src/routing/api.rs +++ b/packages/edge/infra/guard/server/src/routing/api.rs @@ -1,13 +1,18 @@ +use std::borrow::Cow; + use chirp_workflow::prelude::*; -use cluster::types::{Filter, PoolType}; use global_error::GlobalResult; use rivet_guard_core::proxy_service::{ RouteConfig, RouteTarget, RoutingOutput, RoutingTimeout, StructuredResponse, }; use rivet_guard_core::status::StatusCode; -use std::borrow::Cow; +use service_discovery::ServiceDiscovery; +use url::Url; use uuid::Uuid; +// TODO: Copied from cluster/src/workflows/server/install/install_scripts/components/rivet/mod.rs +const TUNNEL_API_EDGE_PORT: u16 = 5010; + /// Route requests to the API service #[tracing::instrument(skip_all)] pub async fn route_api_request( @@ -39,33 +44,23 @@ pub async fn route_api_request( }))); } - // Get API server from the cluster - let servers_res = ctx - .op(cluster::ops::server::list::Input { - filter: Filter { - datacenter_ids: Some(vec![dc_id]), - pool_types: Some(vec![PoolType::Worker]), - ..Default::default() - }, - include_destroyed: false, - exclude_draining: true, - exclude_no_vlan: true, - }) - .await?; - tracing::debug!(?servers_res, "servers"); + // NOTE: We use service discovery instead of server::list or datacenter::server_discovery because cache is not + // shared between edge-edge or edge-core. SD requests the core which has a single cache. + let url = Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}/provision/datacenters/{dc_id}/servers?pools=worker"))?; + let sd = ServiceDiscovery::new(url); + let servers = sd.fetch().await?; + + tracing::debug!(?servers, "api servers"); let port = ctx.config().server()?.rivet.api_public.port(); - let targets = servers_res - .servers + let targets = servers .iter() - // Only include servers that are installed - .filter(|server| server.install_complete_ts.is_some()) .map(|server| { // For each server, create a target Ok(RouteTarget { actor_id: None, server_id: Some(server.server_id), - host: unwrap!(server.lan_ip).to_string(), + host: unwrap_ref!(server.lan_ip).to_string(), port, path: path.to_owned(), })