diff --git a/lib/cache/build/src/req_config.rs b/lib/cache/build/src/req_config.rs index 4df909877a..30dcfb722b 100644 --- a/lib/cache/build/src/req_config.rs +++ b/lib/cache/build/src/req_config.rs @@ -157,8 +157,7 @@ impl RequestConfig { // // Drop `keys` bc this is not the same as the keys list in `ctx`, so it should not be used // again. - let mut ctx = GetterCtx::new(base_key.clone().into(), keys.to_vec()); - drop(keys); + let mut ctx = GetterCtx::new(base_key.clone().into(), keys); // Build keys to look up values in Redis let redis_keys = ctx diff --git a/proto/backend/ds.proto b/proto/backend/ds.proto index 90ac05c06f..602f27496b 100644 --- a/proto/backend/ds.proto +++ b/proto/backend/ds.proto @@ -6,6 +6,8 @@ import "proto/common.proto"; import "proto/backend/captcha.proto"; import "proto/backend/region.proto"; +// TODO: Delete file after converting ds-log pkg to native ops + message Server { reserved 8; diff --git a/svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs b/svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs index aaf906687f..59dfff182d 100644 --- a/svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs +++ b/svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs @@ -44,7 +44,7 @@ pub async fn build_ds( ) -> GlobalResult<()> { let dynamic_servers = ctx .cache() - .ttl(60) + .ttl(60_000) .fetch_one_json("servers_ports", dc_id, |mut cache, dc_id| async move { let rows = sql_fetch_all!( [ctx, DynamicServer] @@ -65,7 +65,7 @@ pub async fn build_ds( JOIN db_ds.docker_ports_protocol_game_guard AS gg ON ip.server_id = gg.server_id AND - ip.nomad_label = CONCAT('ds_', gg.port_name) + ip.nomad_label = CONCAT('ds_', REPLACE(gg.port_name, '-', '_')) WHERE s.datacenter_id = $1 AND s.destroy_ts IS NULL @@ -116,8 +116,6 @@ pub async fn build_ds( }, ); - tracing::info!(?config, "config timeeee"); - // TODO: add middleware & services & ports // TODO: same as jobs, watch out for namespaces Ok(()) diff --git a/svc/api/traefik-provider/src/route/game_guard/mod.rs b/svc/api/traefik-provider/src/route/game_guard/mod.rs index be26825858..a0696f8181 100644 --- a/svc/api/traefik-provider/src/route/game_guard/mod.rs +++ b/svc/api/traefik-provider/src/route/game_guard/mod.rs @@ -27,14 +27,10 @@ pub async fn config( let mut config = types::TraefikConfigResponse::default(); // Fetch configs and catch any errors - tracing::info!(?config, "traefik config ds"); - tracing::info!("asdgaerwvsdfvasdf"); build_ds(&ctx, datacenter, &mut config).await?; build_job(&ctx, datacenter, &mut config).await?; - tracing::info!(?config, "traefik config ds"); - // tracing::info!( // http_services = ?config.http.services.len(), // http_routers = config.http.routers.len(), diff --git a/svc/pkg/cluster/src/workflows/server/drain.rs b/svc/pkg/cluster/src/workflows/server/drain.rs index cdaa9100f4..73c623fefa 100644 --- a/svc/pkg/cluster/src/workflows/server/drain.rs +++ b/svc/pkg/cluster/src/workflows/server/drain.rs @@ -79,7 +79,7 @@ async fn drain_node(ctx: &ActivityCtx, input: &DrainNodeInput) -> GlobalResult GlobalResult GlobalResu .await?; if let Some(nomad_node_id) = nomad_node_id { - nodes_api::update_node_drain( + let res = nodes_api::update_node_drain( &NOMAD_CONFIG, &nomad_node_id, models::NodeUpdateDrainRequest { @@ -86,7 +86,19 @@ async fn undrain_node(ctx: &ActivityCtx, input: &UndrainNodeInput) -> GlobalResu None, None, ) - .await?; + .await; + + // Catch "node not found" error + if let Err(nomad_client::apis::Error::ResponseError( + nomad_client::apis::ResponseContent { content, .. }, + )) = res + { + if content == "node not found" { + tracing::warn!("node does not exist, not undraining"); + + return Ok(()); + } + } // Allow new matchmaker requests to the node running on this server msg!([ctx] mm::msg::nomad_node_closed_set(&nomad_node_id) { diff --git a/svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs b/svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs index 0a2aab6f43..8e719e76cc 100644 --- a/svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs +++ b/svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs @@ -1,7 +1,7 @@ use std::time::Duration; use chirp_worker::prelude::*; -use rivet_operation::prelude::proto::backend::{self, pkg::*}; +use rivet_operation::prelude::proto::backend::pkg::*; use serde::Deserialize; use crate::util::NEW_NOMAD_CONFIG; @@ -31,7 +31,15 @@ struct RunData { nomad_node_name: String, nomad_node_public_ipv4: String, nomad_node_vlan_ipv4: String, - ports: Vec, + ports: Vec, +} + +#[derive(Clone, Debug)] +struct Port { + label: String, + source: u32, + target: u32, + ip: String, } #[worker(name = "ds-nomad-monitor-alloc-plan")] @@ -74,7 +82,7 @@ async fn worker( for port in dynamic_ports { // Don't share connect proxy ports let label = unwrap_ref!(port.label); - ports.push(backend::job::Port { + ports.push(Port { label: label.clone(), source: *unwrap_ref!(port.value) as u32, target: *unwrap_ref!(port.to) as u32, diff --git a/svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs b/svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs index 38c82ce382..40f2a576d5 100644 --- a/svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs +++ b/svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs @@ -27,12 +27,12 @@ async fn worker( ) -> GlobalResult<()> { let payload_value = serde_json::from_str::(&ctx.payload_json)?; - let job_id = unwrap!(unwrap!(payload_value.get("JobID"), "eval has no job id").as_str()); - let eval_status_raw = unwrap!(unwrap!(payload_value.get("Status")).as_str()); - // We can't decode this with serde, so manually deserialize the response let eval_value = unwrap!(payload_value.get("Evaluation")); + let job_id = unwrap!(unwrap!(eval_value.get("JobID"), "eval has no job id").as_str()); + let eval_status_raw = unwrap!(unwrap!(eval_value.get("Status")).as_str()); + if !util_job::is_nomad_job_run(job_id) { tracing::info!(%job_id, "disregarding event"); return Ok(()); diff --git a/svc/pkg/ds/tests/common.rs b/svc/pkg/ds/tests/common.rs index 5a99b5d52c..6f21742f91 100644 --- a/svc/pkg/ds/tests/common.rs +++ b/svc/pkg/ds/tests/common.rs @@ -1,265 +1,146 @@ -// use std::collections::HashMap; - -// use chirp_worker::prelude::*; -// use proto::backend::{self, pkg::*}; - -// pub struct Setup { -// pub namespace_id: Uuid, -// pub lobby_group_id_bridge: Uuid, -// pub lobby_group_id_host: Uuid, -// pub region_id: Uuid, -// pub region: backend::region::Region, -// pub host_port_http: u16, -// pub host_port_tcp: u16, -// pub host_port_udp: u16, -// } - -// impl Setup { -// pub async fn init(ctx: &TestCtx) -> Self { -// let region_res = op!([ctx] faker_region {}).await.unwrap(); -// let region_id = region_res.region_id.as_ref().unwrap().as_uuid(); - -// let game_res = op!([ctx] faker_game { -// skip_namespaces_and_versions: true, -// ..Default::default() -// }) -// .await -// .unwrap(); - -// let build_res = op!([ctx] faker_build { -// env_id: game_res.prod_env_id, -// image: backend::faker::Image::MmLobbyEcho as i32, -// }) -// .await -// .unwrap(); - -// // Pick host ports to connect to -// let host_port_http = rand::thread_rng().gen_range(26000..27000); -// let host_port_tcp = rand::thread_rng().gen_range(26000..27000); -// let host_port_udp = rand::thread_rng().gen_range(26000..27000); - -// let game_version_res = op!([ctx] faker_game_version { -// env_id: game_res.prod_env_id, -// override_lobby_groups: Some(faker::game_version::request::OverrideLobbyGroups { -// lobby_groups: vec![backend::matchmaker::LobbyGroup { -// name_id: "test-1".into(), - -// regions: vec![backend::matchmaker::lobby_group::Region { -// region_id: Some(region_id.into()), -// tier_name_id: util_ds::test::TIER_NAME_ID.to_owned(), -// idle_lobbies: Some(backend::matchmaker::lobby_group::IdleLobbies { -// min_idle_lobbies: 0, -// // Don't auto-destroy lobbies from tests -// max_idle_lobbies: 32, -// }), -// }], -// max_players_normal: 8, -// max_players_direct: 10, -// max_players_party: 12, -// listable: true, -// taggable: false, -// allow_dynamic_max_players: false, - -// runtime: Some(backend::matchmaker::lobby_runtime::Docker { -// build_id: build_res.build_id, -// args: Vec::new(), -// env_vars: vec![ -// backend::matchmaker::lobby_runtime::EnvVar { -// key: "HELLO".into(), -// value: "world".into(), -// }, -// ], -// network_mode: backend::matchmaker::lobby_runtime::NetworkMode::Bridge as i32, -// ports: vec![ -// backend::matchmaker::lobby_runtime::Port { -// label: "test-http".into(), -// target_port: Some(8001), -// port_range: None, -// proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Http as i32, -// proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, -// }, -// backend::matchmaker::lobby_runtime::Port { -// label: "test-tcp".into(), -// target_port: Some(8002), -// port_range: None, -// proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Tcp as i32, -// proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, -// }, -// backend::matchmaker::lobby_runtime::Port { -// label: "test-udp".into(), -// target_port: Some(8003), -// port_range: None, -// proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Udp as i32, -// proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, -// }, -// ], - -// }.into()), - -// actions: None, -// }, -// backend::matchmaker::LobbyGroup { -// name_id: "test-2".into(), - -// regions: vec![backend::matchmaker::lobby_group::Region { -// region_id: Some(region_id.into()), -// tier_name_id: util_ds::test::TIER_NAME_ID.to_owned(), -// idle_lobbies: Some(backend::matchmaker::lobby_group::IdleLobbies { -// min_idle_lobbies: 0, -// // See above -// max_idle_lobbies: 32, -// }), -// }], -// max_players_normal: 8, -// max_players_direct: 10, -// max_players_party: 12, -// listable: true, -// taggable: false, -// allow_dynamic_max_players: false, - -// runtime: Some(backend::matchmaker::lobby_runtime::Docker { -// build_id: build_res.build_id, -// args: Vec::new(), -// env_vars: vec![ -// backend::matchmaker::lobby_runtime::EnvVar { -// key: "HELLO".into(), -// value: "world".into(), -// }, -// backend::matchmaker::lobby_runtime::EnvVar { -// key: "HOST_PORT_HTTP".into(), -// value: host_port_http.to_string(), -// }, -// backend::matchmaker::lobby_runtime::EnvVar { -// key: "HOST_PORT_TCP".into(), -// value: host_port_tcp.to_string(), -// }, -// backend::matchmaker::lobby_runtime::EnvVar { -// key: "HOST_PORT_UDP".into(), -// value: host_port_udp.to_string(), -// }, -// ], -// network_mode: backend::matchmaker::lobby_runtime::NetworkMode::Host as i32, -// ports: vec![ -// // Game Guard -// backend::matchmaker::lobby_runtime::Port { -// label: "test-http".into(), -// target_port: Some(8001), -// port_range: None, -// proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Http as i32, -// proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, -// }, -// backend::matchmaker::lobby_runtime::Port { -// label: "test-tcp".into(), -// target_port: Some(8002), -// port_range: None, -// proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Tcp as i32, -// proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, -// }, -// backend::matchmaker::lobby_runtime::Port { -// label: "test-udp".into(), -// target_port: Some(8003), -// port_range: None, -// proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Udp as i32, -// proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, -// }, - -// // Host -// backend::matchmaker::lobby_runtime::Port { -// label: "test-range-tcp".into(), -// target_port: None, -// port_range: Some(backend::matchmaker::lobby_runtime::PortRange { -// min: 26000, -// max: 27000, -// }), -// proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Tcp as i32, -// proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::None as i32, -// }, -// backend::matchmaker::lobby_runtime::Port { -// label: "test-range-udp".into(), -// target_port: None, -// port_range: Some(backend::matchmaker::lobby_runtime::PortRange { -// min: 26000, -// max: 27000, -// }), -// proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Udp as i32, -// proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::None as i32, -// }, -// ], - -// }.into()), - -// actions: None, -// }], -// }), -// ..Default::default() -// }) -// .await -// .unwrap(); - -// let version_get_res = op!([ctx] mm_config_version_get { -// version_ids: vec![game_version_res.version_id.unwrap()], -// }) -// .await -// .unwrap(); -// let lobby_groups = &version_get_res -// .versions -// .first() -// .unwrap() -// .config_meta -// .as_ref() -// .unwrap() -// .lobby_groups; - -// let ns_create_res = op!([ctx] faker_game_namespace { -// env_id: game_res.prod_env_id, -// version_id: game_version_res.version_id, -// override_name_id: "prod".to_owned(), -// ..Default::default() -// }) -// .await -// .unwrap(); -// let namespace_id = ns_create_res.namespace_id.unwrap().as_uuid(); - -// Setup { -// namespace_id, -// lobby_group_id_bridge: lobby_groups[0].lobby_group_id.as_ref().unwrap().as_uuid(), -// lobby_group_id_host: lobby_groups[1].lobby_group_id.as_ref().unwrap().as_uuid(), -// region_id, -// region: region_res.region.clone().unwrap(), -// host_port_http, -// host_port_tcp, -// host_port_udp, -// } -// } - -// /// Create bridge lobby -// pub async fn create_lobby(&self, ctx: &TestCtx) -> Uuid { -// self.create_lobby_with_lgi(ctx, self.lobby_group_id_bridge) -// .await -// } - -// /// Create lobby with LGI -// pub async fn create_lobby_with_lgi(&self, ctx: &TestCtx, lgi: Uuid) -> Uuid { -// let lobby_id = Uuid::new_v4(); -// msg!([ctx] @notrace mm::msg::lobby_create(lobby_id) -> mm::msg::lobby_ready_complete(lobby_id) { -// lobby_id: Some(lobby_id.into()), -// namespace_id: Some(self.namespace_id.into()), -// lobby_group_id: Some(lgi.into()), -// region_id: Some(self.region_id.into()), -// create_ray_id: None, -// preemptively_created: false, - -// creator_user_id: None, -// is_custom: false, -// publicity: None, -// lobby_config_json: None, -// tags: HashMap::new(), -// dynamic_max_players: None, -// parameters: util::env::test_id_param(), -// }) -// .await -// .unwrap(); - -// lobby_id -// } -// } +use std::collections::HashMap; + +use chirp_workflow::prelude::*; +use ds::types; +use rivet_operation::prelude::proto::backend; +use serde_json::json; + +pub struct Setup { + pub env_id: Uuid, + pub cluster_id: Uuid, + pub datacenter_id: Uuid, + pub image_id: Uuid, +} + +impl Setup { + pub async fn init(ctx: &TestCtx) -> Self { + let region_res = op!([ctx] faker_region {}).await.unwrap(); + let region_id = region_res.region_id.as_ref().unwrap().as_uuid(); + + let game_res = op!([ctx] faker_game { + ..Default::default() + }) + .await + .unwrap(); + + let build_res = op!([ctx] faker_build { + env_id: game_res.prod_env_id, + image: backend::faker::Image::DsEcho as i32, + }) + .await + .unwrap(); + + // Pick an existing cluster + let cluster_id = ctx + .op(cluster::ops::list::Input {}) + .await + .unwrap() + .cluster_ids + .first() + .unwrap() + .to_owned(); + + Setup { + env_id: game_res.prod_env_id.unwrap().as_uuid(), + cluster_id, + datacenter_id: region_id, + image_id: build_res.build_id.unwrap().as_uuid(), + } + } + + /// Create bridge server + pub async fn create_bridge_server(&self, ctx: &TestCtx) -> Uuid { + self.create_server_inner(ctx, types::NetworkMode::Bridge) + .await + } + + /// Create host server + pub async fn create_host_server(&self, ctx: &TestCtx) -> Uuid { + self.create_server_inner(ctx, types::NetworkMode::Host) + .await + } + + async fn create_server_inner(&self, ctx: &TestCtx, network_mode: types::NetworkMode) -> Uuid { + let env = vec![ + ("HTTP_PORT".to_string(), 8001.to_string()), + ("TCP_PORT".to_string(), 8002.to_string()), + ("UDP_PORT".to_string(), 8003.to_string()), + ] + .into_iter() + .collect(); + + let ports = vec![ + ( + "test-http".to_string(), + ds::workflows::server::Port { + internal_port: Some(8001), + routing: types::Routing::GameGuard { + protocol: types::GameGuardProtocol::Http, + }, + }, + ), + ( + "test-tcp".to_string(), + ds::workflows::server::Port { + internal_port: Some(8002), + routing: types::Routing::GameGuard { + protocol: types::GameGuardProtocol::Tcp, + }, + }, + ), + ( + "test-udp".to_string(), + ds::workflows::server::Port { + internal_port: Some(8003), + routing: types::Routing::GameGuard { + protocol: types::GameGuardProtocol::Udp, + }, + }, + ), + ] + // Collect into hashmap + .into_iter() + .collect(); + + let server_id = Uuid::new_v4(); + + let mut sub = ctx + .subscribe::(&json!({ + "server_id": server_id, + })) + .await + .unwrap(); + + ctx.dispatch_tagged_workflow( + &json!({ + "server_id": server_id, + }), + ds::workflows::server::Input { + server_id, + env_id: self.env_id, + cluster_id: self.cluster_id, + datacenter_id: self.datacenter_id, + resources: ds::types::ServerResources { + cpu_millicores: 100, + memory_mib: 200, + }, + kill_timeout_ms: 0, + tags: HashMap::new(), + args: Vec::new(), + environment: env, + image_id: self.image_id, + network_mode, + network_ports: ports, + }, + ) + .await + .unwrap(); + + sub.next().await.unwrap(); + + // Sleep for 5 seconds + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + server_id + } +} diff --git a/svc/pkg/ds/tests/lobby_connectivity.rs b/svc/pkg/ds/tests/lobby_connectivity.rs index 1b5af4a995..a35bef97d4 100644 --- a/svc/pkg/ds/tests/lobby_connectivity.rs +++ b/svc/pkg/ds/tests/lobby_connectivity.rs @@ -1,310 +1,286 @@ -// mod common; - -// use std::{ -// io::{BufRead, BufReader, Write}, -// net::{TcpStream, UdpSocket}, -// }; - -// use chirp_worker::prelude::*; -// use common::*; -// use proto::backend; - -// #[worker_test] -// async fn lobby_connectivity_http_normal(ctx: TestCtx) { -// if !util::feature::job_run() { -// return; -// } - -// let setup = Setup::init(&ctx).await; - -// let lobby_id = setup.create_lobby(&ctx).await; - -// let (hostname, _) = get_lobby_addr(&ctx, lobby_id, "test-http").await; -// tracing::info!("testing http to {}", hostname); - -// // Echo body -// let random_body = Uuid::new_v4().to_string(); -// let client = reqwest::Client::new(); -// let res = client -// .post(format!("http://{hostname}")) -// .body(random_body.clone()) -// .send() -// .await -// .unwrap() -// .error_for_status() -// .unwrap(); -// let res_text = res.text().await.unwrap(); -// assert_eq!(random_body, res_text, "echoed wrong response"); -// } - -// // #[worker_test] -// // async fn lobby_connectivity_http_host(ctx: TestCtx) { -// // if !util::feature::job_run() { -// // return; -// // } - -// // let setup = Setup::init(&ctx).await; - -// // let lobby_id = setup -// // .create_lobby_with_lgi(&ctx, setup.lobby_group_id_host) -// // .await; - -// // // Echo body (bridge) -// // { -// // let (hostname, _) = get_lobby_addr(&ctx, lobby_id, "test-http").await; -// // tracing::info!("testing http to {}", hostname); - -// // let random_body = Uuid::new_v4().to_string(); -// // let client = reqwest::Client::new(); -// // let res = client -// // .post(format!("http://{hostname}")) -// // .body(random_body.clone()) -// // .send() -// // .await -// // .unwrap() -// // .error_for_status() -// // .unwrap(); -// // let res_text = res.text().await.unwrap(); -// // assert_eq!(random_body, res_text, "echoed wrong response"); -// // } - -// // // Echo body (host) -// // { -// // let host_ip = get_lobby_host_ip(&ctx, lobby_id).await; -// // tracing::info!("testing http to {}:{}", host_ip, setup.host_port_http); - -// // let random_body = Uuid::new_v4().to_string(); -// // let client = reqwest::Client::new(); -// // let res = client -// // .post(format!("http://{host_ip}:{}", setup.host_port_http)) -// // .body(random_body.clone()) -// // .send() -// // .await -// // .unwrap() -// // .error_for_status() -// // .unwrap(); -// // let res_text = res.text().await.unwrap(); -// // assert_eq!(random_body, res_text, "echoed wrong response"); -// // } -// // } - -// // #[worker_test] -// // async fn lobby_connectivity_tcp(ctx: TestCtx) { -// // if !util::feature::job_run() { -// // return; -// // } - -// // let setup = Setup::init(&ctx).await; - -// // let lobby_id = setup.create_lobby(&ctx).await; - -// // let (hostname, port) = get_lobby_addr(&ctx, lobby_id, "test-tcp").await; -// // tracing::info!("testing tcp to {}:{}", hostname, port); - -// // // Echo body -// // let random_body = Uuid::new_v4().to_string(); -// // let mut stream = TcpStream::connect((hostname, port)).unwrap(); - -// // stream.write_all(random_body.as_ref()).unwrap(); -// // stream.write_all(b"\n").unwrap(); -// // stream.flush().unwrap(); - -// // let mut reader = BufReader::new(&stream); -// // let mut response = String::new(); -// // reader.read_line(&mut response).expect("read line"); - -// // assert_eq!( -// // &random_body, -// // &response[..response.len() - 1], -// // "echoed wrong response" -// // ); -// // } - -// // #[worker_test] -// // async fn lobby_connectivity_tcp_host(ctx: TestCtx) { -// // if !util::feature::job_run() { -// // return; -// // } - -// // let setup = Setup::init(&ctx).await; - -// // let lobby_id = setup -// // .create_lobby_with_lgi(&ctx, setup.lobby_group_id_host) -// // .await; - -// // // Echo body (bridge) -// // { -// // let (hostname, port) = get_lobby_addr(&ctx, lobby_id, "test-tcp").await; -// // tracing::info!("testing tcp to {}:{}", hostname, port); - -// // // Echo body -// // let random_body = Uuid::new_v4().to_string(); -// // let mut stream = TcpStream::connect((hostname, port)).unwrap(); - -// // stream.write_all(random_body.as_ref()).unwrap(); -// // stream.write_all(b"\n").unwrap(); -// // stream.flush().unwrap(); - -// // let mut reader = BufReader::new(&stream); -// // let mut response = String::new(); -// // reader.read_line(&mut response).expect("read line"); - -// // assert_eq!( -// // &random_body, -// // &response[..response.len() - 1], -// // "echoed wrong response" -// // ); -// // } - -// // // Echo body (host) -// // { -// // let host_ip = get_lobby_host_ip(&ctx, lobby_id).await; -// // tracing::info!("testing tcp to {}:{}", host_ip, setup.host_port_tcp); - -// // let random_body = Uuid::new_v4().to_string(); -// // let mut stream = TcpStream::connect((host_ip, setup.host_port_tcp)).unwrap(); - -// // stream.write_all(random_body.as_ref()).unwrap(); -// // stream.write_all(b"\n").unwrap(); -// // stream.flush().unwrap(); - -// // let mut reader = BufReader::new(&stream); -// // let mut response = String::new(); -// // reader.read_line(&mut response).expect("read line"); - -// // assert_eq!( -// // &random_body, -// // &response[..response.len() - 1], -// // "echoed wrong response" -// // ); -// // } -// // } - -// // #[worker_test] -// // async fn lobby_connectivity_udp(ctx: TestCtx) { -// // if !util::feature::job_run() { -// // return; -// // } - -// // let setup = Setup::init(&ctx).await; - -// // let lobby_id = setup.create_lobby(&ctx).await; - -// // let (hostname, port) = get_lobby_addr(&ctx, lobby_id, "test-udp").await; -// // tracing::info!("testing udp to {}:{}", hostname, port); - -// // // Echo body -// // let random_body = Uuid::new_v4(); -// // let socket = UdpSocket::bind(("0.0.0.0", 0)).unwrap(); -// // socket.connect((hostname, port)).unwrap(); -// // socket.send(random_body.as_ref()).unwrap(); - -// // let mut response = [0; 2048]; -// // let recv_len = socket.recv(&mut response).unwrap(); - -// // assert_eq!( -// // random_body.as_bytes(), -// // &response[..recv_len], -// // "echoed wrong response" -// // ); -// // } - -// // #[worker_test] -// // async fn lobby_connectivity_udp_host(ctx: TestCtx) { -// // if !util::feature::job_run() { -// // return; -// // } - -// // let setup = Setup::init(&ctx).await; - -// // let lobby_id = setup -// // .create_lobby_with_lgi(&ctx, setup.lobby_group_id_host) -// // .await; - -// // let host_ip = get_lobby_host_ip(&ctx, lobby_id).await; - -// // // Echo body (bridge) -// // { -// // let (hostname, port) = get_lobby_addr(&ctx, lobby_id, "test-udp").await; -// // tracing::info!("testing udp to {}:{}", hostname, port); - -// // let random_body = Uuid::new_v4(); -// // let socket = UdpSocket::bind(("0.0.0.0", 0)).unwrap(); -// // socket.connect((hostname, port)).unwrap(); -// // socket.send(random_body.as_ref()).unwrap(); - -// // let mut response = [0; 2048]; -// // let recv_len = socket.recv(&mut response).unwrap(); - -// // assert_eq!( -// // random_body.as_bytes(), -// // &response[..recv_len], -// // "echoed wrong response" -// // ); -// // } - -// // // Echo body (host) -// // { -// // tracing::info!("testing udp to {}:{}", host_ip, setup.host_port_udp); - -// // let random_body = Uuid::new_v4(); -// // let socket = UdpSocket::bind(("0.0.0.0", 0)).unwrap(); -// // socket.connect((host_ip, setup.host_port_udp)).unwrap(); -// // socket.send(random_body.as_ref()).unwrap(); - -// // let mut response = [0; 2048]; -// // let recv_len = socket.recv(&mut response).unwrap(); - -// // assert_eq!( -// // random_body.as_bytes(), -// // &response[..recv_len], -// // "echoed wrong response" -// // ); -// // } -// // } - -// /// Fetches the address to get the lobby from. -// async fn get_lobby_addr(ctx: &TestCtx, lobby_id: Uuid, port: &str) -> (String, u16) { -// let lobby_res = op!([ctx] mm_lobby_get { lobby_ids: vec![lobby_id.into()] }) -// .await -// .unwrap(); -// let lobby = lobby_res.lobbies.first().unwrap(); -// let run_id = lobby.run_id.unwrap(); - -// let run_res = op!([ctx] job_run_get { run_ids: vec![run_id] }) -// .await -// .unwrap(); -// let run = run_res.runs.first().unwrap(); - -// let port = run -// .proxied_ports -// .iter() -// .find(|x| x.target_nomad_port_label == Some(util_ds::format_nomad_port_label(port))) -// .unwrap(); - -// ( -// port.ingress_hostnames.first().unwrap().clone(), -// port.ingress_port as u16, -// ) -// } - -// /// Fetches the address to get the lobby from for host networking. -// async fn get_lobby_host_ip(ctx: &TestCtx, lobby_id: Uuid) -> String { -// let lobby_res = op!([ctx] mm_lobby_get { lobby_ids: vec![lobby_id.into()] }) -// .await -// .unwrap(); -// let lobby = lobby_res.lobbies.first().unwrap(); -// let run_id = lobby.run_id.unwrap(); - -// let run_res = op!([ctx] job_run_get { run_ids: vec![run_id] }) -// .await -// .unwrap(); -// let run = run_res.runs.first().unwrap(); - -// let run_meta = run.run_meta.as_ref().unwrap(); -// let Some(backend::job::run_meta::Kind::Nomad(run_meta_nomad)) = &run_meta.kind else { -// panic!() -// }; - -// run_meta_nomad.node_public_ipv4.clone().unwrap() -// } +mod common; + +use std::{ + io::{BufRead, BufReader, Write}, + net::{TcpStream, UdpSocket}, +}; + +use chirp_workflow::prelude::*; +use common::*; + +#[workflow_test] +async fn server_connectivity_http_normal(ctx: TestCtx) { + if !util::feature::job_run() { + return; + } + + let setup = Setup::init(&ctx).await; + + let server_id = setup.create_bridge_server(&ctx).await; + + let (hostname, _) = get_server_addr(&ctx, server_id, "test-http").await; + tracing::info!("testing http to {}", hostname); + + // Echo body + let random_body = Uuid::new_v4().to_string(); + let client = reqwest::Client::new(); + let res = client + .post(format!("http://{hostname}")) + .body(random_body.clone()) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + let res_text = res.text().await.unwrap(); + assert_eq!(random_body, res_text, "echoed wrong response"); +} + +#[workflow_test] +async fn server_connectivity_http_host(ctx: TestCtx) { + if !util::feature::job_run() { + return; + } + + let setup = Setup::init(&ctx).await; + + let server_id = setup.create_host_server(&ctx).await; + + // Echo body (bridge) + { + let (hostname, _) = get_server_addr(&ctx, server_id, "test-http").await; + tracing::info!("testing http to {}", hostname); + + let random_body = Uuid::new_v4().to_string(); + let client = reqwest::Client::new(); + let res = client + .post(format!("http://{hostname}")) + .body(random_body.clone()) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + let res_text = res.text().await.unwrap(); + assert_eq!(random_body, res_text, "echoed wrong response"); + } + + // // Echo body (host) + // { + // let (hostname, port) = get_server_addr(&ctx, server_id, "test-host-http").await; + // tracing::info!("testing http to {}:{}", hostname, port); + + // let random_body = Uuid::new_v4().to_string(); + // let client = reqwest::Client::new(); + // let res = client + // .post(format!("http://{hostname}:{port}")) + // .body(random_body.clone()) + // .send() + // .await + // .unwrap() + // .error_for_status() + // .unwrap(); + // let res_text = res.text().await.unwrap(); + // assert_eq!(random_body, res_text, "echoed wrong response"); + // } +} + +#[workflow_test] +async fn server_connectivity_tcp(ctx: TestCtx) { + if !util::feature::job_run() { + return; + } + + let setup = Setup::init(&ctx).await; + + let server_id = setup.create_bridge_server(&ctx).await; + + let (hostname, port) = get_server_addr(&ctx, server_id, "test-tcp").await; + tracing::info!("testing tcp to {}:{}", hostname, port); + + // Echo body + let random_body = Uuid::new_v4().to_string(); + let mut stream = TcpStream::connect((hostname, port)).unwrap(); + + stream.write_all(random_body.as_ref()).unwrap(); + stream.write_all(b"\n").unwrap(); + stream.flush().unwrap(); + + let mut reader = BufReader::new(&stream); + let mut response = String::new(); + reader.read_line(&mut response).expect("read line"); + + assert_eq!( + &random_body, + &response[..response.len() - 1], + "echoed wrong response" + ); +} + +#[workflow_test] +async fn server_connectivity_tcp_host(ctx: TestCtx) { + if !util::feature::job_run() { + return; + } + + let setup = Setup::init(&ctx).await; + + let server_id = setup.create_host_server(&ctx).await; + + // Echo body (bridge) + { + let (hostname, port) = get_server_addr(&ctx, server_id, "test-tcp").await; + tracing::info!("testing tcp to {}:{}", hostname, port); + + // Echo body + let random_body = Uuid::new_v4().to_string(); + let mut stream = TcpStream::connect((hostname, port)).unwrap(); + + stream.write_all(random_body.as_ref()).unwrap(); + stream.write_all(b"\n").unwrap(); + stream.flush().unwrap(); + + let mut reader = BufReader::new(&stream); + let mut response = String::new(); + reader.read_line(&mut response).expect("read line"); + + assert_eq!( + &random_body, + &response[..response.len() - 1], + "echoed wrong response" + ); + } + + // // Echo body (host) + // { + // let (hostname, port) = get_server_addr(&ctx, server_id, "test-host-tcp").await; + // tracing::info!("testing tcp to {}:{}", hostname, port); + + // let random_body = Uuid::new_v4().to_string(); + // let mut stream = TcpStream::connect((hostname, port)).unwrap(); + + // stream.write_all(random_body.as_ref()).unwrap(); + // stream.write_all(b"\n").unwrap(); + // stream.flush().unwrap(); + + // let mut reader = BufReader::new(&stream); + // let mut response = String::new(); + // reader.read_line(&mut response).expect("read line"); + + // assert_eq!( + // &random_body, + // &response[..response.len() - 1], + // "echoed wrong response" + // ); + // } +} + +#[workflow_test] +async fn server_connectivity_udp(ctx: TestCtx) { + if !util::feature::job_run() { + return; + } + + let setup = Setup::init(&ctx).await; + + let server_id = setup.create_bridge_server(&ctx).await; + + let (hostname, port) = get_server_addr(&ctx, server_id, "test-udp").await; + tracing::info!("testing udp to {}:{}", hostname, port); + + // Echo body + let random_body = Uuid::new_v4(); + let socket = UdpSocket::bind(("0.0.0.0", 0)).unwrap(); + socket.connect((hostname, port)).unwrap(); + socket.send(random_body.as_ref()).unwrap(); + + let mut response = [0; 2048]; + let recv_len = socket.recv(&mut response).unwrap(); + + assert_eq!( + random_body.as_bytes(), + &response[..recv_len], + "echoed wrong response" + ); +} + +#[workflow_test] +async fn server_connectivity_udp_host(ctx: TestCtx) { + if !util::feature::job_run() { + return; + } + + let setup = Setup::init(&ctx).await; + + let server_id = setup.create_host_server(&ctx).await; + + // Echo body (host) + { + let (hostname, port) = get_server_addr(&ctx, server_id, "test-udp").await; + tracing::info!("testing udp to {}:{}", hostname, port); + + let random_body = Uuid::new_v4(); + let socket = UdpSocket::bind(("0.0.0.0", 0)).unwrap(); + socket.connect((hostname, port)).unwrap(); + socket.send(random_body.as_ref()).unwrap(); + + let mut response = [0; 2048]; + let recv_len = socket.recv(&mut response).unwrap(); + + assert_eq!( + random_body.as_bytes(), + &response[..recv_len], + "echoed wrong response" + ); + } + + // // Echo body (host) + // { + // let (hostname, port) = get_server_addr(&ctx, server_id, "test-host-udp").await; + // tracing::info!("testing udp to {}:{}", hostname, port); + + // let random_body = Uuid::new_v4(); + // let socket = UdpSocket::bind(("0.0.0.0", 0)).unwrap(); + // socket.connect((hostname, port)).unwrap(); + // socket.send(random_body.as_ref()).unwrap(); + + // let mut response = [0; 2048]; + // let recv_len = socket.recv(&mut response).unwrap(); + + // assert_eq!( + // random_body.as_bytes(), + // &response[..recv_len], + // "echoed wrong response" + // ); + // } +} + +async fn get_server_addr(ctx: &TestCtx, server_id: Uuid, port: &str) -> (String, u16) { + let server = ctx + .op(ds::ops::server::get::Input { + server_ids: vec![server_id], + }) + .await + .unwrap() + .servers + .into_iter() + .next() + .unwrap(); + + let hostname = server + .network_ports + .get(port) + .expect("no port") + .public_hostname + .as_ref() + .expect("no public hostname"); + let port = server + .network_ports + .get(port) + .expect("no port") + .public_port + .as_ref() + .expect("no public port"); + + (hostname.clone(), *port as u16) +} diff --git a/svc/pkg/ds/tests/print_test_data.rs b/svc/pkg/ds/tests/print_test_data.rs index f5ac94df83..f176795ab5 100644 --- a/svc/pkg/ds/tests/print_test_data.rs +++ b/svc/pkg/ds/tests/print_test_data.rs @@ -1,8 +1,10 @@ use chirp_workflow::prelude::*; +use ds::types; use rivet_operation::prelude::proto::{ self, backend::{self, pkg::token}, }; +use serde_json::json; #[workflow_test] async fn print_test_data(ctx: TestCtx) { @@ -132,88 +134,88 @@ async fn print_test_data(ctx: TestCtx) { "test data" ); - // - // let runtime = Some( - // proto::backend::pkg::dynamic_servers::server_create::request::Runtime::DockerRuntime( - // proto::backend::ds::DockerRuntime { - // args: Vec::new(), - // environment: HashMap::new(), - // image_id: Some(build_res.build_id.unwrap()), - // network: Some(proto::backend::ds::DockerNetwork { - // mode: 0, - // ports: vec![( - // "testing2".to_string(), - // backend::ds::DockerPort { - // port: Some(28234), - // routing: Some( - // backend::ds::docker_port::Routing::GameGuard( - // backend::ds::DockerGameGuardRouting { - // protocol: 0, - // }, - // ), - // ), - // }, - // )] - // // Collect into hashmap - // .into_iter() - // .collect(), - // }), - // }, - // ), - // ); - // - // let faker_region = op!([ctx] faker_region {}).await.unwrap(); - // - // tracing::info!(?env_id); - // - // let server = op!([ctx] ds_server_create { - // env_id: Some(env_id), - // cluster_id: Some(cluster_id), - // datacenter_id: faker_region.region_id, - // resources: Some(proto::backend::ds::ServerResources { cpu_millicores: 100, memory_mib: 200 }), - // kill_timeout_ms: 0, - // metadata: HashMap::new(), - // runtime: runtime, - // }) - // .await - // .unwrap() - // .server - // .unwrap(); - // - // // TODO: Switch this - // // let hostname = format!( - // // "{}-{}.server.{}.rivet.run", - // // server.server_id.unwrap(), - // // "1234", - // // faker_region.region_id.unwrap() - // // ); - // - // let hostname = format!( - // "{}-{}.lobby.{}.{}", - // server.server_id.unwrap(), - // "testing2", - // faker_region.region_id.unwrap(), - // util::env::domain_job().unwrap(), - // ); - // - // // Async sleep for 5 seconds - // tokio::time::sleep(std::time::Duration::from_secs(30)).await; - // - // tracing::info!(?hostname, "hostest mostest"); - // - // // Echo body - // let random_body = Uuid::new_v4().to_string(); - // let client = reqwest::Client::new(); - // let res = client - // .post(format!("http://{hostname}")) - // .body(random_body.clone()) - // .send() - // .await - // .unwrap() - // .error_for_status() - // .unwrap(); - // let res_text = res.text().await.unwrap(); - // assert_eq!(random_body, res_text, "echoed wrong response"); - // - // assert_eq!(game_res.env_id.unwrap(), server.prod_env_id.unwrap().as_uuid()); + let faker_region = op!([ctx] faker_region {}).await.unwrap(); + + let env = vec![ + ("some_envkey_test".to_string(), "2134523".to_string()), + ("HTTP_PORT".to_string(), "28234".to_string()), + ] + .into_iter() + .collect(); + + let ports = vec![( + "testing2".to_string(), + ds::workflows::server::Port { + internal_port: Some(28234), + routing: types::Routing::GameGuard { + protocol: types::GameGuardProtocol::Http, + }, + }, + )] + // Collect into hashmap + .into_iter() + .collect(); + + let server_id = Uuid::new_v4(); + + let mut sub = ctx + .subscribe::(&json!({ + "server_id": server_id, + })) + .await + .unwrap(); + + ctx.dispatch_tagged_workflow( + &json!({ + "server_id": server_id, + }), + ds::workflows::server::Input { + server_id, + env_id: *env_id, + cluster_id, + datacenter_id: faker_region.region_id.unwrap().as_uuid(), + resources: ds::types::ServerResources { + cpu_millicores: 100, + memory_mib: 200, + }, + kill_timeout_ms: 0, + tags: vec![(String::from("test"), String::from("123"))] + .into_iter() + .collect(), + args: Vec::new(), + environment: env, + image_id: build_res.build_id.unwrap().as_uuid(), + network_mode: types::NetworkMode::Bridge, + network_ports: ports, + }, + ) + .await + .unwrap(); + + sub.next().await.unwrap(); + + let hostname = format!( + "{}-{}.lobby.{}.{}", + server_id, + "testing2", + faker_region.region_id.unwrap(), + util::env::domain_job().unwrap(), + ); + + // Async sleep for 5 seconds + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + // Echo body + let random_body = Uuid::new_v4().to_string(); + let client = reqwest::Client::new(); + let res = client + .post(format!("http://{hostname}")) + .body(random_body.clone()) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + let res_text = res.text().await.unwrap(); + assert_eq!(random_body, res_text, "echoed wrong response"); } diff --git a/svc/pkg/ds/tests/server_create.rs b/svc/pkg/ds/tests/server_create.rs index 8ebf3065fb..d4b5c37d12 100644 --- a/svc/pkg/ds/tests/server_create.rs +++ b/svc/pkg/ds/tests/server_create.rs @@ -1,10 +1,7 @@ use chirp_workflow::prelude::*; use ds::types; // use rivet_api::{apis::*, models}; -use rivet_operation::prelude::proto::{ - self, - backend::{self, pkg::token}, -}; +use rivet_operation::prelude::proto::backend; use serde_json::json; #[workflow_test] @@ -16,29 +13,6 @@ async fn server_create(ctx: TestCtx) { .unwrap(); let env_id = game_res.prod_env_id.unwrap(); - // Create token - let token_res = op!([ctx] token_create { - token_config: Some(token::create::request::TokenConfig { - ttl: util::duration::days(90), - }), - issuer: "test".to_owned(), - kind: Some(token::create::request::Kind::New( - token::create::request::KindNew { entitlements: vec![proto::claims::Entitlement { - kind: Some(proto::claims::entitlement::Kind::EnvService( - proto::claims::entitlement::EnvService { - env_id: Some(env_id), - } - )), - }]}, - )), - label: Some("env".to_owned()), - ..Default::default() - }) - .await - .unwrap(); - - tracing::info!("token_res for key: {:?}", token_res); - // Pick an existing cluster let cluster_id = ctx .op(cluster::ops::list::Input {}) @@ -104,7 +78,6 @@ async fn server_create(ctx: TestCtx) { memory_mib: 200, }, kill_timeout_ms: 0, - // webhook_url: Some("https://rivettest.free.beeceptor.com".to_string()), tags: vec![(String::from("test"), String::from("123"))] .into_iter() .collect(), @@ -120,24 +93,27 @@ async fn server_create(ctx: TestCtx) { sub.next().await.unwrap(); - // TODO: Switch this - // let hostname = format!( - // "{}-{}.server.{}.rivet.run", - // server.server_id.unwrap(), - // "1234", - // faker_region.region_id.unwrap() - // ); + // Async sleep for 5 seconds + tokio::time::sleep(std::time::Duration::from_secs(5)).await; - let hostname = format!( - "{}-{}.lobby.{}.{}", - server_id, - "testing2", - faker_region.region_id.unwrap(), - util::env::domain_job().unwrap(), - ); + let server = ctx + .op(ds::ops::server::get::Input { + server_ids: vec![server_id], + }) + .await + .unwrap() + .servers + .into_iter() + .next() + .unwrap(); - // Async sleep for 5 seconds - tokio::time::sleep(std::time::Duration::from_secs(30)).await; + let hostname = server + .network_ports + .get("testing2") + .unwrap() + .public_hostname + .as_ref() + .expect("no public hostname"); // Echo body let random_body = Uuid::new_v4().to_string();