Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Infra** Aiven Redis provider
- **Bolt** `bolt secret set <path> <value>` command
- `watch-requests` load test
- `mm-sustain` load test

### Changed

Expand Down
28 changes: 28 additions & 0 deletions svc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions svc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ members = [
"pkg/kv/worker",
"pkg/load-test/standalone/api-cloud",
"pkg/load-test/standalone/mm",
"pkg/load-test/standalone/mm-sustain",
"pkg/load-test/standalone/sqlx",
"pkg/load-test/standalone/watch-requests",
"pkg/mm-config/ops/lobby-group-get",
Expand Down
33 changes: 33 additions & 0 deletions svc/pkg/load-test/standalone/mm-sustain/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "load-test-mm-sustain"
version = "0.0.1"
edition = "2018"
authors = ["Rivet Gaming, LLC <developer@rivet.gg>"]
license = "Apache-2.0"

[dependencies]
chirp-client = { path = "../../../../../lib/chirp/client" }
rivet-operation = { path = "../../../../../lib/operation/core" }
rivet-connection = { path = "../../../../../lib/connection" }
rivet-runtime = { path = "../../../../../lib/runtime" }
tokio = { version = "1.29", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] }
rivet-api = { path = "../../../../../sdks/rust" }
reqwest = "0.11"

faker-game = { path = "../../../faker/ops/game" }
faker-team = { path = "../../../faker/ops/team" }
faker-game-version = { path = "../../../faker/ops/game-version" }
faker-region = { path = "../../../faker/ops/region" }
faker-build = { path = "../../../faker/ops/build" }
mm-config-version-get = { path = "../../../mm-config/ops/version-get" }
game-namespace-version-set = { path = "../../../game/ops/namespace-version-set" }
mm-lobby-get = { path = "../../../mm/ops/lobby-get" }
job-run-get = { path = "../../../job-run/ops/get" }
user-identity-create = { path = "../../../user-identity/ops/create" }
token-create = { path = "../../../token/ops/create" }
util-mm = { package = "rivet-util-mm", path = "../../../mm/util" }

[dev-dependencies]
chirp-worker = { path = "../../../../../lib/chirp/worker" }
9 changes: 9 additions & 0 deletions svc/pkg/load-test/standalone/mm-sustain/Service.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[service]
name = "load-test-mm-sustain"
load-test = true

[runtime]
kind = "rust"

[headless]

276 changes: 276 additions & 0 deletions svc/pkg/load-test/standalone/mm-sustain/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
use futures_util::StreamExt;
use proto::{
backend::{self, pkg::*},
common,
};
use rivet_api::{apis::configuration::Configuration, models};
use rivet_operation::prelude::*;
use serde_json::json;
use std::collections::{HashMap, HashSet};
use tokio::time::{interval, Duration, Instant};

#[tracing::instrument(skip_all)]
pub async fn run_from_env(ts: i64) -> GlobalResult<()> {
let pools = rivet_pools::from_env("load-test-mm-sustain").await?;
let client =
chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("load-test-mm-sustain");
let cache = rivet_cache::CacheInner::from_env(pools.clone())?;
let ctx = OperationContext::new(
"load-test-mm-sustain".into(),
std::time::Duration::from_secs(60),
rivet_connection::Connection::new(client, pools, cache),
Uuid::new_v4(),
Uuid::new_v4(),
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

// Region
let region_res = op!([ctx] faker_region {}).await?;
let region_id = unwrap!(region_res.region_id.as_ref()).as_uuid();

// Game
let game_res = op!([ctx] faker_game {
..Default::default()
})
.await?;
let namespace_id = unwrap!(game_res.namespace_ids.first()).clone().as_uuid();

let build_res = op!([ctx] faker_build {
game_id: game_res.game_id,
image: backend::faker::Image::MmLobbyEcho as i32,
})
.await?;

let game_version_res = op!([ctx] faker_game_version {
game_id: game_res.game_id,
override_lobby_groups: Some(faker::game_version::request::OverrideLobbyGroups {
lobby_groups: vec![backend::matchmaker::LobbyGroup {
name_id: "test-1".into(),

regions: vec![backend::matchmaker::lobby_group::Region {
region_id: Some(region_id.into()),
tier_name_id: util_mm::test::TIER_NAME_ID.to_owned(),
idle_lobbies: Some(backend::matchmaker::lobby_group::IdleLobbies {
min_idle_lobbies: 0,
// Don't auto-destroy lobbies from tests
max_idle_lobbies: 32,
}),
}],
max_players_normal: 8,
max_players_direct: 10,
max_players_party: 12,
listable: true,
taggable: false,
allow_dynamic_max_players: false,

runtime: Some(backend::matchmaker::lobby_runtime::Docker {
build_id: build_res.build_id,
args: Vec::new(),
env_vars: vec![
backend::matchmaker::lobby_runtime::EnvVar {
key: "HELLO".into(),
value: "world".into(),
},
],
network_mode: backend::matchmaker::lobby_runtime::NetworkMode::Bridge as i32,
ports: vec![
backend::matchmaker::lobby_runtime::Port {
label: "test-http".into(),
target_port: Some(8001),
port_range: None,
proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Http as i32,
proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32,
},
backend::matchmaker::lobby_runtime::Port {
label: "test-tcp".into(),
target_port: Some(8002),
port_range: None,
proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Tcp as i32,
proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32,
},
backend::matchmaker::lobby_runtime::Port {
label: "test-udp".into(),
target_port: Some(8003),
port_range: None,
proxy_protocol: backend::matchmaker::lobby_runtime::ProxyProtocol::Udp as i32,
proxy_kind: backend::matchmaker::lobby_runtime::ProxyKind::GameGuard as i32,
},
],

}.into()),

actions: None,
}]
}),
..Default::default()
})
.await?;

let version_get_res = op!([ctx] mm_config_version_get {
version_ids: vec![unwrap!(game_version_res.version_id)],
})
.await?;
let lobby_groups = &unwrap!(unwrap!(version_get_res.versions.first())
.config_meta
.as_ref())
.lobby_groups;
let lobby_group_id = unwrap!(lobby_groups[0].lobby_group_id.as_ref()).as_uuid();

op!([ctx] game_namespace_version_set {
namespace_id: Some(namespace_id.into()),
version_id: game_version_res.version_id,
})
.await?;

let parallel_workers = 4;
for i in 0..parallel_workers {
tokio::spawn(run_lobby_worker(
ctx.clone(),
i,
namespace_id,
region_id,
lobby_group_id,
));
}

Ok(())
}

async fn run_lobby_worker(
ctx: OperationContext<()>,
worker_idx: usize,
namespace_id: Uuid,
region_id: Uuid,
lobby_group_id: Uuid,
) {
loop {
let start = Instant::now();
let lobby_id = Uuid::new_v4();
match run_lobby_lifecycle(&ctx, lobby_id, namespace_id, region_id, lobby_group_id).await {
Ok(_) => {
tracing::info!(duration = %start.elapsed().as_secs_f64(), "lobby lifecycle success")
}
Err(err) => {
tracing::error!(duration = %start.elapsed().as_secs_f64(), ?err, "lobby lifecycle fail")
}
}

// Shut down lobby
tracing::info!(?lobby_id, "shutting down lobby");
msg!([ctx] mm::msg::lobby_stop(lobby_id) -> mm::msg::lobby_cleanup_complete(lobby_id) {
lobby_id: Some(lobby_id.into()),
})
.await
.unwrap();
}
}

async fn run_lobby_lifecycle(
ctx: &OperationContext<()>,
lobby_id: Uuid,
namespace_id: Uuid,
region_id: Uuid,
lobby_group_id: Uuid,
) -> GlobalResult<()> {
// Create lobby
tracing::info!(?lobby_id, "creating lobby");
msg!([ctx] @notrace mm::msg::lobby_create(lobby_id) -> mm::msg::lobby_ready_complete(lobby_id) {
lobby_id: Some(lobby_id.into()),
namespace_id: Some(namespace_id.into()),
lobby_group_id: Some(lobby_group_id.into()),
region_id: Some(region_id.into()),
create_ray_id: None,
preemptively_created: false,

creator_user_id: None,
is_custom: false,
publicity: None,
lobby_config_json: None,
tags: HashMap::new(),
dynamic_max_players: None,
})
.await?;

// Test HTTP connectivity
let (hostname, _) = get_lobby_addr(&ctx, lobby_id, "test-http").await?;
tracing::info!("testing http to {}", hostname);

// Echo body
let random_body = Uuid::new_v4().to_string();
let client = reqwest::Client::new();
let res = client
.post(format!("http://{hostname}"))
.body(random_body.clone())
.send()
.await?
.error_for_status()?;
let res_text = res.text().await?;
ensure_eq!(random_body, res_text, "echoed wrong response");

// Used to pause on when a gateway timeout is encountered
// let random_body = Uuid::new_v4().to_string();
// let client = reqwest::Client::new();
// let res = client
// .post(format!("http://{hostname}"))
// .body(random_body.clone())
// .send()
// .await?;
// if res.status() == reqwest::StatusCode::GATEWAY_TIMEOUT {
// let lobby_res = op!([ctx] mm_lobby_get {
// lobby_ids: vec![lobby_id.into()],
// })
// .await?;
// let lobby = unwrap!(lobby_res.lobbies.first());

// let run_res = op!([ctx] job_run_get {
// run_ids: vec![unwrap!(lobby.run_id)],
// })
// .await?;
// let run = unwrap!(run_res.runs.first());
// let run_meta = unwrap_ref!(run.run_meta);
// let Some(backend::job::run_meta::Kind::Nomad(nomad)) = run_meta.kind.as_ref() else {
// unreachable!()
// };

// let url = format!(
// "http://localhost:4646/ui/allocations/{}",
// unwrap_ref!(nomad.alloc_id)
// );

// tracing::error!(?lobby_id, alloc_url = %url, "found gateway timeout, waiting forever");

// std::future::pending::<()>().await;
// }
//
// let res_text = res.text().await?;
// ensure_eq!(random_body, res_text, "echoed wrong response");

Ok(())
}

/// Fetches the address to get the lobby from.
async fn get_lobby_addr(
ctx: &OperationContext<()>,
lobby_id: Uuid,
port: &str,
) -> GlobalResult<(String, u16)> {
let lobby_res = op!([ctx] mm_lobby_get { lobby_ids: vec![lobby_id.into()] }).await?;
let lobby = unwrap!(lobby_res.lobbies.first());
let run_id = unwrap!(lobby.run_id);

let run_res = op!([ctx] job_run_get { run_ids: vec![run_id] }).await?;
let run = unwrap!(run_res.runs.first());

let port = unwrap!(run
.proxied_ports
.iter()
.find(|x| x.target_nomad_port_label == Some(util_mm::format_nomad_port_label(port))));

Ok((
unwrap!(port.ingress_hostnames.first()).clone(),
port.ingress_port as u16,
))
}
15 changes: 15 additions & 0 deletions svc/pkg/load-test/standalone/mm-sustain/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use rivet_operation::prelude::*;

fn main() -> GlobalResult<()> {
rivet_runtime::run(start()).unwrap()
}

async fn start() -> GlobalResult<()> {
load_test_mm_sustain::run_from_env(util::timestamp::now()).await?;

tracing::info!("finished");

std::future::pending::<()>().await;

Ok(())
}
Loading