Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/common/service-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition.workspace = true
[dependencies]
rand = "0.8"
reqwest = { version = "0.12", features = ["json"] }
rivet-api.workspace = true
serde = { version = "1.0", features = ["derive"] }
tokio.workspace = true
tracing = "0.1"
Expand Down
26 changes: 8 additions & 18 deletions packages/common/service-discovery/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use std::{future::Future, net::Ipv4Addr, sync::Arc, time::Duration};
use std::{future::Future, sync::Arc, time::Duration};

use rand::Rng;
use reqwest::Client;
use serde::Deserialize;
use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use url::Url;
use rivet_api::models::{ProvisionServer, ProvisionDatacentersGetServersResponse};

pub struct ServiceDiscovery {
fetch_endpoint: Url,
last: RwLock<Vec<ApiServer>>,
last: RwLock<Vec<ProvisionServer>>,
handle: Mutex<Option<JoinHandle<()>>>,
}

Expand All @@ -27,7 +27,7 @@ impl ServiceDiscovery {
/// Starts a background tokio task that periodically fetches the endpoint and calls `cb`.
pub fn start<F, Fut, E>(self: &Arc<Self>, cb: F)
where
F: Fn(Vec<ApiServer>) -> Fut + Send + Sync + 'static,
F: Fn(Vec<ProvisionServer>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
E: std::fmt::Debug,
{
Expand Down Expand Up @@ -64,23 +64,23 @@ impl ServiceDiscovery {
}

/// Returns the last retrieved value without fetching.
pub async fn get(&self) -> Vec<ApiServer> {
pub async fn get(&self) -> Vec<ProvisionServer> {
self.last.read().await.clone()
}

/// Manually fetches the endpoint.
pub async fn fetch(&self) -> Result<Vec<ApiServer>, reqwest::Error> {
pub async fn fetch(&self) -> Result<Vec<ProvisionServer>, reqwest::Error> {
let client = Client::new();
Ok(self.fetch_inner(&client).await?.servers)
Comment on lines 73 to 74
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Creating new Client for each fetch is inefficient - consider reusing the client from fetch_inner's parameter

}

async fn fetch_inner(&self, client: &Client) -> Result<ApiResponse, reqwest::Error> {
async fn fetch_inner(&self, client: &Client) -> Result<ProvisionDatacentersGetServersResponse, reqwest::Error> {
Ok(client
.get(self.fetch_endpoint.clone())
.send()
.await?
.error_for_status()?
.json::<ApiResponse>()
.json::<ProvisionDatacentersGetServersResponse>()
.await?)
}
}
Expand All @@ -93,13 +93,3 @@ impl Drop for ServiceDiscovery {
}
}
}

#[derive(Deserialize)]
pub struct ApiResponse {
pub servers: Vec<ApiServer>,
}

#[derive(Deserialize, Clone)]
pub struct ApiServer {
pub lan_ip: Option<Ipv4Addr>,
}
9 changes: 4 additions & 5 deletions packages/core/api/actor/src/route/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,11 +536,10 @@ pub async fn complete_build(
}
}

// TODO: Disabled until deploy
// // Error only if all prewarm requests failed
// if !results.is_empty() && results.iter().all(|res| res.is_err()) {
// return Err(unwrap!(unwrap!(results.into_iter().next()).err()));
// }
// Error only if all prewarm requests failed
if !results.is_empty() && results.iter().all(|res| res.is_err()) {
return Err(unwrap!(unwrap!(results.into_iter().next()).err()));
}
}

Ok(json!({}))
Expand Down
20 changes: 7 additions & 13 deletions packages/core/api/provision/src/route/datacenters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,21 @@ pub async fn servers(
_watch_index: WatchIndexQuery,
query: ServerFilterQuery,
) -> GlobalResult<models::ProvisionDatacentersGetServersResponse> {
// Find server based on public ip
let servers_res = ctx
.op(cluster::ops::server::list::Input {
filter: cluster::types::Filter {
datacenter_ids: Some(vec![datacenter_id]),
pool_types: (!query.pools.is_empty())
.then(|| query.pools.into_iter().map(ApiInto::api_into).collect()),
..Default::default()
},
include_destroyed: false,
exclude_draining: true,
exclude_no_vlan: true,
.op(cluster::ops::datacenter::server_discovery::Input {
datacenter_id,
pool_types: query
.pools
.into_iter()
.map(ApiInto::api_into)
.collect(),
})
.await?;
Comment on lines 59 to 68
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Replacing server::list with server_discovery removes several critical filters (exclude_draining, exclude_no_vlan). Verify these are handled in server_discovery operation or document why they're no longer needed


Ok(models::ProvisionDatacentersGetServersResponse {
servers: servers_res
.servers
.into_iter()
// Filter out installing servers
.filter(|server| server.install_complete_ts.is_some())
.map(ApiInto::api_into)
.collect(),
Comment on lines 71 to 75
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Removal of .filter(|server| server.install_complete_ts.is_some()) could expose incomplete/installing servers to clients. Verify this is handled in server_discovery or restore the filter

})
Expand Down
46 changes: 32 additions & 14 deletions packages/core/api/traefik-provider/src/route/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,42 @@ pub async fn build_ip_allowlist(
ctx: &Ctx<Auth>,
config: &mut types::TraefikConfigResponse,
) -> GlobalResult<()> {
let servers_res = ctx
.op(cluster::ops::server::list::Input {
filter: cluster::types::Filter {
pool_types: Some(vec![
cluster::types::PoolType::Gg,
cluster::types::PoolType::Guard,
]),
..Default::default()
},
include_destroyed: false,
exclude_draining: false,
exclude_no_vlan: false,
let servers = ctx
.cache()
.ttl(5000)
.fetch_one_json("cluster.guard_ip_allow_list", "", {
let ctx = (*ctx).clone();
move |mut cache, key| {
let ctx = ctx.clone();
async move {
let servers_res = ctx
.op(cluster::ops::server::list::Input {
filter: cluster::types::Filter {
pool_types: Some(vec![
cluster::types::PoolType::Gg,
cluster::types::PoolType::Guard,
]),
..Default::default()
},
include_destroyed: false,
// IMPORTANT: Returns installing servers
exclude_installing: false,
exclude_draining: true,
exclude_no_vlan: true,
})
.await?;

cache.resolve(&key, servers_res.servers);

Ok(cache)
}
}
})
.await?;

let public_ips = servers_res
.servers
let public_ips = servers
.iter()
.flatten()
.filter_map(|server| server.wan_ip)
.map(|ip| ip.to_string())
.collect::<Vec<_>>();
Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/build/src/ops/resolve_for_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn build_resolve_for_tags(ctx: &OperationCtx, input: &Input) -> Global
.ttl(util::duration::seconds(15))
.fetch_one_json(
"build",
format!("{}:{}", input.env_id, tags_str.as_str()),
(input.env_id, tags_str.as_str()),
{
let ctx = ctx.clone();
let tags_str = tags_str.clone();
Expand Down
1 change: 1 addition & 0 deletions packages/core/services/cluster/src/ops/datacenter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod server_discovery;
pub mod get;
pub mod list;
pub mod location_get;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::{collections::HashMap, str::FromStr};

use chirp_workflow::prelude::*;

use crate::types::{Filter, PoolType, Server};

#[derive(Debug)]
pub struct Input {
	/// Datacenter to discover servers in.
	pub datacenter_id: Uuid,
	/// Pool types to filter by. An empty vec means "all pool types"
	/// (no pool filter is applied).
	pub pool_types: Vec<PoolType>,
}

#[derive(Debug)]
pub struct Output {
	/// Servers that matched the discovery query (possibly served from a
	/// short-lived cache rather than fetched fresh).
	pub servers: Vec<Server>,
}

/// Wrapper around server::list with very short cache.
#[operation]
pub async fn cluster_datacenter_server_discovery(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
let cache_keys = if input.pool_types.is_empty() {
vec![(input.datacenter_id, "all".to_string())]
} else {
input
.pool_types
.iter()
.map(|pool| (input.datacenter_id, pool.to_string()))
.collect()
};
Comment on lines +21 to +29
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider using a const for 'all' string to avoid magic strings


let servers = ctx
.cache()
.ttl(5000)
.fetch_all_json("cluster.datacenter.service_discovery", cache_keys, {
let ctx = ctx.clone();
move |mut cache, keys| {
let ctx = ctx.clone();
async move {
let pools = keys
.into_iter()
.filter(|(_, pool)| pool != "all")
.map(|(_, pool)| PoolType::from_str(&pool))
.collect::<GlobalResult<Vec<_>>>()?;

let servers_res = ctx
.op(crate::ops::server::list::Input {
filter: Filter {
datacenter_ids: Some(vec![input.datacenter_id]),
pool_types: (!pools.is_empty()).then(|| pools),
..Default::default()
},
include_destroyed: false,
exclude_installing: true,
exclude_draining: true,
exclude_no_vlan: true,
})
.await?;

let mut servers_by_pool_type =
HashMap::with_capacity(servers_res.servers.len());

for server in servers_res.servers {
servers_by_pool_type
.entry(server.pool_type)
.or_insert_with(Vec::new)
.push(server);
}

for (pool_type, servers) in servers_by_pool_type {
cache.resolve(&(input.datacenter_id, pool_type.to_string()), servers);
}

Ok(cache)
}
}
})
.await?;

Ok(Output {
servers: servers.into_iter().flatten().collect(),
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub async fn cluster_server_destroy_with_filter(
.op(crate::ops::server::list::Input {
filter: input.filter.clone(),
include_destroyed: false,
exclude_installing: false,
exclude_draining: false,
exclude_no_vlan: false,
})
Expand Down
17 changes: 10 additions & 7 deletions packages/core/services/cluster/src/ops/server/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::types::{Filter, Server};
pub struct Input {
pub filter: Filter,
pub include_destroyed: bool,
pub exclude_installing: bool,
pub exclude_draining: bool,
pub exclude_no_vlan: bool,
}
Expand Down Expand Up @@ -47,15 +48,17 @@ pub async fn cluster_server_list(ctx: &OperationCtx, input: &Input) -> GlobalRes
ON s.datacenter_id = d.datacenter_id
WHERE
($1 OR s.cloud_destroy_ts IS NULL) AND
(NOT $2 OR s.drain_ts IS NULL) AND
(NOT $3 OR s.vlan_ip IS NOT NULL) AND
($4 IS NULL OR s.server_id = ANY($4)) AND
($5 IS NULL OR s.datacenter_id = ANY($5)) AND
($6 IS NULL OR d.cluster_id = ANY($6)) AND
($7 IS NULL OR s.pool_type = ANY($7)) AND
($8 IS NULL OR s.public_ip = ANY($8))
(NOT $2 OR s.install_complete_ts IS NOT NULL) AND
(NOT $3 OR s.drain_ts IS NULL) AND
(NOT $4 OR s.vlan_ip IS NOT NULL) AND
($5 IS NULL OR s.server_id = ANY($5)) AND
($6 IS NULL OR s.datacenter_id = ANY($6)) AND
($7 IS NULL OR d.cluster_id = ANY($7)) AND
($8 IS NULL OR s.pool_type = ANY($8)) AND
($9 IS NULL OR s.public_ip = ANY($9))
",
input.include_destroyed,
input.exclude_installing,
input.exclude_draining,
input.exclude_no_vlan,
&input.filter.server_ids,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub async fn cluster_server_taint_with_filter(
.op(crate::ops::server::list::Input {
filter: input.filter.clone(),
include_destroyed: false,
exclude_installing: false,
exclude_draining: false,
exclude_no_vlan: false,
})
Comment on lines 21 to 27
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: All filter flags are set to false, but there's no comment explaining why we want to include all server states. Consider adding a comment explaining this design decision.

Expand Down
22 changes: 21 additions & 1 deletion packages/core/services/cluster/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
convert::{TryFrom, TryInto},
net::{IpAddr, Ipv4Addr},
str::FromStr,
};

use chirp_workflow::prelude::*;
Expand Down Expand Up @@ -107,6 +108,25 @@ impl std::fmt::Display for PoolType {
}
}

impl FromStr for PoolType {
	type Err = GlobalError;

	/// Parses a pool type from its canonical string name.
	///
	/// Intended as the inverse of this type's `Display` impl — TODO confirm
	/// the names here stay in sync with `Display` if variants change.
	///
	/// # Errors
	///
	/// Returns a `GlobalError` when `s` is not a recognized pool type name.
	fn from_str(s: &str) -> Result<Self, Self::Err> {
		match s {
			"job" => Ok(PoolType::Job),
			"gg" => Ok(PoolType::Gg),
			"ats" => Ok(PoolType::Ats),
			"pegboard" => Ok(PoolType::Pegboard),
			"pegboard-isolate" => Ok(PoolType::PegboardIsolate),
			"fdb" => Ok(PoolType::Fdb),
			"worker" => Ok(PoolType::Worker),
			"nats" => Ok(PoolType::Nats),
			"guard" => Ok(PoolType::Guard),
			// List the accepted names so bad input is easy to debug.
			_ => bail!(
				"Invalid PoolType string: {} (expected one of: job, gg, ats, pegboard, pegboard-isolate, fdb, worker, nats, guard)",
				s
			),
		}
	}
}

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub struct Hardware {
pub provider_hardware: String,
Expand Down Expand Up @@ -144,7 +164,7 @@ impl From<rivet_config::config::rivet::BuildDeliveryMethod> for BuildDeliveryMet
}
}

#[derive(Debug)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Server {
pub server_id: Uuid,
pub datacenter_id: Uuid,
Expand Down
1 change: 0 additions & 1 deletion packages/edge/api/traefik-provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ util-job.workspace = true
uuid = { version = "1", features = ["v4"] }

api-core-traefik-provider.workspace = true
cluster.workspace = true
pegboard.workspace = true

[dependencies.sqlx]
Expand Down
Loading
Loading