diff --git a/CHANGELOG.md b/CHANGELOG.md index ed4422cbdd..78f609504f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Infra** `infra-artifacts` Terraform plan & S3 bucket used for automating building & uploading internal binaries, etc. - **Infra** Aiven Redis provider - **Bolt** `bolt secret set ` command +- `watch-requests` load test ### Changed diff --git a/svc/Cargo.lock b/svc/Cargo.lock index d529bf5381..f36d2abd92 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -4570,6 +4570,26 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "load-test-watch-requests" +version = "0.0.1" +dependencies = [ + "chirp-client", + "chirp-worker", + "faker-game", + "faker-team", + "reqwest", + "rivet-api", + "rivet-connection", + "rivet-operation", + "rivet-runtime", + "token-create", + "tokio", + "tracing", + "tracing-subscriber", + "user-identity-create", +] + [[package]] name = "lock_api" version = "0.4.11" diff --git a/svc/Cargo.toml b/svc/Cargo.toml index aa41e7c86d..b7dccb4de5 100644 --- a/svc/Cargo.toml +++ b/svc/Cargo.toml @@ -133,6 +133,7 @@ members = [ "pkg/load-test/standalone/api-cloud", "pkg/load-test/standalone/mm", "pkg/load-test/standalone/sqlx", + "pkg/load-test/standalone/watch-requests", "pkg/mm-config/ops/lobby-group-get", "pkg/mm-config/ops/lobby-group-resolve-name-id", "pkg/mm-config/ops/lobby-group-resolve-version", diff --git a/svc/pkg/load-test/standalone/watch-requests/Cargo.toml b/svc/pkg/load-test/standalone/watch-requests/Cargo.toml new file mode 100644 index 0000000000..4bfb9e6795 --- /dev/null +++ b/svc/pkg/load-test/standalone/watch-requests/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "load-test-watch-requests" +version = "0.0.1" +edition = "2018" +authors = ["Rivet Gaming, LLC "] +license = "Apache-2.0" + +[dependencies] +chirp-client = { path = "../../../../../lib/chirp/client" } +rivet-operation = { path = "../../../../../lib/operation/core" } +rivet-connection = { path = "../../../../../lib/connection" } +rivet-runtime = { path = "../../../../../lib/runtime" } +tokio = { version = "1.29", features = ["full"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } +rivet-api = { path = "../../../../../sdks/rust" } +reqwest = "0.11" + +faker-game = { path = "../../../faker/ops/game" } +faker-team = { path = "../../../faker/ops/team" } +user-identity-create = { path = "../../../user-identity/ops/create" } +token-create = { path = "../../../token/ops/create" } + +[dev-dependencies] +chirp-worker = { path = "../../../../../lib/chirp/worker" } diff --git a/svc/pkg/load-test/standalone/watch-requests/Service.toml b/svc/pkg/load-test/standalone/watch-requests/Service.toml new file mode 100644 index 0000000000..4481ad08c3 --- /dev/null +++ b/svc/pkg/load-test/standalone/watch-requests/Service.toml @@ -0,0 +1,9 @@ +[service] +name = "load-test-watch-requests" +load-test = true + +[runtime] +kind = "rust" + +[headless] + diff --git a/svc/pkg/load-test/standalone/watch-requests/src/lib.rs b/svc/pkg/load-test/standalone/watch-requests/src/lib.rs new file mode 100644 index 0000000000..37a2c1252c --- /dev/null +++ b/svc/pkg/load-test/standalone/watch-requests/src/lib.rs @@ -0,0 +1,170 @@ +use std::collections::HashSet; + +use futures_util::StreamExt; +use proto::{ + backend::{self, pkg::*}, + common, +}; +use rivet_api::{apis::configuration::Configuration, models}; +use rivet_operation::prelude::*; +use serde_json::json; +use tokio::time::{interval, Duration, Instant}; + +#[tracing::instrument(skip_all)] +pub async fn run_from_env(ts: i64) -> GlobalResult<()> { + let pools = rivet_pools::from_env("load-test-watch-requests").await?; + let client = + chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("load-test-watch-requests"); + let cache = rivet_cache::CacheInner::from_env(pools.clone())?; + let ctx = OperationContext::new( + "load-test-watch-requests".into(), + std::time::Duration::from_secs(60), + rivet_connection::Connection::new(client, pools, cache), + Uuid::new_v4(), + Uuid::new_v4(), + util::timestamp::now(), + util::timestamp::now(), + (), + Vec::new(), + ); + + // Create temp team + let (team_id, primary_user_id) = { + // Create team + let create_res = op!([ctx] faker_team { + + ..Default::default() + }) + .await?; + let team_id = unwrap_ref!(create_res.team_id).as_uuid(); + let primary_user_id = create_res.member_user_ids[0].as_uuid(); + + // Register user + op!([ctx] user_identity_create { + user_id: Some(primary_user_id.into()), + identity: Some(backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email(backend::user_identity::identity::Email { + email: util::faker::email() + })) + }) + }) + .await + .unwrap(); + + (team_id, primary_user_id) + }; + + // Encode user token + let auth_token = { + let token_res = op!([ctx] token_create { + issuer: "test".into(), + token_config: Some(token::create::request::TokenConfig { + ttl: util::duration::hours(1), + }), + refresh_token_config: None, + client: Some(backend::net::ClientInfo { + user_agent: Some("Test".into()), + remote_address: Some("0.0.0.0".into()), + }), + kind: Some(token::create::request::Kind::New(token::create::request::KindNew { + entitlements: vec![ + proto::claims::Entitlement { + kind: Some( + proto::claims::entitlement::Kind::User(proto::claims::entitlement::User { + user_id: Some(primary_user_id.into()), + }) + ) + }, + ], + })), + label: Some("usr".into()), + ..Default::default() + }) + .await + .unwrap(); + let token = token_res.token.unwrap(); + + token.token + }; + let bypass_token = { + let token_res = op!([ctx] token_create { + token_config: Some(token::create::request::TokenConfig { + ttl: util::duration::hours(1) + }), + refresh_token_config: None, + issuer: "api-status".to_owned(), + client: None, + kind: Some(token::create::request::Kind::New(token::create::request::KindNew { + entitlements: vec![ + proto::claims::Entitlement { + kind: Some( + proto::claims::entitlement::Kind::Bypass(proto::claims::entitlement::Bypass { }) + ) + } + ], + })), + label: Some("byp".to_owned()), + ..Default::default() + }) + .await?; + unwrap_ref!(token_res.token).token.clone() + }; + + let client = reqwest::Client::builder() + .default_headers({ + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + "x-bypass-token", + reqwest::header::HeaderValue::from_str(&bypass_token)?, + ); + headers.insert( + "host", + reqwest::header::HeaderValue::from_str("api.nathan5.gameinc.io")?, + ); + headers.insert( + "cf-connecting-ip", + reqwest::header::HeaderValue::from_str("192.0.2.0")?, + ); + headers + }) + .build()?; + let config = Configuration { + client, + base_path: "http://traefik.traefik.svc.cluster.local:80".into(), + bearer_access_token: Some(auth_token), + ..Default::default() + }; + + let mut interval = tokio::time::interval(Duration::from_millis(50)); + for i in 0..3_000 { + interval.tick().await; + + if i % 100 == 0 { + tracing::info!(?i, "request"); + } + + let config = config.clone(); + tokio::spawn(async move { + let mut watch_index = Option::::None; + loop { + let start = Instant::now(); + match rivet_api::apis::identity_events_api::identity_events_watch( + &config, + watch_index.as_ref().map(|x| x.as_str()), + ) + .await + { + Ok(res) => { + if watch_index.is_none() { + tracing::info!(elapsed = ?start.elapsed().as_secs_f64(), "received initial response"); + } + watch_index = Some(res.watch.index); + } + Err(err) => tracing::error!(?err, "error"), + } + } + }); + } + + Ok(()) +} diff --git a/svc/pkg/load-test/standalone/watch-requests/src/main.rs b/svc/pkg/load-test/standalone/watch-requests/src/main.rs new file mode 100644 index 0000000000..8eeaeda432 --- /dev/null +++ b/svc/pkg/load-test/standalone/watch-requests/src/main.rs @@ -0,0 +1,15 @@ +use rivet_operation::prelude::*; + +fn main() -> GlobalResult<()> { + rivet_runtime::run(start()).unwrap() +} + +async fn start() -> GlobalResult<()> { + load_test_watch_requests::run_from_env(util::timestamp::now()).await?; + + tracing::info!("finished"); + + std::future::pending::<()>().await; + + Ok(()) +} diff --git a/svc/pkg/load-test/standalone/watch-requests/tests/integration.rs b/svc/pkg/load-test/standalone/watch-requests/tests/integration.rs new file mode 100644 index 0000000000..a654110bc9 --- /dev/null +++ b/svc/pkg/load-test/standalone/watch-requests/tests/integration.rs @@ -0,0 +1,15 @@ +use chirp_worker::prelude::*; + +use ::load_test_watch_requests::run_from_env; + +#[tokio::test(flavor = "multi_thread")] +async fn basic() { + tracing_subscriber::fmt() + .json() + .with_max_level(tracing::Level::INFO) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NONE) + .init(); + + // TODO: + run_from_env(util::timestamp::now()).await.unwrap(); +}