Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Infra** `infra-artifacts` Terraform plan & S3 bucket used for automating building & uploading internal binaries, etc.
- **Infra** Aiven Redis provider
- **Bolt** `bolt secret set <path> <value>` command
- `watch-requests` load test

### Changed

Expand Down
20 changes: 20 additions & 0 deletions svc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions svc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ members = [
"pkg/load-test/standalone/api-cloud",
"pkg/load-test/standalone/mm",
"pkg/load-test/standalone/sqlx",
"pkg/load-test/standalone/watch-requests",
"pkg/mm-config/ops/lobby-group-get",
"pkg/mm-config/ops/lobby-group-resolve-name-id",
"pkg/mm-config/ops/lobby-group-resolve-version",
Expand Down
25 changes: 25 additions & 0 deletions svc/pkg/load-test/standalone/watch-requests/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "load-test-watch-requests"
version = "0.0.1"
edition = "2018"
authors = ["Rivet Gaming, LLC <developer@rivet.gg>"]
license = "Apache-2.0"

[dependencies]
chirp-client = { path = "../../../../../lib/chirp/client" }
rivet-operation = { path = "../../../../../lib/operation/core" }
rivet-connection = { path = "../../../../../lib/connection" }
rivet-runtime = { path = "../../../../../lib/runtime" }
tokio = { version = "1.29", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] }
rivet-api = { path = "../../../../../sdks/rust" }
reqwest = "0.11"

faker-game = { path = "../../../faker/ops/game" }
faker-team = { path = "../../../faker/ops/team" }
user-identity-create = { path = "../../../user-identity/ops/create" }
token-create = { path = "../../../token/ops/create" }

[dev-dependencies]
chirp-worker = { path = "../../../../../lib/chirp/worker" }
9 changes: 9 additions & 0 deletions svc/pkg/load-test/standalone/watch-requests/Service.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[service]
name = "load-test-watch-requests"
load-test = true

[runtime]
kind = "rust"

[headless]

170 changes: 170 additions & 0 deletions svc/pkg/load-test/standalone/watch-requests/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use std::collections::HashSet;

use futures_util::StreamExt;
use proto::{
backend::{self, pkg::*},
common,
};
use rivet_api::{apis::configuration::Configuration, models};
use rivet_operation::prelude::*;
use serde_json::json;
use tokio::time::{interval, Duration, Instant};

#[tracing::instrument(skip_all)]
pub async fn run_from_env(ts: i64) -> GlobalResult<()> {
let pools = rivet_pools::from_env("load-test-watch-requests").await?;
let client =
chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("load-test-watch-requests");
let cache = rivet_cache::CacheInner::from_env(pools.clone())?;
let ctx = OperationContext::new(
"load-test-watch-requests".into(),
std::time::Duration::from_secs(60),
rivet_connection::Connection::new(client, pools, cache),
Uuid::new_v4(),
Uuid::new_v4(),
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

// Create temp team
let (team_id, primary_user_id) = {
// Create team
let create_res = op!([ctx] faker_team {

..Default::default()
})
.await?;
let team_id = unwrap_ref!(create_res.team_id).as_uuid();
let primary_user_id = create_res.member_user_ids[0].as_uuid();

// Register user
op!([ctx] user_identity_create {
user_id: Some(primary_user_id.into()),
identity: Some(backend::user_identity::Identity {
kind: Some(backend::user_identity::identity::Kind::Email(backend::user_identity::identity::Email {
email: util::faker::email()
}))
})
})
.await
.unwrap();

(team_id, primary_user_id)
};

// Encode user token
let auth_token = {
let token_res = op!([ctx] token_create {
issuer: "test".into(),
token_config: Some(token::create::request::TokenConfig {
ttl: util::duration::hours(1),
}),
refresh_token_config: None,
client: Some(backend::net::ClientInfo {
user_agent: Some("Test".into()),
remote_address: Some("0.0.0.0".into()),
}),
kind: Some(token::create::request::Kind::New(token::create::request::KindNew {
entitlements: vec![
proto::claims::Entitlement {
kind: Some(
proto::claims::entitlement::Kind::User(proto::claims::entitlement::User {
user_id: Some(primary_user_id.into()),
})
)
},
],
})),
label: Some("usr".into()),
..Default::default()
})
.await
.unwrap();
let token = token_res.token.unwrap();

token.token
};
let bypass_token = {
let token_res = op!([ctx] token_create {
token_config: Some(token::create::request::TokenConfig {
ttl: util::duration::hours(1)
}),
refresh_token_config: None,
issuer: "api-status".to_owned(),
client: None,
kind: Some(token::create::request::Kind::New(token::create::request::KindNew {
entitlements: vec![
proto::claims::Entitlement {
kind: Some(
proto::claims::entitlement::Kind::Bypass(proto::claims::entitlement::Bypass { })
)
}
],
})),
label: Some("byp".to_owned()),
..Default::default()
})
.await?;
unwrap_ref!(token_res.token).token.clone()
};

let client = reqwest::Client::builder()
.default_headers({
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"x-bypass-token",
reqwest::header::HeaderValue::from_str(&bypass_token)?,
);
headers.insert(
"host",
reqwest::header::HeaderValue::from_str("api.nathan5.gameinc.io")?,
);
headers.insert(
"cf-connecting-ip",
reqwest::header::HeaderValue::from_str("192.0.2.0")?,
);
headers
})
.build()?;
let config = Configuration {
client,
base_path: "http://traefik.traefik.svc.cluster.local:80".into(),
bearer_access_token: Some(auth_token),
..Default::default()
};

let mut interval = tokio::time::interval(Duration::from_millis(50));
for i in 0..3_000 {
interval.tick().await;

if i % 100 == 0 {
tracing::info!(?i, "request");
}

let config = config.clone();
tokio::spawn(async move {
let mut watch_index = Option::<String>::None;
loop {
let start = Instant::now();
match rivet_api::apis::identity_events_api::identity_events_watch(
&config,
watch_index.as_ref().map(|x| x.as_str()),
)
.await
{
Ok(res) => {
if watch_index.is_none() {
tracing::info!(elapsed = ?start.elapsed().as_secs_f64(), "received initial response");
}
watch_index = Some(res.watch.index);
}
Err(err) => tracing::error!(?err, "error"),
}
}
});
}

Ok(())
}
15 changes: 15 additions & 0 deletions svc/pkg/load-test/standalone/watch-requests/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use rivet_operation::prelude::*;

fn main() -> GlobalResult<()> {
rivet_runtime::run(start()).unwrap()
}

async fn start() -> GlobalResult<()> {
load_test_watch_requests::run_from_env(util::timestamp::now()).await?;

tracing::info!("finished");

std::future::pending::<()>().await;

Ok(())
}
15 changes: 15 additions & 0 deletions svc/pkg/load-test/standalone/watch-requests/tests/integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use chirp_worker::prelude::*;

use ::load_test_watch_requests::run_from_env;

#[tokio::test(flavor = "multi_thread")]
async fn basic() {
tracing_subscriber::fmt()
.json()
.with_max_level(tracing::Level::INFO)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::NONE)
.init();

// TODO:
run_from_env(util::timestamp::now()).await.unwrap();
}