From 00544002acff1eedffff2ab4c32066dd65d9614c Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 18 Jan 2024 22:34:24 +0000 Subject: [PATCH] Add mm-sustain load test (#364) ## Changes --- CHANGELOG.md | 1 + svc/Cargo.lock | 28 ++ svc/Cargo.toml | 1 + .../standalone/mm-sustain/Cargo.toml | 33 +++ .../standalone/mm-sustain/Service.toml | 9 + .../standalone/mm-sustain/src/lib.rs | 276 ++++++++++++++++++ .../standalone/mm-sustain/src/main.rs | 15 + .../mm-sustain/tests/integration.rs | 15 + .../standalone/watch-requests/src/lib.rs | 2 +- 9 files changed, 379 insertions(+), 1 deletion(-) create mode 100644 svc/pkg/load-test/standalone/mm-sustain/Cargo.toml create mode 100644 svc/pkg/load-test/standalone/mm-sustain/Service.toml create mode 100644 svc/pkg/load-test/standalone/mm-sustain/src/lib.rs create mode 100644 svc/pkg/load-test/standalone/mm-sustain/src/main.rs create mode 100644 svc/pkg/load-test/standalone/mm-sustain/tests/integration.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 3056c7b9a7..87df44e494 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Infra** Aiven Redis provider - **Bolt** `bolt secret set ` command - `watch-requests` load test +- `mm-sustain` load test ### Changed diff --git a/svc/Cargo.lock b/svc/Cargo.lock index 3bcb358784..dd76d5d7c1 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -4550,6 +4550,34 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "load-test-mm-sustain" +version = "0.0.1" +dependencies = [ + "chirp-client", + "chirp-worker", + "faker-build", + "faker-game", + "faker-game-version", + "faker-region", + "faker-team", + "game-namespace-version-set", + "job-run-get", + "mm-config-version-get", + "mm-lobby-get", + "reqwest", + "rivet-api", + "rivet-connection", + "rivet-operation", + "rivet-runtime", + "rivet-util-mm", + "token-create", + "tokio", + "tracing", + "tracing-subscriber", + "user-identity-create", +] + [[package]] name = "load-test-sqlx" version = "0.0.1" diff --git a/svc/Cargo.toml b/svc/Cargo.toml index b7dccb4de5..7023e2bc83 100644 --- a/svc/Cargo.toml +++ b/svc/Cargo.toml @@ -132,6 +132,7 @@ members = [ "pkg/kv/worker", "pkg/load-test/standalone/api-cloud", "pkg/load-test/standalone/mm", + "pkg/load-test/standalone/mm-sustain", "pkg/load-test/standalone/sqlx", "pkg/load-test/standalone/watch-requests", "pkg/mm-config/ops/lobby-group-get", diff --git a/svc/pkg/load-test/standalone/mm-sustain/Cargo.toml b/svc/pkg/load-test/standalone/mm-sustain/Cargo.toml new file mode 100644 index 0000000000..41e55a2303 --- /dev/null +++ b/svc/pkg/load-test/standalone/mm-sustain/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "load-test-mm-sustain" +version = "0.0.1" +edition = "2018" +authors = ["Rivet Gaming, LLC "] +license = "Apache-2.0" + +[dependencies] +chirp-client = { path = "../../../../../lib/chirp/client" } +rivet-operation = { path = "../../../../../lib/operation/core" } +rivet-connection = { path = "../../../../../lib/connection" } +rivet-runtime = { path = "../../../../../lib/runtime" } +tokio = { version = "1.29", features = ["full"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } +rivet-api = { path = "../../../../../sdks/rust" } +reqwest = "0.11" + +faker-game = { path = "../../../faker/ops/game" } +faker-team = { path = "../../../faker/ops/team" } +faker-game-version = { path = "../../../faker/ops/game-version" } +faker-region = { path = "../../../faker/ops/region" } +faker-build = { path = "../../../faker/ops/build" } +mm-config-version-get = { path = "../../../mm-config/ops/version-get" } +game-namespace-version-set = { path = "../../../game/ops/namespace-version-set" } +mm-lobby-get = { path = "../../../mm/ops/lobby-get" } +job-run-get = { path = "../../../job-run/ops/get" } +user-identity-create = { path = "../../../user-identity/ops/create" } +token-create = { path = "../../../token/ops/create" } +util-mm = { package = "rivet-util-mm", path = "../../../mm/util" } + +[dev-dependencies] +chirp-worker = { path = "../../../../../lib/chirp/worker" } diff --git a/svc/pkg/load-test/standalone/mm-sustain/Service.toml b/svc/pkg/load-test/standalone/mm-sustain/Service.toml new file mode 100644 index 0000000000..f8f3db068f --- /dev/null +++ b/svc/pkg/load-test/standalone/mm-sustain/Service.toml @@ -0,0 +1,9 @@ +[service] +name = "load-test-mm-sustain" +load-test = true + +[runtime] +kind = "rust" + +[headless] + diff --git a/svc/pkg/load-test/standalone/mm-sustain/src/lib.rs b/svc/pkg/load-test/standalone/mm-sustain/src/lib.rs new file mode 100644 index 0000000000..b9dd90d4a1 --- /dev/null +++ b/svc/pkg/load-test/standalone/mm-sustain/src/lib.rs @@ -0,0 +1,276 @@ +use futures_util::StreamExt; +use proto::{ + backend::{self, pkg::*}, + common, +}; +use rivet_api::{apis::configuration::Configuration, models}; +use rivet_operation::prelude::*; +use serde_json::json; +use std::collections::{HashMap, HashSet}; +use tokio::time::{interval, Duration, Instant}; + +#[tracing::instrument(skip_all)] +pub async fn run_from_env(ts: i64) -> GlobalResult<()> { + let pools = rivet_pools::from_env("load-test-mm-sustain").await?; + let client = + chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("load-test-mm-sustain"); + let cache = rivet_cache::CacheInner::from_env(pools.clone())?; + let ctx = OperationContext::new( + "load-test-mm-sustain".into(), + std::time::Duration::from_secs(60), + rivet_connection::Connection::new(client, pools, cache), + Uuid::new_v4(), + Uuid::new_v4(), + util::timestamp::now(), + util::timestamp::now(), + (), + Vec::new(), + ); + + // Region + let region_res = op!([ctx] faker_region {}).await?; + let region_id = unwrap!(region_res.region_id.as_ref()).as_uuid(); + + // Game + let game_res = op!([ctx] faker_game { + ..Default::default() + }) + .await?; + let namespace_id = unwrap!(game_res.namespace_ids.first()).clone().as_uuid(); + + let build_res = op!([ctx] faker_build { + game_id: game_res.game_id, + image: backend::faker::Image::MmLobbyEcho as i32, + }) + .await?; + + let game_version_res = op!([ctx] faker_game_version { + game_id: game_res.game_id, + override_lobby_groups: Some(faker::game_version::request::OverrideLobbyGroups { + lobby_groups: vec![backend::matchmaker::LobbyGroup { + name_id: "test-1".into(), + + regions: vec![backend::matchmaker::lobby_group::Region { + region_id: Some(region_id.into()), + tier_name_id: util_mm::test::TIER_NAME_ID.to_owned(), + idle_lobbies: Some(backend::matchmaker::lobby_group::IdleLobbies { + min_idle_lobbies: 0, + // Don't auto-destroy lobbies from tests + max_idle_lobbies: 32, + }), + }], + max_players_normal: 8, + max_players_direct: 10, + max_players_party: 12, + listable: true, + taggable: false, + allow_dynamic_max_players: false, + + runtime: Some(backend::matchmaker::lobby_runtime::Docker { + build_id: build_res.build_id, + args: Vec::new(), + env_vars: vec![ + backend::matchmaker::lobby_runtime::EnvVar { + key: "HELLO".into(), + value: "world".into(), + }, + ], + network_mode: backend::matchmaker::lobby_runtime::NetworkMode::Bridge as i32, + ports: vec![ + backend::matchmaker::lobby_runtime::Port { + label: "test-http".into(), + target_port: Some(8001), + port_range: None, + proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Http as i32, + proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, + }, + backend::matchmaker::lobby_runtime::Port { + label: "test-tcp".into(), + target_port: Some(8002), + port_range: None, + proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Tcp as i32, + proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, + }, + backend::matchmaker::lobby_runtime::Port { + label: "test-udp".into(), + target_port: Some(8003), + port_range: None, + proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Udp as i32, + proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32, + }, + ], + + }.into()), + + actions: None, + }] + }), + ..Default::default() + }) + .await?; + + let version_get_res = op!([ctx] mm_config_version_get { + version_ids: vec![unwrap!(game_version_res.version_id)], + }) + .await?; + let lobby_groups = &unwrap!(unwrap!(version_get_res.versions.first()) + .config_meta + .as_ref()) + .lobby_groups; + let lobby_group_id = unwrap!(lobby_groups[0].lobby_group_id.as_ref()).as_uuid(); + + op!([ctx] game_namespace_version_set { + namespace_id: Some(namespace_id.into()), + version_id: game_version_res.version_id, + }) + .await?; + + let parallel_workers = 4; + for i in 0..parallel_workers { + tokio::spawn(run_lobby_worker( + ctx.clone(), + i, + namespace_id, + region_id, + lobby_group_id, + )); + } + + Ok(()) +} + +async fn run_lobby_worker( + ctx: OperationContext<()>, + worker_idx: usize, + namespace_id: Uuid, + region_id: Uuid, + lobby_group_id: Uuid, +) { + loop { + let start = Instant::now(); + let lobby_id = Uuid::new_v4(); + match run_lobby_lifecycle(&ctx, lobby_id, namespace_id, region_id, lobby_group_id).await { + Ok(_) => { + tracing::info!(duration = %start.elapsed().as_secs_f64(), "lobby lifecycle success") + } + Err(err) => { + tracing::error!(duration = %start.elapsed().as_secs_f64(), ?err, "lobby lifecycle fail") + } + } + + // Shut down lobby + tracing::info!(?lobby_id, "shutting down lobby"); + msg!([ctx] mm::msg::lobby_stop(lobby_id) -> mm::msg::lobby_cleanup_complete(lobby_id) { + lobby_id: Some(lobby_id.into()), + }) + .await + .unwrap(); + } +} + +async fn run_lobby_lifecycle( + ctx: &OperationContext<()>, + lobby_id: Uuid, + namespace_id: Uuid, + region_id: Uuid, + lobby_group_id: Uuid, +) -> GlobalResult<()> { + // Create lobby + tracing::info!(?lobby_id, "creating lobby"); + msg!([ctx] @notrace mm::msg::lobby_create(lobby_id) -> mm::msg::lobby_ready_complete(lobby_id) { + lobby_id: Some(lobby_id.into()), + namespace_id: Some(namespace_id.into()), + lobby_group_id: Some(lobby_group_id.into()), + region_id: Some(region_id.into()), + create_ray_id: None, + preemptively_created: false, + + creator_user_id: None, + is_custom: false, + publicity: None, + lobby_config_json: None, + tags: HashMap::new(), + dynamic_max_players: None, + }) + .await?; + + // Test HTTP connectivity + let (hostname, _) = get_lobby_addr(&ctx, lobby_id, "test-http").await?; + tracing::info!("testing http to {}", hostname); + + // Echo body + let random_body = Uuid::new_v4().to_string(); + let client = reqwest::Client::new(); + let res = client + .post(format!("http://{hostname}")) + .body(random_body.clone()) + .send() + .await? + .error_for_status()?; + let res_text = res.text().await?; + ensure_eq!(random_body, res_text, "echoed wrong response"); + + // Used to pause on when a gateway timeout is encountered + // let random_body = Uuid::new_v4().to_string(); + // let client = reqwest::Client::new(); + // let res = client + // .post(format!("http://{hostname}")) + // .body(random_body.clone()) + // .send() + // .await?; + // if res.status() == reqwest::StatusCode::GATEWAY_TIMEOUT { + // let lobby_res = op!([ctx] mm_lobby_get { + // lobby_ids: vec![lobby_id.into()], + // }) + // .await?; + // let lobby = unwrap!(lobby_res.lobbies.first()); + + // let run_res = op!([ctx] job_run_get { + // run_ids: vec![unwrap!(lobby.run_id)], + // }) + // .await?; + // let run = unwrap!(run_res.runs.first()); + // let run_meta = unwrap_ref!(run.run_meta); + // let Some(backend::job::run_meta::Kind::Nomad(nomad)) = run_meta.kind.as_ref() else { + // unreachable!() + // }; + + // let url = format!( + // "http://localhost:4646/ui/allocations/{}", + // unwrap_ref!(nomad.alloc_id) + // ); + + // tracing::error!(?lobby_id, alloc_url = %url, "found gateway timeout, waiting forever"); + + // std::future::pending::<()>().await; + // } + // + // let res_text = res.text().await?; + // ensure_eq!(random_body, res_text, "echoed wrong response"); + + Ok(()) +} + +/// Fetches the address to get the lobby from. +async fn get_lobby_addr( + ctx: &OperationContext<()>, + lobby_id: Uuid, + port: &str, +) -> GlobalResult<(String, u16)> { + let lobby_res = op!([ctx] mm_lobby_get { lobby_ids: vec![lobby_id.into()] }).await?; + let lobby = unwrap!(lobby_res.lobbies.first()); + let run_id = unwrap!(lobby.run_id); + + let run_res = op!([ctx] job_run_get { run_ids: vec![run_id] }).await?; + let run = unwrap!(run_res.runs.first()); + + let port = unwrap!(run + .proxied_ports + .iter() + .find(|x| x.target_nomad_port_label == Some(util_mm::format_nomad_port_label(port)))); + + Ok(( + unwrap!(port.ingress_hostnames.first()).clone(), + port.ingress_port as u16, + )) +} diff --git a/svc/pkg/load-test/standalone/mm-sustain/src/main.rs b/svc/pkg/load-test/standalone/mm-sustain/src/main.rs new file mode 100644 index 0000000000..c958feda66 --- /dev/null +++ b/svc/pkg/load-test/standalone/mm-sustain/src/main.rs @@ -0,0 +1,15 @@ +use rivet_operation::prelude::*; + +fn main() -> GlobalResult<()> { + rivet_runtime::run(start()).unwrap() +} + +async fn start() -> GlobalResult<()> { + load_test_mm_sustain::run_from_env(util::timestamp::now()).await?; + + tracing::info!("finished"); + + std::future::pending::<()>().await; + + Ok(()) +} diff --git a/svc/pkg/load-test/standalone/mm-sustain/tests/integration.rs b/svc/pkg/load-test/standalone/mm-sustain/tests/integration.rs new file mode 100644 index 0000000000..ff24c8af80 --- /dev/null +++ b/svc/pkg/load-test/standalone/mm-sustain/tests/integration.rs @@ -0,0 +1,15 @@ +use chirp_worker::prelude::*; + +use ::load_test_mm_sustain::run_from_env; + +#[tokio::test(flavor = "multi_thread")] +async fn basic() { + tracing_subscriber::fmt() + .json() + .with_max_level(tracing::Level::INFO) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NONE) + .init(); + + // TODO: + run_from_env(util::timestamp::now()).await.unwrap(); +} diff --git a/svc/pkg/load-test/standalone/watch-requests/src/lib.rs b/svc/pkg/load-test/standalone/watch-requests/src/lib.rs index 37a2c1252c..a8c45d4e12 100644 --- a/svc/pkg/load-test/standalone/watch-requests/src/lib.rs +++ b/svc/pkg/load-test/standalone/watch-requests/src/lib.rs @@ -119,7 +119,7 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> { ); headers.insert( "host", - reqwest::header::HeaderValue::from_str("api.nathan5.gameinc.io")?, + reqwest::header::HeaderValue::from_str(util::env::domain_main_api())?, ); headers.insert( "cf-connecting-ip",