diff --git a/.vscode/rivet.code-workspace b/.vscode/rivet.code-workspace
index 445d8457b..34801add3 100644
--- a/.vscode/rivet.code-workspace
+++ b/.vscode/rivet.code-workspace
@@ -6,5 +6,8 @@
 		{
 			"path": "../lib/bolt",
 		},
+		{
+			"path": "../svc",
+		}
 	],
 }
diff --git a/infra/tf/modules/secrets/main.tf b/infra/tf/modules/secrets/main.tf
index 412412700..b42ba22d0 100644
--- a/infra/tf/modules/secrets/main.tf
+++ b/infra/tf/modules/secrets/main.tf
@@ -2,7 +2,7 @@ terraform {
 	required_providers {
 		external = {
 			source = "hashicorp/external"
-			version = "2.3.1"
+			version = "2.3.3"
 		}
 	}
 }
diff --git a/proto/backend/dynamic_servers.proto b/proto/backend/dynamic_servers.proto
index 47ca82c95..743515655 100644
--- a/proto/backend/dynamic_servers.proto
+++ b/proto/backend/dynamic_servers.proto
@@ -3,6 +3,8 @@ syntax = "proto3";
 package rivet.backend.dynamic_servers;
 
 import "proto/common.proto";
+import "proto/backend/captcha.proto";
+import "proto/backend/region.proto";
 
 message Server {
 	rivet.common.Uuid server_id = 1;
@@ -66,3 +68,240 @@ enum GameGuardProtocol {
 }
 
 message DockerHostRouting {}
+
+
+
+
+
+
+// MARK: Game Config
+message GameConfig {
+	bool host_networking_enabled = 1;
+	bool root_user_enabled = 2;
+}
+
+// MARK: Game Namespace Config
+message NamespaceConfig {
+	uint32 lobby_count_max = 1;
+	uint32 max_players_per_client = 2;
+	uint32 max_players_per_client_vpn = 3;
+	uint32 max_players_per_client_proxy = 4;
+	uint32 max_players_per_client_tor = 5;
+	uint32 max_players_per_client_hosting = 6;
+}
+
+// MARK: Game Version Config
+message VersionConfig {
+	repeated LobbyGroup lobby_groups = 1;
+
+	optional rivet.backend.captcha.CaptchaConfig captcha = 2;
+}
+
+message LobbyGroup {
+	message Region {
+		rivet.common.Uuid region_id = 1;
+		string tier_name_id = 2;
+		IdleLobbies idle_lobbies = 3;
+	}
+
+	message IdleLobbies {
+		uint32 min_idle_lobbies = 1;
+		uint32 max_idle_lobbies = 2;
+	}
+
+	message Actions {
+		optional FindConfig find = 1;
+		optional JoinConfig join = 2;
+		optional CreateConfig create = 3;
+	}
+
+	string name_id = 1;
+
+	repeated Region regions = 101;
+	uint32 max_players_normal = 102;
+	uint32 max_players_direct = 103;
+	uint32 max_players_party = 104;
+	bool listable = 105;
+	bool taggable = 106;
+	bool allow_dynamic_max_players = 107;
+
+	LobbyRuntime runtime = 201;
+
+	optional Actions actions = 301;
+}
+
+message LobbyRuntime {
+	enum NetworkMode {
+		BRIDGE = 0;
+		HOST = 1;
+	}
+
+	// Should be named "PortProtocol"
+	enum ProxyProtocol {
+		HTTP = 0;
+		HTTPS = 1;
+		TCP = 3;
+		TCP_TLS = 4;
+		UDP = 2;
+	}
+
+	enum ProxyKind {
+		GAME_GUARD = 0;
+		NONE = 1;
+	}
+
+	message PortRange {
+		uint32 min = 1;
+		uint32 max = 2;
+	}
+
+	message Port {
+		string label = 1;
+
+		// Only applicable to `ProxyProtocol::HTTP` and `ProxyProtocol::HTTPS`.
+		optional uint32 target_port = 2;
+
+		// Only applicable to `ProxyProtocol::UDP` and `ProxyProtocol::TCP` when `proxy_kind` is `ProxyKind::GameGuard`.
+		optional PortRange port_range = 4;
+
+		ProxyProtocol proxy_protocol = 3;
+
+		ProxyKind proxy_kind = 5;
+	}
+
+	message EnvVar {
+		string key = 1;
+		string value = 2;
+	}
+
+	message Docker {
+		rivet.common.Uuid build_id = 1;
+		repeated string args = 2;
+		repeated EnvVar env_vars = 4;
+		NetworkMode network_mode = 5;
+		repeated Port ports = 3;
+	}
+
+	oneof runtime {
+		Docker docker = 201;
+	};
+}
+
+enum IdentityRequirement {
+	NONE = 0;
+	GUEST = 1;
+	REGISTERED = 2;
+}
+
+message VerificationConfig {
+	string url = 1;
+	map<string, string> headers = 2;
+}
+
+message FindConfig {
+	bool enabled = 1;
+	IdentityRequirement identity_requirement = 2;
+	optional VerificationConfig verification = 3;
+}
+
+message JoinConfig {
+	bool enabled = 1;
+	IdentityRequirement identity_requirement = 2;
+	optional VerificationConfig verification = 3;
+}
+
+message CreateConfig {
+	bool enabled = 1;
+	IdentityRequirement identity_requirement = 2;
+	optional VerificationConfig verification = 3;
+
+	bool enable_public = 4;
+	bool enable_private = 5;
+
+	optional uint64 max_lobbies_per_identity = 6;
+}
+
+// MARK: Game Version Config Context
+// Context required to publish a new version.
+message VersionConfigCtx {
+	repeated LobbyGroupCtx lobby_groups = 1;
+}
+
+message LobbyGroupCtx {
+	LobbyRuntimeCtx runtime = 101;
+}
+
+message LobbyRuntimeCtx {
+	message Docker {
+		optional rivet.common.Uuid job_template_id = 1 [deprecated = true];
+	}
+
+	oneof runtime {
+		Docker docker = 1;
+	};
+}
+
+// MARK: Game Version Config Meta
+// Metadata about a given configuration generated after publishing.
+message VersionConfigMeta {
+	repeated LobbyGroupMeta lobby_groups = 1;
+}
+
+message LobbyGroupMeta {
+	// The indexes of `LobbyGroupMeta` and `LobbyGroupConfig` returned by `game-version-get` line up,
+	// so fetching the lobby group config for a given `lobby_group_id` is done by zipping the two lists.
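+	// For example, a consumer of the generated types might pair them like this
+	// (a sketch in Rust, assuming prost-generated structs and order-preserving lists):
+	//
+	//   for (meta, config) in version_meta.lobby_groups.iter().zip(version_config.lobby_groups.iter()) {
+	//       // `meta.lobby_group_id` identifies the lobby group configured by `config`.
+	//   }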
+ rivet.common.Uuid lobby_group_id = 1; + + LobbyRuntimeMeta runtime = 101; +} + +message LobbyRuntimeMeta { + message Docker { + optional rivet.common.Uuid job_template_id = 1 [deprecated = true]; + } + + oneof runtime { + Docker docker = 201; + }; +} + +// MARK: Lobby State +message Lobby { + enum Publicity { + PUBLIC = 0; + PRIVATE = 1; + } + + reserved 10; + + rivet.common.Uuid lobby_id = 1; + rivet.common.Uuid lobby_group_id = 2; + rivet.common.Uuid region_id = 3; + rivet.common.Uuid token_session_id = 4; + int64 create_ts = 5; + optional int64 ready_ts = 14; + optional int64 stop_ts = 13; + optional rivet.common.Uuid run_id = 6; + bool is_closed = 11; + rivet.common.Uuid namespace_id = 9; + optional rivet.common.Uuid create_ray_id = 12; + optional rivet.common.Uuid creator_user_id = 15; + bool is_custom = 16; + Publicity publicity = 17; + + uint32 max_players_normal = 101; + uint32 max_players_direct = 102; + uint32 max_players_party = 103; +} + +// MARK: Player State +message Player { + rivet.common.Uuid player_id = 1; + rivet.common.Uuid lobby_id = 2; + int64 create_ts = 3; + optional int64 register_ts = 4; + optional int64 remove_ts = 5; + rivet.common.Uuid token_session_id = 6; + rivet.common.Uuid create_ray_id = 7; +} + diff --git a/svc/Cargo.lock b/svc/Cargo.lock index 69344245d..5ed40add2 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -3086,13 +3086,34 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" name = "ds-server-create" version = "0.0.1" dependencies = [ + "bit-vec", + "build-get", "chirp-client", "chirp-worker", + "cjson", + "heck 0.3.3", + "hex", + "http 0.2.12", + "ip-info", "lazy_static", - "nomad-client", + "mm-lobby-list-for-user-id", + "nomad-util", "nomad_client", + "regex", + "region-get", "rivet-operation", + "rivet-util", + "rivet-util-build", + "s3-util", + "serde", + "serde_json", + "sha2", "sqlx", + "strum 0.24.1", + "tier-list", + "upload-get", + "user-identity-get", + "uuid", ] [[package]] diff --git a/svc/api/dynamic-servers/tests/basic.rs b/svc/api/dynamic-servers/tests/basic.rs index 9644e31d4..ed70f5ece 100644 --- a/svc/api/dynamic-servers/tests/basic.rs +++ b/svc/api/dynamic-servers/tests/basic.rs @@ -39,7 +39,6 @@ impl Ctx { util::timestamp::now(), util::timestamp::now(), (), - Vec::new(), ); Ctx { op_ctx } diff --git a/svc/pkg/ds/ops/server-create/Cargo.toml b/svc/pkg/ds/ops/server-create/Cargo.toml index f242969b1..87d15b132 100644 --- a/svc/pkg/ds/ops/server-create/Cargo.toml +++ b/svc/pkg/ds/ops/server-create/Cargo.toml @@ -7,11 +7,34 @@ license = "Apache-2.0" [dependencies] chirp-client = { path = "../../../../../lib/chirp/client" } +chirp-worker = { path = "../../../../../lib/chirp/worker" } rivet-operation = { path = "../../../../../lib/operation/core" } -nomad-client = "0.0.9" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" lazy_static = "1.4.0" +uuid = { version = "1", features = ["v4", "serde"] } +http = "0.2" +bit-vec = "0.6" +cjson = "0.1" +nomad-util = { path = "../../../../../lib/nomad-util" } +strum = { version = "0.24", features = ["derive"] } +sha2 = "0.10" +hex = "0.4" +rivet-util = { path = "../../../../../lib/util/core" } +heck = "0.3" +s3-util = { path = "../../../../../lib/s3-util" } +util-build = { package = "rivet-util-build", path = "../../../build/util" } +regex = "1.10" + +mm-lobby-list-for-user-id = { path = "../../../mm/ops/lobby-list-for-user-id" } +build-get = { path = "../../../build/ops/get" } +user-identity-get = { path = "../../../user-identity/ops/get" } 
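+# NOTE: the path dependencies above and below are sibling operation crates from
+# this repository, resolved by relative path rather than from crates.io.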
+upload-get = { path = "../../../upload/ops/get" } +region-get = { path = "../../../region/ops/get" } +ip-info = { path = "../../../ip/ops/info" } +tier-list = { path = "../../../tier/ops/list" } -[dependencies.nomad_client_new] +[dependencies.nomad_client] package = "nomad_client" git = "https://github.com/rivet-gg/nomad-client" rev = "abb66bf0c30c7ff5b0c695dae952481c33e538b5" # pragma: allowlist secret diff --git a/svc/pkg/ds/ops/server-create/src/lib.rs b/svc/pkg/ds/ops/server-create/src/lib.rs index 323c3ddc5..810d694be 100644 --- a/svc/pkg/ds/ops/server-create/src/lib.rs +++ b/svc/pkg/ds/ops/server-create/src/lib.rs @@ -1,6 +1,33 @@ +// use chirp_worker::prelude::*; use futures_util::FutureExt; -use proto::backend::{self, pkg::*}; +use nomad_job::{ + escape_go_template, gen_oci_bundle_config, inject_consul_env_template, nomad_host_port_env_var, + template_env_var, template_env_var_int, DecodedPort, PortTarget, ProxyProtocol, + TransportProtocol, +}; +use proto::backend::{ + self, dynamic_servers::lobby_runtime::NetworkMode as LobbyRuntimeNetworkMode, +}; +use proto::{backend::pkg::*, chirp::response::Ok}; +use regex::Regex; use rivet_operation::prelude::*; +use serde_json::json; +use sha2::{Digest, Sha256}; +use std::{collections::HashMap, hash::DefaultHasher, net::IpAddr, time::Duration}; +use team::member_get::request; +use util_mm::key::lobby_config; +use nomad_client::models::*; + +mod nomad_job; +mod oci_config; +mod seccomp; +mod util_job; +mod util_mm; + +lazy_static::lazy_static! { + pub static ref NEW_NOMAD_CONFIG: nomad_client::apis::configuration::Configuration = + nomad_util::new_config_from_env().unwrap(); +} #[operation(name = "ds-server-create")] pub async fn handle( @@ -162,6 +189,932 @@ pub async fn handle( }) .await?; + // let ( + // (mm_game_config, namespace), + // mm_ns_config, + // (lobby_group, lobby_group_meta, version_id), + // region, + // tiers, + // ) = tokio::try_join!( + // fetch_namespace(ctx, namespace_id), + // fetch_mm_namespace_config(ctx, namespace_id), + // fetch_lobby_group_config(ctx, lobby_group_id), + // fetch_region(ctx, region_id), + // fetch_tiers(ctx, region_id), + // )?; + // let (mm_game_config, namespace) = fetch_namespace(ctx, namespace_id).await?; + // let mm_ns_config = fetch_mm_namespace_config(ctx, namespace_id).await?; + // let (lobby_group, lobby_group_meta, version_id) = fetch_lobby_group_config(ctx, lobby_group_id) + // .await?; + // let region = fetch_region(ctx, region_id).await?; + // let tiers = fetch_tiers(ctx, region_id).await?; + // let version = fetch_version(ctx, version_id).await?; + + // // Do all nomad stuff + // let namespace_id = unwrap_ref!(namespace.namespace_id).as_uuid(); + // let version_id = unwrap_ref!(version.version_id).as_uuid(); + // let lobby_group_id = unwrap_ref!(lobby_group_meta.lobby_group_id).as_uuid(); + // let region_id = unwrap_ref!(region.region_id).as_uuid(); + + // let job_runner_binary_url = resolve_job_runner_binary_url(ctx).await?; + + // let resolve_perf = ctx.perf().start("resolve-image-artifact-url").await; + // let build_id = unwrap_ref!(runtime.build_id).as_uuid(); + // let image_artifact_url = resolve_image_artifact_url(ctx, build_id, region).await?; + // resolve_perf.end(); + + // // Validate build exists and belongs to this game + // let build_id = unwrap_ref!(runtime.build_id).as_uuid(); + // let build_get = op!([ctx] build_get { + // build_ids: vec![build_id.into()], + // }) + // .await?; + // let build = unwrap!(build_get.builds.first()); + // let build_kind = 
unwrap!(backend::build::BuildKind::from_i32(build.kind)); + // let build_compression = unwrap!(backend::build::BuildCompression::from_i32( + // build.compression + // )); + + let ctx: OperationContext = ctx; + + + + let request_runtime = match unwrap!(ctx.runtime.clone()) { + dynamic_servers::server_create::request::Runtime::DockerRuntime(docker_runtime) => { + docker_runtime + } + }; + let request_runtime_network = unwrap!(request_runtime.network.clone()); + + + + // Generate the Docker job + + // let runtime = backend::dynamic_servers::lobby_runtime::Docker { + // build_id: todo!(), + // args: docker_runtime.args, + // env_vars: todo!(), + // network_mode: todo!(), + // ports: todo!(), + // }; + // let _image_tag = &build.image_tag; + // let tier = backend::region::Tier { + // tier_name_id: todo!(), + // rivet_cores_numerator: todo!(), + // rivet_cores_denominator: todo!(), + // cpu: todo!(), + // memory: todo!(), + // memory_max: todo!(), + // disk: todo!(), + // bandwidth: todo!(), + // }; + + // let lobby_config = ctx.lobby_config_json.is_some(); + // let lobby_tags = !ctx.tags.is_empty(); + // let build_kind = backend::build::BuildKind::DockerImage; + // let build_compression = backend::build::BuildCompression::None; + + // IMPORTANT: This job spec must be deterministic. Do not pass in parameters + // that change with every run, such as the lobby ID. Ensure the + // `reuse_job_id` test passes when changing this function. + use nomad_client::models::*; + + let resources = unwrap!(ctx.resources.clone()); + + let tier_res = op!([ctx] tier_list { + region_ids: vec![datacenter_id.into()], + }) + .await?; + let tier_region = unwrap!(tier_res.regions.first()); + + + // runc-compatible resources + let cpu = resources.cpu_millicores; // Millicore (1/1000 of a core) + let memory = resources.memory_mib * (1024 * 1024); // bytes + // let memory_max = tier.memory_max * (1024 * 1024); // bytes + + // Find the first tier that has more CPU and memory than the requested resources + let tier = unwrap!(tier_region.tiers.iter().find(|t| { + t.cpu as i32 >= cpu + && t.memory as i32 >= resources.memory_mib + })); + + // runc-compatible resources + let cpu = tier.rivet_cores_numerator as u64 * 1_000 / tier.rivet_cores_denominator as u64; // Millicore (1/1000 of a core) + let memory = tier.memory * (1024 * 1024); // bytes + let memory_max = tier.memory_max * (1024 * 1024); // bytes + + // // Nomad-compatible resources + // let resources = Resources { + // // TODO: Configure this per-provider + // // Nomad configures CPU based on MHz, not millicores. We havel to calculate the CPU share + // // by knowing how many MHz are on the client. + // CPU: if cpu < 1000 { + // Some((cpu - util_job::TASK_CLEANUP_CPU).try_into()?) + // } else { + // None + // }, + // cores: if cpu >= 1000 { + // Some((cpu / 1000) as i32) + // } else { + // None + // }, + // memory_mb: Some( + // (TryInto::::try_into(memory)? / (1024 * 1024) + // - util_job::TASK_CLEANUP_MEMORY as i64) + // .try_into()?, + // ), + // // Allow oversubscribing memory by 50% of the reserved + // // memory if using less than the node's total memory + // memory_max_mb: Some( + // (TryInto::::try_into((memory as f64 * 1.5) as i64)? / (1024 * 1024) + // - util_job::TASK_CLEANUP_MEMORY as i64) + // .try_into()?, + // ), + // ..Resources::new() + // }; + + // Nomad-compatible resources + let nomad_resources = Resources { + // TODO: Configure this per-provider + // Nomad configures CPU based on MHz, not millicores. 
We havel to calculate the CPU share + // by knowing how many MHz are on the client. + CPU: if tier.rivet_cores_numerator < tier.rivet_cores_denominator { + Some((tier.cpu - util_job::TASK_CLEANUP_CPU as u64).try_into()?) + } else { + None + }, + cores: if tier.rivet_cores_numerator >= tier.rivet_cores_denominator { + Some((tier.rivet_cores_numerator / tier.rivet_cores_denominator) as i32) + } else { + None + }, + memory_mb: Some( + (TryInto::::try_into(memory)? / (1024 * 1024) + - util_job::TASK_CLEANUP_MEMORY as i64) + .try_into()?, + ), + // Allow oversubscribing memory by 50% of the reserved + // memory if using less than the node's total memory + memory_max_mb: Some( + (TryInto::::try_into(memory_max)? / (1024 * 1024) + - util_job::TASK_CLEANUP_MEMORY as i64) + .try_into()?, + ), + disk_mb: Some(tier.disk as i32), // TODO: Is this deprecated? + ..Resources::new() + }; + + // // let network_mode = unwrap!(LobbyRuntimeNetworkMode::from_i32(runtime.network_mode)); + + // Read ports + let decoded_ports = request_runtime_network + .ports.clone() + .into_iter() + .map(|(port, docker_port)| match docker_port.routing { + Some(backend::dynamic_servers::docker_port::Routing::GameGuard(game_guard_routing)) => { + let target = PortTarget::Single(unwrap!(docker_port.port) as u16); + + GlobalResult::Ok(DecodedPort { + label: port.clone(), + nomad_port_label: util_mm::format_nomad_port_label(&port), + target, + proxy_protocol: unwrap!(backend::dynamic_servers::GameGuardProtocol::from_i32( + game_guard_routing.protocol + )) + .into(), + }) + } + Some(backend::dynamic_servers::docker_port::Routing::Host(_)) => { + todo!() + } + None => { + todo!() + } + }) + .collect::>>()?; + + // The container will set up port forwarding manually from the Nomad-defined ports on the host + // to the CNI container + let dynamic_ports = decoded_ports + .iter() + .filter_map(|port| { + port.target.get_nomad_port().map(|_| Port { + label: Some(port.nomad_port_label.clone()), + ..Port::new() + }) + }) + .collect::>(); + + // Port mappings to pass to the container. Only used in bridge networking. + let cni_port_mappings = decoded_ports.clone() + .into_iter() + .filter_map(|port| { + port.target.get_nomad_port().map(|target_port| { + json!({ + "HostPort": template_env_var_int(&nomad_host_port_env_var(&port.nomad_port_label)), + "ContainerPort": target_port, + "Protocol": TransportProtocol::from(port.proxy_protocol).as_cni_protocol(), + }) + }) + }) + .collect::>(); + + let prepared_ports = request_runtime_network.ports.iter().map(|(label, docker_port)| { + let mode = unwrap!(backend::dynamic_servers::DockerNetworkMode::from_i32(request_runtime_network.mode)); + let port_value = match mode { + // CNI will handle mapping the host port to the container port + backend::dynamic_servers::DockerNetworkMode::Bridge => unwrap!(docker_port.port).to_string(), + // The container needs to listen on the correct port + backend::dynamic_servers::DockerNetworkMode::Host => { + template_env_var(&nomad_host_port_env_var(&label)) + } + + }; + + GlobalResult::Ok(Some(String::new())) + // TODO + // Port with the kebab case port key. Included for backward compatabiilty & for + // less confusion. 
+ // Ok((format!("PORT_{}", port.label.replace('-', "_")), port_value)) + }); + + // Also see util_mm:consts::DEFAULT_ENV_KEYS + let mut env = Vec::<(String, String)>::new().into_iter() + //runtime.env_vars + // .iter() + // .map(|v| (v.key.clone(), escape_go_template(&v.value))) + // TODO + // .chain(if lobby_config { + // Some(( + // "RIVET_LOBBY_CONFIG".to_string(), + // template_env_var("NOMAD_META_LOBBY_CONFIG"), + // )) + // } else { + // None + // }) + // .chain(if lobby_tags { + // Some(( + // "RIVET_LOBBY_TAGS".to_string(), + // template_env_var("NOMAD_META_LOBBY_TAGS"), + // )) + // } else { + // None + // }) + .chain([( + "RIVET_API_ENDPOINT".to_string(), + util::env::origin_api().to_string(), + )]) + + // Ports + // TODO + // .chain(prepared_ports) + // // Port ranges + // .chain( + // decoded_ports + // .iter() + // .filter_map(|port| { + // if let PortTarget::Range { min, max } = &port.target { + // let snake_port_label = port.label.replace('-', "_"); + + // Some([ + // ( + // format!("PORT_RANGE_MIN_{}", snake_port_label), + // min.to_string(), + // ), + // ( + // format!("PORT_RANGE_MAX_{}", snake_port_label), + // max.to_string(), + // ), + // ]) + // } else { + // None + // } + // }) + // .flatten(), + // ) + .map(|(k, v)| format!("{k}={v}")) + .collect::>(); + env.sort(); + + let services = decoded_ports + .iter() + .map(|port| { + if port.target.get_nomad_port().is_some() { + let service_name = format!("${{NOMAD_META_LOBBY_ID}}-{}", port.label); + GlobalResult::Ok(Some(Service { + provider: Some("nomad".into()), + name: Some(service_name), + tags: Some(vec!["game".into()]), + port_label: Some(port.nomad_port_label.clone()), + // checks: if TransportProtocol::from(port.proxy_protocol) + // == TransportProtocol::Tcp + // { + // Some(vec![ServiceCheck { + // name: Some(format!("{}-probe", port.label)), + // port_label: Some(port.nomad_port_label.clone()), + // _type: Some("tcp".into()), + // interval: Some(30_000_000_000), + // timeout: Some(2_000_000_000), + // ..ServiceCheck::new() + // }]) + // } else { + // None + // }, + ..Service::new() + })) + } else { + Ok(None) + } + }) + .filter_map(|x| x.transpose()) + .collect::>>()?; + + // Generate the command to download and decompress the file + let mut download_cmd = r#"curl -Lf "$NOMAD_META_IMAGE_ARTIFACT_URL""#.to_string(); + // TODO + // match build_compression { + // backend::build::BuildCompression::None => {} + // backend::build::BuildCompression::Lz4 => { + // download_cmd.push_str(" | lz4 -d -"); + // } + // } + + let job_spec = Job { + _type: Some("batch".into()), + constraints: Some(vec![Constraint { + l_target: Some("${node.class}".into()), + r_target: Some("job".into()), + operand: Some("=".into()), + }]), + parameterized_job: Some(Box::new(ParameterizedJobConfig { + payload: Some("forbidden".into()), + meta_required: Some(vec![ + "job_runner_binary_url".into(), + "vector_socket_addr".into(), + "image_artifact_url".into(), + "namespace_id".into(), + "namespace_name".into(), + "version_id".into(), + "version_name".into(), + "region_id".into(), + "region_name".into(), + "max_players_normal".into(), + "max_players_direct".into(), + "max_players_party".into(), + "root_user_enabled".into(), + ]), + meta_optional: Some(vec!["rivet_test_id".into()]), + })), + task_groups: Some(vec![TaskGroup { + name: Some(util_job::RUN_MAIN_TASK_NAME.into()), + constraints: None, // TODO: Use parameter meta to specify the hardware + affinities: None, // TODO: + // Allows for jobs to keep running and receiving players in the + // 
event of a disconnection from the Nomad server. + max_client_disconnect: Some(5 * 60 * 1_000_000_000), + restart_policy: Some(Box::new(RestartPolicy { + attempts: Some(0), + mode: Some("fail".into()), + ..RestartPolicy::new() + })), + reschedule_policy: Some(Box::new(ReschedulePolicy { + attempts: Some(0), + unlimited: Some(false), + ..ReschedulePolicy::new() + })), + networks: Some(vec![NetworkResource { + // The setup.sh script will set up a CNI network if using bridge networking + mode: Some("host".into()), + dynamic_ports: Some(dynamic_ports), + ..NetworkResource::new() + }]), + services: Some(services), + // Configure ephemeral disk for logs + ephemeral_disk: Some(Box::new(EphemeralDisk { + size_mb: Some(tier.disk as i32), + ..EphemeralDisk::new() + })), + tasks: Some(vec![ + Task { + name: Some("runc-setup".into()), + lifecycle: Some(Box::new(TaskLifecycle { + hook: Some("prestart".into()), + sidecar: Some(false), + })), + driver: Some("raw_exec".into()), + config: Some({ + let mut x = HashMap::new(); + x.insert("command".into(), json!("${NOMAD_TASK_DIR}/setup.sh")); + x + }), + templates: Some(vec![ + Template { + embedded_tmpl: Some(include_str!("./scripts/setup.sh").replace( + "__HOST_NETWORK__", + match unwrap!( + backend::dynamic_servers::DockerNetworkMode::from_i32( + request_runtime_network.mode + ) + ) { + backend::dynamic_servers::DockerNetworkMode::Bridge => "false", + backend::dynamic_servers::DockerNetworkMode::Host => "true", + }, + )), + dest_path: Some("${NOMAD_TASK_DIR}/setup.sh".into()), + perms: Some("744".into()), + ..Template::new() + }, + Template { + embedded_tmpl: Some( + include_str!("./scripts/setup_job_runner.sh").into(), + ), + dest_path: Some("${NOMAD_TASK_DIR}/setup_job_runner.sh".into()), + perms: Some("744".into()), + ..Template::new() + }, + Template { + embedded_tmpl: Some( + include_str!("./scripts/setup_oci_bundle.sh") + .replace("__DOWNLOAD_CMD__", &download_cmd) + .replace( + "__BUILD_KIND__", + // TODO + // match build_kind { + // backend::build::BuildKind::DockerImage => { + // "docker-image" + // } + // backend::build::BuildKind::OciBundle => "oci-bundle", + // }, + "docker-image" + ), + ), + dest_path: Some("${NOMAD_TASK_DIR}/setup_oci_bundle.sh".into()), + perms: Some("744".into()), + ..Template::new() + }, + Template { + embedded_tmpl: Some( + include_str!("./scripts/setup_cni_network.sh").into(), + ), + dest_path: Some("${NOMAD_TASK_DIR}/setup_cni_network.sh".into()), + perms: Some("744".into()), + ..Template::new() + }, + Template { + embedded_tmpl: Some(gen_oci_bundle_config( + cpu, memory, memory_max, env, + )?), + dest_path: Some( + "${NOMAD_ALLOC_DIR}/oci-bundle-config.base.json".into(), + ), + ..Template::new() + }, + Template { + embedded_tmpl: Some(inject_consul_env_template( + &serde_json::to_string(&cni_port_mappings)?, + )?), + dest_path: Some("${NOMAD_ALLOC_DIR}/cni-port-mappings.json".into()), + ..Template::new() + }, + ]), + resources: Some(Box::new(Resources { + CPU: Some(util_mm::RUNC_SETUP_CPU), + memory_mb: Some(util_mm::RUNC_SETUP_MEMORY), + ..Resources::new() + })), + log_config: None, + // TODO + // Some(Box::new(LogConfig { + // max_files: Some(4), + // max_file_size_mb: Some(2), + // })), + ..Task::new() + }, + Task { + name: Some(util_job::RUN_MAIN_TASK_NAME.into()), + driver: Some("raw_exec".into()), + config: Some({ + let mut x = HashMap::new(); + // This is downloaded in setup_job_runner.sh + x.insert("command".into(), json!("${NOMAD_ALLOC_DIR}/job-runner")); + x + }), + resources: 
Some(Box::new(nomad_resources.clone())), + // Intentionally high timeout. Killing jobs is handled manually with signals. + kill_timeout: Some(86400 * 1_000_000_000), + kill_signal: Some("SIGTERM".into()), + log_config: None, + // Some(Box::new(LogConfig { + // max_files: Some(4), + // max_file_size_mb: Some(4), + // })), + ..Task::new() + }, + // Task { + // name: Some("runc-cleanup".into()), + // lifecycle: Some(Box::new(TaskLifecycle { + // hook: Some("poststop".into()), + // sidecar: Some(false), + // })), + // driver: Some("raw_exec".into()), + // config: Some({ + // let mut x = HashMap::new(); + // x.insert("command".into(), json!("${NOMAD_TASK_DIR}/cleanup.sh")); + // x + // }), + // templates: Some(vec![Template { + // embedded_tmpl: Some(include_str!("./scripts/cleanup.sh").into()), + // dest_path: Some("${NOMAD_TASK_DIR}/cleanup.sh".into()), + // perms: Some("744".into()), + // ..Template::new() + // }]), + // resources: Some(Box::new(Resources { + // CPU: Some(util_mm::RUNC_CLEANUP_CPU), + // memory_mb: Some(util_mm::RUNC_CLEANUP_MEMORY), + // ..Resources::new() + // })), + // log_config: Some(Box::new(LogConfig { + // max_files: Some(4), + // max_file_size_mb: Some(2), + // })), + // ..Task::new() + // }, + ]), + ..TaskGroup::new() + }]), + ..Job::new() + }; + + let job_spec_json = serde_json::to_string(&job_spec)?; + + // // Build proxied ports for each exposed port + // let proxied_ports = runtime + // .ports + // .iter() + // .filter(|port| { + // port.proxy_kind == backend::dynamic_servers::lobby_runtime::ProxyKind::GameGuard as i32 + // && port.port_range.is_none() + // }) + // .flat_map(|port| { + // let mut ports = vec![direct_proxied_port(lobby_id, region_id, port)]; + // match backend::dynamic_servers::lobby_runtime::ProxyProtocol::from_i32( + // port.proxy_protocol, + // ) { + // Some( + // backend::dynamic_servers::lobby_runtime::ProxyProtocol::Http + // | backend::dynamic_servers::lobby_runtime::ProxyProtocol::Https, + // ) => { + // ports.push(path_proxied_port(lobby_id, region_id, port)); + // } + // Some( + // backend::dynamic_servers::lobby_runtime::ProxyProtocol::Udp + // | backend::dynamic_servers::lobby_runtime::ProxyProtocol::Tcp + // | backend::dynamic_servers::lobby_runtime::ProxyProtocol::TcpTls, + // ) + // | None => {} + // } + // ports + // }) + // .collect::>>()?; + + // submit_job(&job_spec_json, Some(region_id.into())); + + // Get the region to dispatch in + let region_res = op!([ctx] region_get { + region_ids: vec![datacenter_id.into()], + }) + .await?; + let region = unwrap!(region_res.regions.first()); + + let region = region; + let base_job: Job = serde_json::from_str::(&job_spec_json)?; + + // Modify the job spec + let mut job = { + let mut job = base_job; + // let region = region; + // Replace all job IDs with a placeholder value in order to create a + // deterministic job spec. 
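			// (A fixed placeholder keeps the serialized spec independent of any
			// per-run identifier; the real, hash-derived job ID is assigned after
			// the spec is canonicalized and hashed below.)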
+ { + let job_id: &str = "__PLACEHOLDER__"; + let job: &mut nomad_client::models::Job = &mut job; + job.ID = Some(job_id.into()); + job.name = Some(job_id.into()); + }; + + ensure_eq!( + "batch", + unwrap_ref!(job._type).as_str(), + "only the batch job type is supported" + ); + + // Update the job's region + job.region = Some(region.nomad_region.clone()); + job.datacenters = Some(vec![region.nomad_datacenter.clone()]); + + // Validate that the job is parameterized + let parameters = unwrap!(job.parameterized_job.as_mut(), "job not parameterized"); + + // Add run parameters + parameters.meta_required = Some({ + let mut meta_required = parameters.meta_required.clone().unwrap_or_default(); + meta_required.push("job_run_id".into()); + meta_required + }); + + // Get task group + let task_groups = unwrap!(job.task_groups.as_mut()); + ensure_eq!(1, task_groups.len(), "must have exactly 1 task group"); + let task_group = unwrap!(task_groups.first_mut()); + ensure_eq!( + task_group.name.as_deref(), + Some(RUN_MAIN_TASK_NAME), + "must have main task group" + ); + + // Ensure has main task + let main_task = unwrap!( + task_group + .tasks + .iter_mut() + .flatten() + .find(|x| x.name.as_deref() == Some(RUN_MAIN_TASK_NAME)), + "must have main task" + ); + ensure!( + main_task + .lifecycle + .as_ref() + .map_or(true, |x| x.hook.is_none()), + "main task must not have a lifecycle hook" + ); + + // Configure networks + let networks = unwrap!(task_group.networks.as_mut()); + ensure_eq!(1, networks.len(), "must have exactly 1 network"); + let network = unwrap!(networks.first_mut()); + // Disable IPv6 DNS since Docker doesn't support IPv6 yet + network.DNS = Some(Box::new(nomad_client::models::DnsConfig { + servers: Some(vec![ + // Google + "8.8.8.8".into(), + "8.8.4.4".into(), + "2001:4860:4860::8888".into(), + "2001:4860:4860::8844".into(), + ]), + // Disable default search from the host + searches: Some(Vec::new()), + options: Some(vec!["rotate".into(), "edns0".into(), "attempts:2".into()]), + ..nomad_client::models::DnsConfig::new() + })); + + // Disable rescheduling, since job-run doesn't support this at the moment + task_group.reschedule_policy = + Some(Box::new(nomad_client::models::ReschedulePolicy { + attempts: Some(0), + unlimited: Some(false), + ..nomad_client::models::ReschedulePolicy::new() + })); + + // Disable restarts. Our Nomad monitoring workflow doesn't support restarts + // at the moment. 
+ task_group.restart_policy = Some(Box::new(nomad_client::models::RestartPolicy { + attempts: Some(0), + // unlimited: Some(false), + ..nomad_client::models::RestartPolicy::new() + })); + + // Add cleanup task + let tasks: &mut Vec = unwrap!(task_group.tasks.as_mut()); + tasks.push({ + + + Task { + name: Some(RUN_CLEANUP_TASK_NAME.into()), + lifecycle: Some(Box::new(TaskLifecycle { + hook: Some("poststop".into()), + sidecar: Some(false), + })), + driver: Some("docker".into()), + config: Some({ + let mut config = HashMap::new(); + + config.insert("image".into(), json!("python:3.10.7-alpine3.16")); + config.insert( + "args".into(), + json!([ + "/bin/sh", + "-c", + "apk add --no-cache ca-certificates && python3 /local/cleanup.py" + ]), + ); + + config + }), + templates: Some(vec![Template { + dest_path: Some("local/cleanup.py".into()), + embedded_tmpl: Some(formatdoc!( + r#" + import ssl + import urllib.request, json, os, mimetypes, sys + + BEARER = '{{{{env "NOMAD_META_JOB_RUN_TOKEN"}}}}' + + ctx = ssl.create_default_context() + + def eprint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + + def req(method, url, data = None, headers = {{}}): + request = urllib.request.Request( + url=url, + data=data, + method=method, + headers=headers + ) + + try: + res = urllib.request.urlopen(request, context=ctx) + assert res.status == 200, f"Received non-200 status: {{res.status}}" + return res + except urllib.error.HTTPError as err: + eprint(f"HTTP Error ({{err.code}} {{err.reason}}):\n\nBODY:\n{{err.read().decode()}}\n\nHEADERS:\n{{err.headers}}") + + raise err + + print(f'\n> Cleaning up job') + + res_json = None + with req('POST', f'{origin_api}/job/runs/cleanup', + data = json.dumps({{}}).encode(), + headers = {{ + 'Authorization': f'Bearer {{BEARER}}', + 'Content-Type': 'application/json' + }} + ) as res: + res_json = json.load(res) + + + print('\n> Finished') + "#, + origin_api = util::env::origin_api(), + )), + ..Template::new() + }]), + resources: Some(Box::new(Resources { + CPU: Some(TASK_CLEANUP_CPU), + memory_mb: Some(TASK_CLEANUP_MEMORY), + ..Resources::new() + })), + log_config: Some(Box::new(LogConfig { + max_files: Some(4), + max_file_size_mb: Some(2), + disabled: Some(false), + })), + ..Task::new() + } +}); + + Ok::<_, rivet_operation::prelude::GlobalError>(job) + }?; + + // Derive jobspec hash + // + // We serialize the JSON to a canonical string then take a SHA hash of the output. 
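	// (Canonical JSON fixes key ordering, so two specs with identical content
	// always produce the same hash, and therefore the same job ID, which is what
	// allows the `reuse_job_id` behavior to work.)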
+ let job_cjson_str = match cjson::to_string(&job) { + Ok(x) => x, + Err(err) => { + tracing::error!(?err, "cjson serialization failed"); + bail!("cjson serialization failed") + } + }; + let job_hash = Sha256::digest(job_cjson_str.as_bytes()); + let job_hash_str = hex::encode(job_hash); + + // Generate new job ID + let job_id = format!( + "job-{hash}:{region}", + hash = &job_hash_str[0..12], + region = region.name_id + ); + { + let job_id: &str = &job_id; + let job: &mut nomad_client::models::Job = &mut job; + job.ID = Some(job_id.into()); + job.name = Some(job_id.into()); + }; + + + // Submit the job + tracing::info!("submitting job"); + + nomad_client::apis::jobs_api::post_job( + &NEW_NOMAD_CONFIG, + &job_id, + nomad_client::models::JobRegisterRequest { + job: Some(Box::new(job)), + ..nomad_client::models::JobRegisterRequest::new() + }, + Some(®ion.nomad_region), + None, + None, + None, + ) + .await?; + + // Ok(job_id); + + // msg!([ctx] job_run::msg::create(run_id) { + // run_id: Some(run_id.into()), + // region_id: Some(region_id.into()), + // parameters: vec![ + // backend::job::Parameter { + // key: "job_runner_binary_url".into(), + // value: job_runner_binary_url, + // }, + // backend::job::Parameter { + // key: "vector_socket_addr".into(), + // value: "127.0.0.1:5021".to_string(), + // }, + // backend::job::Parameter { + // key: "image_artifact_url".into(), + // value: image_artifact_url.to_string(), + // }, + // backend::job::Parameter { + // key: "namespace_id".into(), + // value: namespace_id.to_string(), + // }, + // backend::job::Parameter { + // key: "namespace_name".into(), + // value: namespace.name_id.to_owned(), + // }, + // backend::job::Parameter { + // key: "version_id".into(), + // value: version_id.to_string(), + // }, + // backend::job::Parameter { + // key: "version_name".into(), + // value: version.display_name.to_owned(), + // }, + // backend::job::Parameter { + // key: "lobby_group_id".into(), + // value: lobby_group_id.to_string(), + // }, + // backend::job::Parameter { + // key: "lobby_group_name".into(), + // value: lobby_group.name_id.clone(), + // }, + // backend::job::Parameter { + // key: "lobby_id".into(), + // value: lobby_id.to_string(), + // }, + // backend::job::Parameter { + // key: "lobby_token".into(), + // value: lobby_token.to_owned(), + // }, + // backend::job::Parameter { + // key: "lobby_config".into(), + // value: ctx.lobby_config_json.clone().unwrap_or_default(), + // }, + // backend::job::Parameter { + // key: "lobby_tags".into(), + // value: serde_json::to_string(&ctx.tags)?, + // }, + // backend::job::Parameter { + // key: "region_id".into(), + // value: region_id.to_string(), + // }, + // backend::job::Parameter { + // key: "region_name".into(), + // value: region.name_id.to_string(), + // }, + // backend::job::Parameter { + // key: "max_players_normal".into(), + // value: max_players_normal.to_string(), + // }, + // backend::job::Parameter { + // key: "max_players_direct".into(), + // value: max_players_direct.to_string(), + // }, + // backend::job::Parameter { + // key: "max_players_party".into(), + // value: lobby_group.max_players_party.to_string(), + // }, + // backend::job::Parameter { + // key: "root_user_enabled".into(), + // value: if mm_game_config.root_user_enabled { "1" } else { "0" }.into() + // }, + // ].into_iter() + // .chain(ctx.parameters.clone()) + // .collect(), + + // job_spec_json: job_spec_json, + // proxied_ports: proxied_ports, + // ..Default::default() + // }) + // .await?; + 
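	// Build the operation response, echoing the new server's ID and
	// configuration back to the caller.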
Ok(dynamic_servers::server_create::Response { server: Some(backend::dynamic_servers::Server { server_id: Some(server_id.into()), @@ -191,3 +1144,588 @@ pub async fn handle( }), }) } + +/// Determines if a Nomad job is dispatched from our run. +/// +/// We use this when monitoring Nomad in order to determine which events to +/// pay attention to. +pub fn is_nomad_job_run(job_id: &str) -> bool { + job_id.starts_with("job-") && job_id.contains("/dispatch-") +} + +// Timeout from when `stop_job` is called and the kill signal is sent +pub const JOB_STOP_TIMEOUT: Duration = Duration::from_secs(30); + +pub const TASK_CLEANUP_CPU: i32 = 50; + +// Query Prometheus with: +// +// ``` +// max(nomad_client_allocs_memory_max_usage{ns="prod",exported_job=~"job-.*",task="run-cleanup"}) / 1000 / 1000 +// ``` +// +// 13.5 MB baseline, 29 MB highest peak +pub const TASK_CLEANUP_MEMORY: i32 = 32; + +pub const RUN_MAIN_TASK_NAME: &str = "main"; +pub const RUN_CLEANUP_TASK_NAME: &str = "run-cleanup"; + +// dispatch, need alloc, nomad monitor stuff, lots of stuff here, means that +// jobs can't be destroyed, maybe by job id? + +// #[tracing::instrument] +// async fn create_docker_job( +// ctx: &OperationContext, +// runtime: &backend::dynamic_servers::lobby_runtime::Docker, +// runtime_meta: &backend::dynamic_servers::lobby_runtime_meta::Docker, +// namespace: &backend::game::Namespace, +// version: &backend::game::Version, +// mm_game_config: &backend::dynamic_servers::GameConfig, +// lobby_group: &backend::dynamic_servers::LobbyGroup, +// lobby_group_meta: &backend::dynamic_servers::LobbyGroupMeta, +// region: &backend::region::Region, +// tier: &backend::region::Tier, +// max_players_normal: u32, +// max_players_direct: u32, +// run_id: Uuid, +// lobby_id: Uuid, +// lobby_token: &str, +// ) -> GlobalResult<()> { +// let namespace_id = unwrap_ref!(namespace.namespace_id).as_uuid(); +// let version_id = unwrap_ref!(version.version_id).as_uuid(); +// let lobby_group_id = unwrap_ref!(lobby_group_meta.lobby_group_id).as_uuid(); +// let region_id = unwrap_ref!(region.region_id).as_uuid(); + +// let job_runner_binary_url = resolve_job_runner_binary_url(ctx).await?; + +// let resolve_perf = ctx.perf().start("resolve-image-artifact-url").await; +// let build_id = unwrap_ref!(runtime.build_id).as_uuid(); +// let image_artifact_url = resolve_image_artifact_url(ctx, build_id, region).await?; +// resolve_perf.end(); + +// // Validate build exists and belongs to this game +// let build_id = unwrap_ref!(runtime.build_id).as_uuid(); +// let build_get = op!([ctx] build_get { +// build_ids: vec![build_id.into()], +// }) +// .await?; +// let build = unwrap!(build_get.builds.first()); +// let build_kind = unwrap!(backend::build::BuildKind::from_i32(build.kind)); +// let build_compression = unwrap!(backend::build::BuildCompression::from_i32( +// build.compression +// )); + +// // Generate the Docker job +// let job_spec = nomad_job::gen_lobby_docker_job( +// runtime, +// &build.image_tag, +// tier, +// ctx.lobby_config_json.is_some(), +// !ctx.tags.is_empty(), +// build_kind, +// build_compression, +// )?; +// let job_spec_json = serde_json::to_string(&job_spec)?; + +// // Build proxied ports for each exposed port +// let proxied_ports = runtime +// .ports +// .iter() +// .filter(|port| { +// port.proxy_kind == backend::dynamic_servers::lobby_runtime::ProxyKind::GameGuard as i32 +// && port.port_range.is_none() +// }) +// .flat_map(|port| { +// let mut ports = vec![direct_proxied_port(lobby_id, region_id, port)]; +// match 
backend::dynamic_servers::lobby_runtime::ProxyProtocol::from_i32( +// port.proxy_protocol, +// ) { +// Some( +// backend::dynamic_servers::lobby_runtime::ProxyProtocol::Http +// | backend::dynamic_servers::lobby_runtime::ProxyProtocol::Https, +// ) => { +// ports.push(path_proxied_port(lobby_id, region_id, port)); +// } +// Some( +// backend::dynamic_servers::lobby_runtime::ProxyProtocol::Udp +// | backend::dynamic_servers::lobby_runtime::ProxyProtocol::Tcp +// | backend::dynamic_servers::lobby_runtime::ProxyProtocol::TcpTls, +// ) +// | None => {} +// } +// ports +// }) +// .collect::>>()?; + +// submit_job(&job_spec_json, Some(region_id.into())); + +// // msg!([ctx] job_run::msg::create(run_id) { +// // run_id: Some(run_id.into()), +// // region_id: Some(region_id.into()), +// // parameters: vec![ +// // backend::job::Parameter { +// // key: "job_runner_binary_url".into(), +// // value: job_runner_binary_url, +// // }, +// // backend::job::Parameter { +// // key: "vector_socket_addr".into(), +// // value: "127.0.0.1:5021".to_string(), +// // }, +// // backend::job::Parameter { +// // key: "image_artifact_url".into(), +// // value: image_artifact_url.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "namespace_id".into(), +// // value: namespace_id.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "namespace_name".into(), +// // value: namespace.name_id.to_owned(), +// // }, +// // backend::job::Parameter { +// // key: "version_id".into(), +// // value: version_id.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "version_name".into(), +// // value: version.display_name.to_owned(), +// // }, +// // backend::job::Parameter { +// // key: "lobby_group_id".into(), +// // value: lobby_group_id.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "lobby_group_name".into(), +// // value: lobby_group.name_id.clone(), +// // }, +// // backend::job::Parameter { +// // key: "lobby_id".into(), +// // value: lobby_id.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "lobby_token".into(), +// // value: lobby_token.to_owned(), +// // }, +// // backend::job::Parameter { +// // key: "lobby_config".into(), +// // value: ctx.lobby_config_json.clone().unwrap_or_default(), +// // }, +// // backend::job::Parameter { +// // key: "lobby_tags".into(), +// // value: serde_json::to_string(&ctx.tags)?, +// // }, +// // backend::job::Parameter { +// // key: "region_id".into(), +// // value: region_id.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "region_name".into(), +// // value: region.name_id.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "max_players_normal".into(), +// // value: max_players_normal.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "max_players_direct".into(), +// // value: max_players_direct.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "max_players_party".into(), +// // value: lobby_group.max_players_party.to_string(), +// // }, +// // backend::job::Parameter { +// // key: "root_user_enabled".into(), +// // value: if mm_game_config.root_user_enabled { "1" } else { "0" }.into() +// // }, +// // ].into_iter() +// // .chain(ctx.parameters.clone()) +// // .collect(), + +// // job_spec_json: job_spec_json, +// // proxied_ports: proxied_ports, +// // ..Default::default() +// // }) +// // .await?; + +// Ok(()) +// } + +// #[tracing::instrument] +// async fn resolve_image_artifact_url( +// ctx: &OperationContext, +// build_id: Uuid, +// 
region: &backend::region::Region, +// ) -> GlobalResult { +// let build_res = op!([ctx] build_get { +// build_ids: vec![build_id.into()], +// }) +// .await?; +// let build = build_res.builds.first(); +// let build = unwrap_ref!(build); +// let build_kind = unwrap!(backend::build::BuildKind::from_i32(build.kind)); +// let build_compression = unwrap!(backend::build::BuildCompression::from_i32( +// build.compression +// )); +// let upload_id_proto = unwrap!(build.upload_id); + +// let upload_res = op!([ctx] upload_get { +// upload_ids: vec![upload_id_proto], +// }) +// .await?; +// let upload = unwrap!(upload_res.uploads.first()); + +// // Get provider +// let proto_provider = unwrap!( +// backend::upload::Provider::from_i32(upload.provider), +// "invalid upload provider" +// ); +// let provider = match proto_provider { +// backend::upload::Provider::Minio => s3_util::Provider::Minio, +// backend::upload::Provider::Backblaze => s3_util::Provider::Backblaze, +// backend::upload::Provider::Aws => s3_util::Provider::Aws, +// }; + +// let file_name = util_build::file_name(build_kind, build_compression); + +// let mm_lobby_delivery_method = unwrap!( +// backend::cluster::BuildDeliveryMethod::from_i32(region.build_delivery_method), +// "invalid datacenter build delivery method" +// ); +// match mm_lobby_delivery_method { +// backend::cluster::BuildDeliveryMethod::S3Direct => { +// tracing::info!("using s3 direct delivery"); + +// let bucket = "bucket-build"; + +// // Build client +// let s3_client = +// s3_util::Client::from_env_opt(bucket, provider, s3_util::EndpointKind::External) +// .await?; + +// let upload_id = unwrap_ref!(upload.upload_id).as_uuid(); +// let presigned_req = s3_client +// .get_object() +// .bucket(s3_client.bucket()) +// .key(format!("{upload_id}/{file_name}")) +// .presigned( +// s3_util::aws_sdk_s3::presigning::config::PresigningConfig::builder() +// .expires_in(std::time::Duration::from_secs(15 * 60)) +// .build()?, +// ) +// .await?; + +// let addr = presigned_req.uri().clone(); + +// let addr_str = addr.to_string(); +// tracing::info!(addr = %addr_str, "resolved artifact s3 presigned request"); + +// Ok(addr_str) +// } +// backend::cluster::BuildDeliveryMethod::TrafficServer => { +// tracing::info!("using traffic server delivery"); + +// let region_id = unwrap_ref!(region.region_id).as_uuid(); + +// // Hash build so that the ATS server that we download the build from is always the same one. This +// // improves cache hit rates and reduces download times. 
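// For example, a stable pick from N candidates (a sketch; mirrors the SQL's
// `OFFSET abs($3 % COUNT)` clause below):
//
//   let idx = hash.rem_euclid(candidate_vlan_ips.len() as i64) as usize;
//   let ats_vlan_ip = candidate_vlan_ips[idx];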
+// let build_id = unwrap_ref!(build.build_id).as_uuid(); +// let mut hasher = DefaultHasher::new(); +// hasher.write(build_id.as_bytes()); +// let hash = hasher.finish() as i64; + +// // NOTE: The algorithm for choosing the vlan_ip from the hash should match the one in +// // prewarm_ats.rs @ prewarm_ats_cache +// // Get vlan ip from build id hash for consistent routing +// let (ats_vlan_ip,) = sql_fetch_one!( +// [ctx, (IpAddr,)] +// " +// WITH sel AS ( +// -- Select candidate vlan ips +// SELECT +// vlan_ip +// FROM db_cluster.servers +// WHERE +// datacenter_id = $1 AND +// pool_type = $2 AND +// vlan_ip IS NOT NULL AND +// install_complete_ts IS NOT NULL AND +// drain_ts IS NULL AND +// cloud_destroy_ts IS NULL +// ) +// SELECT vlan_ip +// FROM sel +// -- Use mod to make sure the hash stays within bounds +// OFFSET abs($3 % (SELECT COUNT(*) from sel)) +// LIMIT 1 +// ", +// // NOTE: region_id is just the old name for datacenter_id +// ®ion_id, +// backend::cluster::PoolType::Ats as i64, +// hash, +// ) +// .await?; + +// let upload_id = unwrap_ref!(upload.upload_id).as_uuid(); +// let addr = format!( +// "http://{vlan_ip}:8080/s3-cache/{provider}/{namespace}-bucket-build/{upload_id}/{file_name}", +// vlan_ip = ats_vlan_ip, +// provider = heck::KebabCase::to_kebab_case(provider.as_str()), +// namespace = util::env::namespace(), +// upload_id = upload_id, +// ); + +// tracing::info!(%addr, "resolved artifact s3 url"); + +// Ok(addr) +// } +// } +// } + +async fn submit_job(base_job_json: &str, region: &backend::region::Region) -> GlobalResult { + let (job_id, job) = { + // let region = region; + let base_job = serde_json::from_str::(base_job_json)?; + + // Modify the job spec + let mut job = { + let mut job = base_job; + // let region = region; + // Replace all job IDs with a placeholder value in order to create a + // deterministic job spec. 
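			// (Same scheme as the inline path above: "__PLACEHOLDER__" is swapped
			// for the hash-derived job ID once the spec has been hashed.)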
+ { + let job_id: &str = "__PLACEHOLDER__"; + let job: &mut nomad_client::models::Job = &mut job; + job.ID = Some(job_id.into()); + job.name = Some(job_id.into()); + }; + + ensure_eq!( + "batch", + unwrap_ref!(job._type).as_str(), + "only the batch job type is supported" + ); + + // Update the job's region + job.region = Some(region.nomad_region.clone()); + job.datacenters = Some(vec![region.nomad_datacenter.clone()]); + + // Validate that the job is parameterized + let parameters = unwrap!(job.parameterized_job.as_mut(), "job not parameterized"); + + // Add run parameters + parameters.meta_required = Some({ + let mut meta_required = parameters.meta_required.clone().unwrap_or_default(); + meta_required.push("job_run_id".into()); + meta_required + }); + + // Get task group + let task_groups = unwrap!(job.task_groups.as_mut()); + ensure_eq!(1, task_groups.len(), "must have exactly 1 task group"); + let task_group = unwrap!(task_groups.first_mut()); + ensure_eq!( + task_group.name.as_deref(), + Some(RUN_MAIN_TASK_NAME), + "must have main task group" + ); + + // Ensure has main task + let main_task = unwrap!( + task_group + .tasks + .iter_mut() + .flatten() + .find(|x| x.name.as_deref() == Some(RUN_MAIN_TASK_NAME)), + "must have main task" + ); + ensure!( + main_task + .lifecycle + .as_ref() + .map_or(true, |x| x.hook.is_none()), + "main task must not have a lifecycle hook" + ); + + // Configure networks + let networks = unwrap!(task_group.networks.as_mut()); + ensure_eq!(1, networks.len(), "must have exactly 1 network"); + let network = unwrap!(networks.first_mut()); + // Disable IPv6 DNS since Docker doesn't support IPv6 yet + network.DNS = Some(Box::new(nomad_client::models::DnsConfig { + servers: Some(vec![ + // Google + "8.8.8.8".into(), + "8.8.4.4".into(), + "2001:4860:4860::8888".into(), + "2001:4860:4860::8844".into(), + ]), + // Disable default search from the host + searches: Some(Vec::new()), + options: Some(vec!["rotate".into(), "edns0".into(), "attempts:2".into()]), + ..nomad_client::models::DnsConfig::new() + })); + + // Disable rescheduling, since job-run doesn't support this at the moment + task_group.reschedule_policy = + Some(Box::new(nomad_client::models::ReschedulePolicy { + attempts: Some(0), + unlimited: Some(false), + ..nomad_client::models::ReschedulePolicy::new() + })); + + // Disable restarts. Our Nomad monitoring workflow doesn't support restarts + // at the moment. 
+ task_group.restart_policy = Some(Box::new(nomad_client::models::RestartPolicy { + attempts: Some(0), + // unlimited: Some(false), + ..nomad_client::models::RestartPolicy::new() + })); + + // Add cleanup task + let tasks = unwrap!(task_group.tasks.as_mut()); + tasks.push({ + use nomad_client::models::*; + + Task { + name: Some(RUN_CLEANUP_TASK_NAME.into()), + lifecycle: Some(Box::new(TaskLifecycle { + hook: Some("poststop".into()), + sidecar: Some(false), + })), + driver: Some("docker".into()), + config: Some({ + let mut config = HashMap::new(); + + config.insert("image".into(), json!("python:3.10.7-alpine3.16")); + config.insert( + "args".into(), + json!([ + "/bin/sh", + "-c", + "apk add --no-cache ca-certificates && python3 /local/cleanup.py" + ]), + ); + + config + }), + templates: Some(vec![Template { + dest_path: Some("local/cleanup.py".into()), + embedded_tmpl: Some(formatdoc!( + r#" + import ssl + import urllib.request, json, os, mimetypes, sys + + BEARER = '{{{{env "NOMAD_META_JOB_RUN_TOKEN"}}}}' + + ctx = ssl.create_default_context() + + def eprint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + + def req(method, url, data = None, headers = {{}}): + request = urllib.request.Request( + url=url, + data=data, + method=method, + headers=headers + ) + + try: + res = urllib.request.urlopen(request, context=ctx) + assert res.status == 200, f"Received non-200 status: {{res.status}}" + return res + except urllib.error.HTTPError as err: + eprint(f"HTTP Error ({{err.code}} {{err.reason}}):\n\nBODY:\n{{err.read().decode()}}\n\nHEADERS:\n{{err.headers}}") + + raise err + + print(f'\n> Cleaning up job') + + res_json = None + with req('POST', f'{origin_api}/job/runs/cleanup', + data = json.dumps({{}}).encode(), + headers = {{ + 'Authorization': f'Bearer {{BEARER}}', + 'Content-Type': 'application/json' + }} + ) as res: + res_json = json.load(res) + + + print('\n> Finished') + "#, + origin_api = util::env::origin_api(), + )), + ..Template::new() + }]), + resources: Some(Box::new(Resources { + CPU: Some(TASK_CLEANUP_CPU), + memory_mb: Some(TASK_CLEANUP_MEMORY), + ..Resources::new() + })), + log_config: Some(Box::new(LogConfig { + max_files: Some(4), + max_file_size_mb: Some(2), + disabled: Some(false), + })), + ..Task::new() + } +}); + + Ok::<_, rivet_operation::prelude::GlobalError>(job) + }?; + + // Derive jobspec hash + // + // We serialize the JSON to a canonical string then take a SHA hash of the output. 
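	// The resulting ID has the shape `job-{first 12 hex chars of the SHA-256}:{region name_id}`,
	// e.g. `job-1a2b3c4d5e6f:my-region` (illustrative value).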
+ let job_cjson_str = match cjson::to_string(&job) { + Ok(x) => x, + Err(err) => { + tracing::error!(?err, "cjson serialization failed"); + bail!("cjson serialization failed") + } + }; + let job_hash = Sha256::digest(job_cjson_str.as_bytes()); + let job_hash_str = hex::encode(job_hash); + + // Generate new job ID + let job_id = format!( + "job-{hash}:{region}", + hash = &job_hash_str[0..12], + region = region.name_id + ); + { + let job_id: &str = &job_id; + let job: &mut nomad_client::models::Job = &mut job; + job.ID = Some(job_id.into()); + job.name = Some(job_id.into()); + }; + + Ok::<_, rivet_operation::prelude::GlobalError>((job_id, job)) + }?; + + // Submit the job + { + tracing::info!("submitting job"); + + nomad_client::apis::jobs_api::post_job( + &NEW_NOMAD_CONFIG, + &job_id, + nomad_client::models::JobRegisterRequest { + job: Some(Box::new(job)), + ..nomad_client::models::JobRegisterRequest::new() + }, + Some(®ion.nomad_region), + None, + None, + None, + ) + .await?; + + // Ok(()) + } + + Ok(job_id) +} diff --git a/svc/pkg/ds/ops/server-create/src/nomad_job.rs b/svc/pkg/ds/ops/server-create/src/nomad_job.rs new file mode 100644 index 000000000..a0d2f571c --- /dev/null +++ b/svc/pkg/ds/ops/server-create/src/nomad_job.rs @@ -0,0 +1,682 @@ +use std::{collections::HashMap, convert::TryInto}; + +use chirp_worker::prelude::*; +use proto::backend::{self, matchmaker::lobby_runtime::NetworkMode as LobbyRuntimeNetworkMode}; +use regex::Regex; +use serde_json::json; + +use crate::{oci_config, util_job}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TransportProtocol { + Tcp, + Udp, +} + +impl From for TransportProtocol { + fn from(proxy_protocol: ProxyProtocol) -> Self { + match proxy_protocol { + ProxyProtocol::Http + | ProxyProtocol::Https + | ProxyProtocol::Tcp + | ProxyProtocol::TcpTls => Self::Tcp, + ProxyProtocol::Udp => Self::Udp, + } + } +} + +impl TransportProtocol { + pub fn as_cni_protocol(&self) -> &'static str { + match self { + Self::Tcp => "tcp", + Self::Udp => "udp", + } + } +} + +/// What a port is being pointed at. +#[derive(Clone)] +pub enum PortTarget { + Single(u16), + Range { min: u16, max: u16 }, +} + +impl PortTarget { + /// Returns the port to be passed to Nomad's `dynamic_ports` config. + /// + /// This will return `None` if a port range is provided where `min` and + /// `max` are not the same. + pub fn get_nomad_port(&self) -> Option { + match self { + PortTarget::Single(x) => Some(*x), + PortTarget::Range { min, max } => { + if min == max { + Some(*min) + } else { + None + } + } + } + } +} + +#[derive(Clone)] +pub enum ProxyProtocol { + Http, + Https, + Tcp, + TcpTls, + Udp, +} + +impl From for ProxyProtocol { + fn from(protocol: backend::dynamic_servers::GameGuardProtocol) -> Self { + match protocol { + backend::dynamic_servers::GameGuardProtocol::Http => Self::Http, + backend::dynamic_servers::GameGuardProtocol::Https => Self::Https, + backend::dynamic_servers::GameGuardProtocol::Tcp => Self::Tcp, + backend::dynamic_servers::GameGuardProtocol::TcpTls => Self::TcpTls, + backend::dynamic_servers::GameGuardProtocol::Udp => Self::Udp, + } + } +} + +/// Helper structure for parsing all of the runtime's ports before building the +/// config. 
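+///
+/// For example (a sketch; the label, port number, and protocol are illustrative):
+///
+/// ```ignore
+/// DecodedPort {
+///     label: "game".into(),
+///     nomad_port_label: util_mm::format_nomad_port_label("game"),
+///     target: PortTarget::Single(27015),
+///     proxy_protocol: ProxyProtocol::Tcp,
+/// }
+/// ```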
+#[derive(Clone)] +pub struct DecodedPort { + pub label: String, + pub nomad_port_label: String, + pub target: PortTarget, + pub proxy_protocol: ProxyProtocol, +} + +pub fn gen_lobby_docker_job( + runtime: &backend::matchmaker::lobby_runtime::Docker, + _image_tag: &str, + tier: &backend::region::Tier, + lobby_config: bool, + lobby_tags: bool, + build_kind: backend::build::BuildKind, + build_compression: backend::build::BuildCompression, +) -> GlobalResult { + // IMPORTANT: This job spec must be deterministic. Do not pass in parameters + // that change with every run, such as the lobby ID. Ensure the + // `reuse_job_id` test passes when changing this function. + use nomad_client::models::*; + + // runc-compatible resources + let cpu = tier.rivet_cores_numerator as u64 * 1_000 / tier.rivet_cores_denominator as u64; // Millicore (1/1000 of a core) + let memory = tier.memory * (1024 * 1024); // bytes + let memory_max = tier.memory_max * (1024 * 1024); // bytes + + // Nomad-compatible resources + let resources = Resources { + // TODO: Configure this per-provider + // Nomad configures CPU based on MHz, not millicores. We havel to calculate the CPU share + // by knowing how many MHz are on the client. + CPU: if tier.rivet_cores_numerator < tier.rivet_cores_denominator { + Some((tier.cpu - util_job::TASK_CLEANUP_CPU as u64).try_into()?) + } else { + None + }, + cores: if tier.rivet_cores_numerator >= tier.rivet_cores_denominator { + Some((tier.rivet_cores_numerator / tier.rivet_cores_denominator) as i32) + } else { + None + }, + memory_mb: Some( + (TryInto::::try_into(memory)? / (1024 * 1024) + - util_job::TASK_CLEANUP_MEMORY as i64) + .try_into()?, + ), + // Allow oversubscribing memory by 50% of the reserved + // memory if using less than the node's total memory + memory_max_mb: Some( + (TryInto::::try_into(memory_max)? / (1024 * 1024) + - util_job::TASK_CLEANUP_MEMORY as i64) + .try_into()?, + ), + disk_mb: Some(tier.disk as i32), // TODO: Is this deprecated? + ..Resources::new() + }; + + let network_mode = unwrap!(LobbyRuntimeNetworkMode::from_i32(runtime.network_mode)); + + // Read ports + let decoded_ports = runtime + .ports + .iter() + .map(|port| { + let target = if let Some(target_port) = port.target_port { + PortTarget::Single(target_port as u16) + } else if let Some(port_range) = &port.port_range { + PortTarget::Range { + min: port_range.min as u16, + max: port_range.max as u16, + } + } else { + bail!("must have either target_port or port_range"); + }; + // TODO + // GlobalResult::Ok(DecodedPort { + // label: port.label.clone(), + // nomad_port_label: util_mm::format_nomad_port_label(&port.label), + // target, + // proxy_protocol: unwrap!(ProxyProtocol::from_i32(port.proxy_protocol)), + // }) + GlobalResult::Ok(DecodedPort { + label: port.label.clone(), + nomad_port_label: String::new(), + target, + proxy_protocol: ProxyProtocol::Http, + }) + }) + .collect::>>()?; + + // The container will set up port forwarding manually from the Nomad-defined ports on the host + // to the CNI container + let dynamic_ports = decoded_ports + .iter() + .filter_map(|port| { + port.target.get_nomad_port().map(|_| Port { + label: Some(port.nomad_port_label.clone()), + ..Port::new() + }) + }) + .collect::>(); + + // Port mappings to pass to the container. Only used in bridge networking. 
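+	// Each mapping is serialized into the CNI runtime config consumed by the
+	// setup script, e.g. (port value illustrative):
+	//   { "HostPort": <resolved from the NOMAD_HOST_PORT_* env template>, "ContainerPort": 27015, "Protocol": "udp" }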
+	// Port mappings to pass to the container. Only used in bridge networking.
+	let cni_port_mappings = decoded_ports
+		.iter()
+		.filter_map(|port| {
+			port.target.get_nomad_port().map(|target_port| {
+				json!({
+					"HostPort": template_env_var_int(&nomad_host_port_env_var(&port.nomad_port_label)),
+					"ContainerPort": target_port,
+					// TODO
+					// "Protocol": TransportProtocol::from(port.proxy_protocol).as_cni_protocol(),
+					"Protocol": TransportProtocol::Udp.as_cni_protocol(),
+				})
+			})
+		})
+		.collect::<Vec<_>>();
+
+	// Also see util_mm::consts::DEFAULT_ENV_KEYS
+	let mut env = runtime
+		.env_vars
+		.iter()
+		.map(|v| (v.key.clone(), escape_go_template(&v.value)))
+		.chain(if lobby_config {
+			Some((
+				"RIVET_LOBBY_CONFIG".to_string(),
+				template_env_var("NOMAD_META_LOBBY_CONFIG"),
+			))
+		} else {
+			None
+		})
+		.chain(if lobby_tags {
+			Some((
+				"RIVET_LOBBY_TAGS".to_string(),
+				template_env_var("NOMAD_META_LOBBY_TAGS"),
+			))
+		} else {
+			None
+		})
+		.chain([(
+			"RIVET_API_ENDPOINT".to_string(),
+			util::env::origin_api().to_string(),
+		)])
+		.chain(
+			// DEPRECATED:
+			[
+				("RIVET_CHAT_API_URL", "chat"),
+				("RIVET_GROUP_API_URL", "group"),
+				("RIVET_IDENTITY_API_URL", "identity"),
+				("RIVET_KV_API_URL", "kv"),
+				("RIVET_MATCHMAKER_API_URL", "matchmaker"),
+			]
+			.iter()
+			.filter(|_| util::env::support_deprecated_subdomains())
+			.map(|(env, service)| {
+				(
+					env.to_string(),
+					util::env::origin_api().replace("://", &format!("://{}.", service)),
+				)
+			}),
+		)
+		.chain(
+			[
+				(
+					"RIVET_NAMESPACE_NAME",
+					template_env_var("NOMAD_META_NAMESPACE_NAME"),
+				),
+				(
+					"RIVET_NAMESPACE_ID",
+					template_env_var("NOMAD_META_NAMESPACE_ID"),
+				),
+				(
+					"RIVET_VERSION_NAME",
+					template_env_var("NOMAD_META_VERSION_NAME"),
+				),
+				(
+					"RIVET_VERSION_ID",
+					template_env_var("NOMAD_META_VERSION_ID"),
+				),
+				(
+					"RIVET_GAME_MODE_ID",
+					template_env_var("NOMAD_META_LOBBY_GROUP_ID"),
+				),
+				(
+					"RIVET_GAME_MODE_NAME",
+					template_env_var("NOMAD_META_LOBBY_GROUP_NAME"),
+				),
+				("RIVET_LOBBY_ID", template_env_var("NOMAD_META_LOBBY_ID")),
+				("RIVET_TOKEN", template_env_var("NOMAD_META_LOBBY_TOKEN")),
+				("RIVET_REGION_ID", template_env_var("NOMAD_META_REGION_ID")),
+				(
+					"RIVET_REGION_NAME",
+					template_env_var("NOMAD_META_REGION_NAME"),
+				),
+				(
+					"RIVET_MAX_PLAYERS_NORMAL",
+					template_env_var("NOMAD_META_MAX_PLAYERS_NORMAL"),
+				),
+				(
+					"RIVET_MAX_PLAYERS_DIRECT",
+					template_env_var("NOMAD_META_MAX_PLAYERS_DIRECT"),
+				),
+				(
+					"RIVET_MAX_PLAYERS_PARTY",
+					template_env_var("NOMAD_META_MAX_PLAYERS_PARTY"),
+				),
+				// CPU in millicores
+				//
+				// < 1000 is for fractional CPU
+				// > 1000 is for whole CPU, will always be 1000 increments
+				("RIVET_CPU", cpu.to_string()),
+				// Memory in bytes
+				("RIVET_MEMORY", memory.to_string()),
+				// Memory in bytes for oversubscription
+				("RIVET_MEMORY_OVERSUBSCRIBE", memory_max.to_string()),
+				// DEPRECATED:
+				(
+					"RIVET_LOBBY_TOKEN",
+					template_env_var("NOMAD_META_LOBBY_TOKEN"),
+				),
+				(
+					"RIVET_LOBBY_GROUP_ID",
+					template_env_var("NOMAD_META_LOBBY_GROUP_ID"),
+				),
+				(
+					"RIVET_LOBBY_GROUP_NAME",
+					template_env_var("NOMAD_META_LOBBY_GROUP_NAME"),
+				),
+			]
+			.iter()
+			.map(|(k, v)| (k.to_string(), v.to_string())),
+		)
+		// Ports
+		.chain(decoded_ports.iter().filter_map(|port| {
+			if let Some(target_port) = port.target.get_nomad_port() {
+				let port_value = match network_mode {
+					// CNI will handle mapping the host port to the container port
+					LobbyRuntimeNetworkMode::Bridge => target_port.to_string(),
+					// The container needs to listen on the correct port
+					LobbyRuntimeNetworkMode::Host => {
+						template_env_var(&nomad_host_port_env_var(&port.nomad_port_label))
+					}
+				};
+
+				// Port with the kebab-case port key. Included for backward compatibility & for
+				// less confusion.
+				Some((format!("PORT_{}", port.label.replace('-', "_")), port_value))
+			} else {
+				None
+			}
+		}))
+		// Port ranges
+		.chain(
+			decoded_ports
+				.iter()
+				.filter_map(|port| {
+					if let PortTarget::Range { min, max } = &port.target {
+						let snake_port_label = port.label.replace('-', "_");
+
+						Some([
+							(
+								format!("PORT_RANGE_MIN_{}", snake_port_label),
+								min.to_string(),
+							),
+							(
+								format!("PORT_RANGE_MAX_{}", snake_port_label),
+								max.to_string(),
+							),
+						])
+					} else {
+						None
+					}
+				})
+				.flatten(),
+		)
+		.map(|(k, v)| format!("{k}={v}"))
+		.collect::<Vec<String>>();
+	env.sort();
+
+	let services = decoded_ports
+		.iter()
+		.map(|port| {
+			if port.target.get_nomad_port().is_some() {
+				let service_name = format!("${{NOMAD_META_LOBBY_ID}}-{}", port.label);
+				GlobalResult::Ok(Some(Service {
+					provider: Some("nomad".into()),
+					name: Some(service_name),
+					tags: Some(vec!["game".into()]),
+					port_label: Some(port.nomad_port_label.clone()),
+					// checks: if TransportProtocol::from(port.proxy_protocol)
+					// 	== TransportProtocol::Tcp
+					// {
+					// 	Some(vec![ServiceCheck {
+					// 		name: Some(format!("{}-probe", port.label)),
+					// 		port_label: Some(port.nomad_port_label.clone()),
+					// 		_type: Some("tcp".into()),
+					// 		interval: Some(30_000_000_000),
+					// 		timeout: Some(2_000_000_000),
+					// 		..ServiceCheck::new()
+					// 	}])
+					// } else {
+					// 	None
+					// },
+					..Service::new()
+				}))
+			} else {
+				Ok(None)
+			}
+		})
+		.filter_map(|x| x.transpose())
+		.collect::<GlobalResult<Vec<_>>>()?;
+
+	// Generate the command to download and decompress the file
+	let mut download_cmd = r#"curl -Lf "$NOMAD_META_IMAGE_ARTIFACT_URL""#.to_string();
+	match build_compression {
+		backend::build::BuildCompression::None => {}
+		backend::build::BuildCompression::Lz4 => {
+			download_cmd.push_str(" | lz4 -d -");
+		}
+	}
+
+	Ok(Job {
+		_type: Some("batch".into()),
+		constraints: Some(vec![Constraint {
+			l_target: Some("${node.class}".into()),
+			r_target: Some("job".into()),
+			operand: Some("=".into()),
+		}]),
+		parameterized_job: Some(Box::new(ParameterizedJobConfig {
+			payload: Some("forbidden".into()),
+			meta_required: Some(vec![
+				"job_runner_binary_url".into(),
+				"vector_socket_addr".into(),
+				"image_artifact_url".into(),
+				"namespace_id".into(),
+				"namespace_name".into(),
+				"version_id".into(),
+				"version_name".into(),
+				"lobby_group_id".into(),
+				"lobby_group_name".into(),
+				"lobby_id".into(),
+				"lobby_token".into(),
+				"lobby_config".into(),
+				"lobby_tags".into(),
+				"region_id".into(),
+				"region_name".into(),
+				"max_players_normal".into(),
+				"max_players_direct".into(),
+				"max_players_party".into(),
+				"root_user_enabled".into(),
+			]),
+			meta_optional: Some(vec!["rivet_test_id".into()]),
+		})),
+		task_groups: Some(vec![TaskGroup {
+			name: Some(util_job::RUN_MAIN_TASK_NAME.into()),
+			constraints: None, // TODO: Use parameter meta to specify the hardware
+			affinities: None,  // TODO:
+			// Allows for jobs to keep running and receiving players in the
+			// event of a disconnection from the Nomad server.
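+			// (5 minutes, expressed in nanoseconds; the Nomad API takes
+			// durations as nanosecond counts.)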
+ max_client_disconnect: Some(5 * 60 * 1_000_000_000), + restart_policy: Some(Box::new(RestartPolicy { + attempts: Some(0), + mode: Some("fail".into()), + ..RestartPolicy::new() + })), + reschedule_policy: Some(Box::new(ReschedulePolicy { + attempts: Some(0), + unlimited: Some(false), + ..ReschedulePolicy::new() + })), + networks: Some(vec![NetworkResource { + // The setup.sh script will set up a CNI network if using bridge networking + mode: Some("host".into()), + dynamic_ports: Some(dynamic_ports), + ..NetworkResource::new() + }]), + services: Some(services), + // Configure ephemeral disk for logs + ephemeral_disk: Some(Box::new(EphemeralDisk { + size_mb: Some(tier.disk as i32), + ..EphemeralDisk::new() + })), + tasks: Some(vec![ + Task { + name: Some("runc-setup".into()), + lifecycle: Some(Box::new(TaskLifecycle { + hook: Some("prestart".into()), + sidecar: Some(false), + })), + driver: Some("raw_exec".into()), + config: Some({ + let mut x = HashMap::new(); + x.insert("command".into(), json!("${NOMAD_TASK_DIR}/setup.sh")); + x + }), + templates: Some(vec![ + Template { + embedded_tmpl: Some(include_str!("./scripts/setup.sh").replace( + "__HOST_NETWORK__", + match network_mode { + LobbyRuntimeNetworkMode::Bridge => "false", + LobbyRuntimeNetworkMode::Host => "true", + }, + )), + dest_path: Some("${NOMAD_TASK_DIR}/setup.sh".into()), + perms: Some("744".into()), + ..Template::new() + }, + Template { + embedded_tmpl: Some( + include_str!("./scripts/setup_job_runner.sh").into(), + ), + dest_path: Some("${NOMAD_TASK_DIR}/setup_job_runner.sh".into()), + perms: Some("744".into()), + ..Template::new() + }, + Template { + embedded_tmpl: Some( + include_str!("./scripts/setup_oci_bundle.sh") + .replace("__DOWNLOAD_CMD__", &download_cmd) + .replace( + "__BUILD_KIND__", + match build_kind { + backend::build::BuildKind::DockerImage => { + "docker-image" + } + backend::build::BuildKind::OciBundle => "oci-bundle", + }, + ), + ), + dest_path: Some("${NOMAD_TASK_DIR}/setup_oci_bundle.sh".into()), + perms: Some("744".into()), + ..Template::new() + }, + Template { + embedded_tmpl: Some( + include_str!("./scripts/setup_cni_network.sh").into(), + ), + dest_path: Some("${NOMAD_TASK_DIR}/setup_cni_network.sh".into()), + perms: Some("744".into()), + ..Template::new() + }, + Template { + embedded_tmpl: Some(gen_oci_bundle_config( + cpu, memory, memory_max, env, + )?), + dest_path: Some( + "${NOMAD_ALLOC_DIR}/oci-bundle-config.base.json".into(), + ), + ..Template::new() + }, + Template { + embedded_tmpl: Some(inject_consul_env_template( + &serde_json::to_string(&cni_port_mappings)?, + )?), + dest_path: Some("${NOMAD_ALLOC_DIR}/cni-port-mappings.json".into()), + ..Template::new() + }, + ]), + resources: Some(Box::new(Resources { + // TODO + // CPU: Some(util_mm::RUNC_SETUP_CPU), + // memory_mb: Some(util_mm::RUNC_SETUP_MEMORY), + CPU: None, + memory_mb: None, + ..Resources::new() + })), + log_config: Some(Box::new(LogConfig { + max_files: Some(4), + max_file_size_mb: Some(2), + disabled: None, + })), + ..Task::new() + }, + Task { + name: Some(util_job::RUN_MAIN_TASK_NAME.into()), + driver: Some("raw_exec".into()), + config: Some({ + let mut x = HashMap::new(); + // This is downloaded in setup_job_runner.sh + x.insert("command".into(), json!("${NOMAD_ALLOC_DIR}/job-runner")); + x + }), + resources: Some(Box::new(resources.clone())), + // Intentionally high timeout. Killing jobs is handled manually with signals. 
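+					// (86_400 seconds = 24 hours, in nanoseconds.)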
+					kill_timeout: Some(86400 * 1_000_000_000),
+					kill_signal: Some("SIGTERM".into()),
+					log_config: Some(Box::new(LogConfig {
+						max_files: Some(4),
+						max_file_size_mb: Some(4),
+						disabled: None,
+					})),
+					..Task::new()
+				},
+				Task {
+					name: Some("runc-cleanup".into()),
+					lifecycle: Some(Box::new(TaskLifecycle {
+						hook: Some("poststop".into()),
+						sidecar: Some(false),
+					})),
+					driver: Some("raw_exec".into()),
+					config: Some({
+						let mut x = HashMap::new();
+						x.insert("command".into(), json!("${NOMAD_TASK_DIR}/cleanup.sh"));
+						x
+					}),
+					templates: Some(vec![Template {
+						embedded_tmpl: Some(include_str!("./scripts/cleanup.sh").into()),
+						dest_path: Some("${NOMAD_TASK_DIR}/cleanup.sh".into()),
+						perms: Some("744".into()),
+						..Template::new()
+					}]),
+					resources: Some(Box::new(Resources {
+						// TODO
+						// CPU: Some(util_mm::RUNC_CLEANUP_CPU),
+						// memory_mb: Some(util_mm::RUNC_CLEANUP_MEMORY),
+						CPU: None,
+						memory_mb: None,
+						..Resources::new()
+					})),
+					log_config: Some(Box::new(LogConfig {
+						max_files: Some(4),
+						max_file_size_mb: Some(2),
+						disabled: None,
+					})),
+					..Task::new()
+				},
+			]),
+			..TaskGroup::new()
+		}]),
+		..Job::new()
+	})
+}
+
+/// Build base config used to generate the OCI bundle's config.json.
+pub fn gen_oci_bundle_config(
+	cpu: u64,
+	memory: u64,
+	memory_max: u64,
+	env: Vec<String>,
+) -> GlobalResult<String> {
+	let config_str = serde_json::to_string(&oci_config::config(cpu, memory, memory_max, env))?;
+
+	// Escape Go template syntax
+	let config_str = inject_consul_env_template(&config_str)?;
+
+	Ok(config_str)
+}
+
+/// Makes a user-generated string safe to inject into a Go template.
+pub fn escape_go_template(input: &str) -> String {
+	let re = Regex::new(r"(\{\{|\}\})").unwrap();
+	re.replace_all(input, r#"{{"$1"}}"#)
+		.to_string()
+		// TODO: This removes exploits to inject env vars (see below)
+		// SVC-3307
+		.replace("###", "")
+}
+
+/// Generates a template string that we can substitute with the real environment variable.
+///
+/// This must be safe to inject into a JSON string so it can be substituted after rendering the
+/// JSON object. Intended to be used from within JSON.
+///
+/// See `inject_consul_env_template`.
+pub fn template_env_var(name: &str) -> String {
+	format!("###ENV:{name}###")
+}
+
+/// Like `template_env_var`, but removes surrounding quotes.
+pub fn template_env_var_int(name: &str) -> String {
+	format!("###ENV_INT:{name}###")
+}
+
+/// Substitutes env vars generated from `template_env_var` with Consul template syntax.
+///
+/// Intended to be used from within JSON.
+pub fn inject_consul_env_template(input: &str) -> GlobalResult<String> {
+	// Regular strings
+	let re = Regex::new(r"###ENV:(\w+)###")?;
+	let output = re
+		.replace_all(input, r#"{{ env "$1" | regexReplaceAll "\"" "\\\"" }}"#)
+		.to_string();
+
+	// Integers
+	let re = Regex::new(r####""###ENV_INT:(\w+)###""####)?;
+	let output = re
+		.replace_all(&output, r#"{{ env "$1" | regexReplaceAll "\"" "\\\"" }}"#)
+		.to_string();
+
+	Ok(output)
+}
+
+pub fn nomad_host_port_env_var(port_label: &str) -> String {
+	format!("NOMAD_HOST_PORT_{}", port_label.replace('-', "_"))
+}
diff --git a/svc/pkg/ds/ops/server-create/src/oci_config.rs b/svc/pkg/ds/ops/server-create/src/oci_config.rs
new file mode 100644
index 000000000..f03105fd1
--- /dev/null
+++ b/svc/pkg/ds/ops/server-create/src/oci_config.rs
@@ -0,0 +1,316 @@
+use chirp_worker::prelude::*;
+use serde_json::json;
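+
+// A hypothetical sanity check (not part of this change): 1_000 millicores
+// (one core) should map to 100 CPU shares after the divide-by-10 scaling in
+// `config` below.
+#[cfg(test)]
+mod tests {
+	#[test]
+	fn cpu_shares_scaling() {
+		let cfg = super::config(1_000, 512 * 1024 * 1024, 768 * 1024 * 1024, Vec::new());
+		assert_eq!(cfg["linux"]["resources"]["cpu"]["shares"], 100);
+	}
+}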
+// CPU period in microseconds.
+//
+// https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
+const CPU_PERIOD: u64 = 100000;
+
+/// Generates base config.json for an OCI bundle.
+pub fn config(cpu: u64, memory: u64, memory_max: u64, env: Vec<String>) -> serde_json::Value {
+	// CPU shares is a relative weight. It doesn't matter what unit we pass here as
+	// long as the ratios between the containers are correct.
+	//
+	// Corresponds to cpu.weight in cgroups. Must be [1, 10_000]
+	//
+	// We divide by 10 in order to make sure the CPU shares are within bounds. `cpu` is measured
+	// in millishares, so 1_000 = 1 core. For a range of 32d1 (32_000) to 1d16 (62), dividing by
+	// 10 maps to a range of 3_200 down to 6.
+	let mut cpu_shares = cpu / 10;
+	if cpu_shares > 10_000 {
+		cpu_shares = 10_000;
+		tracing::warn!(?cpu_shares, "cpu_shares > 10_000");
+	} else if cpu_shares < 1 {
+		cpu_shares = 1;
+		tracing::warn!(?cpu_shares, "cpu_shares < 1");
+	}
+
+	// This is a modified version of the default config.json generated by containerd.
+	//
+	// Some values will be overridden at runtime by the values in the OCI bundle's config.json.
+	//
+	// Default Docker spec: https://github.com/moby/moby/blob/777e9f271095685543f30df0ff7a12397676f938/oci/defaults.go#L49
+	//
+	// Generate config.json with containerd:
+	// ctr run --rm -t --seccomp docker.io/library/debian:latest debian-container-id /bin/bash
+	// cat /run/containerd/io.containerd.runtime.v2.task/default/debian-container-id/config.json | jq
+	json!({
+		"ociVersion": "1.0.2-dev",
+		"process": {
+			// user, args, and cwd will be injected at runtime
+
+			// Will be merged with the OCI bundle's env
+			//
+			// These will take priority over the OCI bundle's env
+			"env": env,
+
+			"terminal": false,
+			"capabilities": {
+				"bounding": capabilities(),
+				"effective": capabilities(),
+				"permitted": capabilities()
+			},
+			"rlimits": [
+				{
+					"type": "RLIMIT_NOFILE",
+					"hard": 1024,
+					"soft": 1024
+				}
+			],
+			"noNewPrivileges": true
+
+			// TODO: oomScoreAdj
+			// TODO: scheduler
+			// TODO: iopriority
+			// TODO: rlimit?
+		},
+		"root": {
+			"path": "rootfs",
+			// This means we can't reuse the oci-bundle since the rootfs is writable.
+			"readonly": false
+		},
+		"mounts": mounts(),
+		"linux": {
+			"resources": {
+				"devices": linux_resources_devices(),
+				"cpu": {
+					"shares": cpu_shares,
+					// If `quota` is greater than `period`, it is allowed to use multiple cores.
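+					// For example (hypothetical tier), cpu = 2_000 millicores would give
+					// quota = 100_000 * 2_000 / 1_000 = 200_000µs per 100_000µs period,
+					// i.e. two full cores.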
+ // + // Read more: https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-cpu + // "quota": CPU_PERIOD * cpu / 1_000, + // "period": CPU_PERIOD, + // Use the env var for the CPU since Nomad handles assigning CPUs to each task + // "cpus": if cpu >= 1_000 { + // Some(template_env_var("NOMAD_CPU_CORES")) + // } else { + // None + // } + }, + // Docker: https://github.com/moby/moby/blob/777e9f271095685543f30df0ff7a12397676f938/daemon/daemon_unix.go#L75 + "memory": { + "reservation": memory, + "limit": memory_max, + }, + + // TODO: network + // TODO: pids + // TODO: hugepageLimits + // TODO: blockIO + }, + // "cgroupsPath": "/default/debian-container-id", + "namespaces": [ + { + "type": "pid" + }, + { + "type": "ipc" + }, + { + "type": "uts" + }, + { + "type": "mount" + } + ], + "maskedPaths": [ + "/proc/acpi", + "/proc/asound", + "/proc/kcore", + "/proc/keys", + "/proc/latency_stats", + "/proc/timer_list", + "/proc/timer_stats", + "/proc/sched_debug", + "/sys/firmware", + "/proc/scsi" + ], + "readonlyPaths": [ + "/proc/bus", + "/proc/fs", + "/proc/irq", + "/proc/sys", + "/proc/sysrq-trigger" + ], + "seccomp": super::seccomp::seccomp() + } + }) +} + +// Default Docker capabilities: https://github.com/moby/moby/blob/777e9f271095685543f30df0ff7a12397676f938/oci/caps/defaults.go#L4 +fn capabilities() -> Vec<&'static str> { + vec![ + "CAP_CHOWN", + "CAP_DAC_OVERRIDE", + "CAP_FSETID", + "CAP_FOWNER", + "CAP_MKNOD", + "CAP_NET_RAW", + "CAP_SETGID", + "CAP_SETUID", + "CAP_SETFCAP", + "CAP_SETPCAP", + "CAP_NET_BIND_SERVICE", + "CAP_SYS_CHROOT", + "CAP_KILL", + "CAP_AUDIT_WRITE", + ] +} + +fn mounts() -> serde_json::Value { + json!([ + { + "destination": "/proc", + "type": "proc", + "source": "proc", + "options": [ + "nosuid", + "noexec", + "nodev" + ] + }, + { + "destination": "/dev", + "type": "tmpfs", + "source": "tmpfs", + "options": [ + "nosuid", + "strictatime", + "mode=755", + "size=65536k" + ] + }, + { + "destination": "/dev/pts", + "type": "devpts", + "source": "devpts", + "options": [ + "nosuid", + "noexec", + "newinstance", + "ptmxmode=0666", + "mode=0620", + "gid=5" + ] + }, + { + "destination": "/dev/shm", + "type": "tmpfs", + "source": "shm", + "options": [ + "nosuid", + "noexec", + "nodev", + "mode=1777", + "size=65536k" + ] + }, + { + "destination": "/dev/mqueue", + "type": "mqueue", + "source": "mqueue", + "options": [ + "nosuid", + "noexec", + "nodev" + ] + }, + { + "destination": "/sys", + "type": "sysfs", + "source": "sysfs", + "options": [ + "nosuid", + "noexec", + "nodev", + "ro" + ] + }, + { + "destination": "/run", + "type": "tmpfs", + "source": "tmpfs", + "options": [ + "nosuid", + "strictatime", + "mode=755", + "size=65536k" + ] + } + ]) +} + +fn linux_resources_devices() -> serde_json::Value { + // Devices implicitly contains the following devices: + // null, zero, full, random, urandom, tty, console, and ptmx. + // ptmx is a bind mount or symlink of the container's ptmx. 
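+	// The (major, minor) pairs below are the standard Linux device numbers:
+	// (1,3) /dev/null, (1,5) /dev/zero, (1,7) /dev/full, (1,8) /dev/random,
+	// (1,9) /dev/urandom, (5,0) /dev/tty, (5,1) /dev/console, (5,2) /dev/ptmx,
+	// and major 136 for the /dev/pts/* pseudo-terminals.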
+ // See also: https://github.com/opencontainers/runtime-spec/blob/master/config-linux.md#default-devices + json!([ + { + "allow": false, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 1, + "minor": 3, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 1, + "minor": 8, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 1, + "minor": 7, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 5, + "minor": 0, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 1, + "minor": 5, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 1, + "minor": 9, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 5, + "minor": 1, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 136, + "access": "rwm" + }, + { + "allow": true, + "type": "c", + "major": 5, + "minor": 2, + "access": "rwm" + } + ]) +} diff --git a/svc/pkg/ds/ops/server-create/src/scripts/cleanup.sh b/svc/pkg/ds/ops/server-create/src/scripts/cleanup.sh new file mode 100644 index 000000000..2815a2617 --- /dev/null +++ b/svc/pkg/ds/ops/server-create/src/scripts/cleanup.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +set -euf -o pipefail + +log() { + local timestamp=$(date +"%Y-%m-%d %H:%M:%S.%3N") + echo "[$timestamp] [cleanup] $@" +} + +# MARK: Generate CNI parameters +export CNI_PATH="/opt/cni/bin" +export NETCONFPATH="/opt/cni/config" +export CNI_IFNAME="eth0" +export CAP_ARGS=$(cat "$NOMAD_ALLOC_DIR/cni-cap-args.json") + +# Every step in this script gracefully fails so everything gets cleaned up no matter what. + +if [ -f "$NOMAD_ALLOC_DIR/container-id" ]; then + CONTAINER_ID=$(cat "$NOMAD_ALLOC_DIR/container-id") + NETWORK_NAME="rivet-job" + NETNS_PATH="/var/run/netns/$CONTAINER_ID" + + log "Deleting container $CONTAINER_ID" + runc delete --force "$CONTAINER_ID" || log 'Failed to delete container' >&2 + + log "Deleting network $NETWORK_NAME from namespace $NETNS_PATH" + cnitool del $NETWORK_NAME $NETNS_PATH || log 'Failed to delete network' >&2 + + log "Deleting network $CONTAINER_ID" + ip netns del "$CONTAINER_ID" || log 'Failed to delete network' >&2 +else + log "No container ID found. Network may have leaked." >&2 +fi + diff --git a/svc/pkg/ds/ops/server-create/src/scripts/setup.sh b/svc/pkg/ds/ops/server-create/src/scripts/setup.sh new file mode 100644 index 000000000..02b3f424b --- /dev/null +++ b/svc/pkg/ds/ops/server-create/src/scripts/setup.sh @@ -0,0 +1,71 @@ +#!/usr/bin/env bash +set -euf -o pipefail + +log() { + local timestamp=$(date +"%Y-%m-%d %H:%M:%S.%3N") + echo "[$timestamp] [setup] $@" +} + +log "Starting setup" + +log 'Env:' +env +echo + +# Need to prefix with "rivet-" in order to not interfere with any +# auto-generated resources that Nomad creates for the given alloc ID +export CONTAINER_ID="rivet-$NOMAD_ALLOC_ID" +log "CONTAINER_ID: $CONTAINER_ID" +echo -n "$CONTAINER_ID" > "$NOMAD_ALLOC_DIR/container-id" + +# Path to the created namespace +if __HOST_NETWORK__; then + # Host network + export NETNS_PATH="/proc/1/ns/net" +else + # CNI network that will be created + export NETNS_PATH="/var/run/netns/$CONTAINER_ID" +fi + +# Run job runner setup script +"$NOMAD_TASK_DIR/setup_job_runner.sh" & +pid_job_runner=$! + +# Run OCI setup script +"$NOMAD_TASK_DIR/setup_oci_bundle.sh" & +pid_oci=$! + +# Run CNI setup script +if ! __HOST_NETWORK__; then + "$NOMAD_TASK_DIR/setup_cni_network.sh" & + pid_cni=$! 
+fi
+
+# Wait for the job runner setup script to finish
+exit_status_job_runner=0
+wait "$pid_job_runner" || exit_status_job_runner=$?
+if [ $exit_status_job_runner -ne 0 ]; then
+	log "job-runner setup failed with exit code $exit_status_job_runner"
+	exit $exit_status_job_runner
+fi
+
+# Wait for the OCI setup script to finish
+exit_status_oci=0
+wait "$pid_oci" || exit_status_oci=$?
+if [ $exit_status_oci -ne 0 ]; then
+	log "OCI setup failed with exit code $exit_status_oci"
+	exit $exit_status_oci
+fi
+
+# Wait for the CNI setup script to finish
+if ! __HOST_NETWORK__; then
+	exit_status_cni=0
+	wait "$pid_cni" || exit_status_cni=$?
+	if [ $exit_status_cni -ne 0 ]; then
+		log "CNI setup failed with exit code $exit_status_cni"
+		exit $exit_status_cni
+	fi
+fi
+
+log "Setup finished"
+
diff --git a/svc/pkg/ds/ops/server-create/src/scripts/setup_cni_network.sh b/svc/pkg/ds/ops/server-create/src/scripts/setup_cni_network.sh
new file mode 100644
index 000000000..8a97a2e34
--- /dev/null
+++ b/svc/pkg/ds/ops/server-create/src/scripts/setup_cni_network.sh
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+set -euf -o pipefail
+
+log() {
+	local timestamp=$(date +"%Y-%m-%d %H:%M:%S.%3N")
+	echo "[$timestamp] [setup_cni_network] $@"
+}
+
+# MARK: Generate CNI parameters
+#
+# See https://github.com/containernetworking/cni/blob/b62753aa2bfa365c1ceaff6f25774a8047c896b5/cnitool/cnitool.go#L31
+
+# See Nomad capabilities equivalent:
+# https://github.com/hashicorp/nomad/blob/a8f0f2612ef9d283ed903721f8453a0c0c3f51c5/client/allocrunner/networking_cni.go#L105C46-L105C46
+#
+# See supported args:
+# https://github.com/containerd/go-cni/blob/6603d5bd8941d7f2026bb5627f6aa4ff434f859a/namespace_opts.go#L22
+jq -c <<EOF > "$NOMAD_ALLOC_DIR/cni-cap-args.json"
+{
+	"portMappings": $(cat "$NOMAD_ALLOC_DIR/cni-port-mappings.json")
+}
+EOF
+
+export CNI_PATH="/opt/cni/bin"
+export NETCONFPATH="/opt/cni/config"
+export CNI_IFNAME="eth0"
+export CAP_ARGS=$(cat "$NOMAD_ALLOC_DIR/cni-cap-args.json")
+log "CAP_ARGS: $CAP_ARGS"
+
+# MARK: Create network
+#
+# See Nomad network creation:
+# https://github.com/hashicorp/nomad/blob/a8f0f2612ef9d283ed903721f8453a0c0c3f51c5/client/allocrunner/network_manager_linux.go#L119
+
+# Name of the network in /opt/cni/config/$NETWORK_NAME.conflist
+NETWORK_NAME="rivet-job"
+
+log "Creating network namespace $CONTAINER_ID"
+ip netns add "$CONTAINER_ID"
+
+log "Adding network $NETWORK_NAME to namespace $NETNS_PATH"
+cnitool add "$NETWORK_NAME" "$NETNS_PATH" > "$NOMAD_ALLOC_DIR/cni.json"
+
+log "Finished setting up CNI network"
+
diff --git a/svc/pkg/ds/ops/server-create/src/scripts/setup_job_runner.sh b/svc/pkg/ds/ops/server-create/src/scripts/setup_job_runner.sh
new file mode 100644
index 000000000..2c2d11671
--- /dev/null
+++ b/svc/pkg/ds/ops/server-create/src/scripts/setup_job_runner.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+set -euf -o pipefail
+
+log() {
+	local timestamp=$(date +"%Y-%m-%d %H:%M:%S.%3N")
+	echo "[$timestamp] [setup_job_runner] $@"
+}
+
+# Download job runner binary
+curl -Lf "$NOMAD_META_job_runner_binary_url" -o "${NOMAD_ALLOC_DIR}/job-runner"
+chmod +x "${NOMAD_ALLOC_DIR}/job-runner"
+log "Finished downloading job-runner"
+
diff --git a/svc/pkg/ds/ops/server-create/src/scripts/setup_oci_bundle.sh b/svc/pkg/ds/ops/server-create/src/scripts/setup_oci_bundle.sh
new file mode 100644
index 000000000..a5e4bec32
--- /dev/null
+++ b/svc/pkg/ds/ops/server-create/src/scripts/setup_oci_bundle.sh
@@ -0,0 +1,84 @@
+#!/usr/bin/env bash
+set -euf -o pipefail
+
+log() {
+	local timestamp=$(date +"%Y-%m-%d %H:%M:%S.%3N")
+	echo "[$timestamp] [setup_oci_bundle] $@"
+}
+
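+# __DOWNLOAD_CMD__ and __BUILD_KIND__ below are substituted by
+# gen_lobby_docker_job in nomad_job.rs when this script is templated into the
+# job spec.
+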
+DOCKER_IMAGE_PATH="$NOMAD_ALLOC_DIR/docker-image.tar"
+OCI_IMAGE_PATH="$NOMAD_ALLOC_DIR/oci-image"
+OCI_BUNDLE_PATH="$NOMAD_ALLOC_DIR/oci-bundle"
+
+# MARK: Generate OCI bundle
+case "__BUILD_KIND__" in
+	"docker-image")
+		# We need to convert the Docker image to an OCI bundle in order to run it.
+
+		log "Downloading Docker image"
+		__DOWNLOAD_CMD__ > "$DOCKER_IMAGE_PATH"
+
+		# Allows us to work with the build with umoci
+		log "Converting Docker image -> OCI image"
+		skopeo copy "docker-archive:$DOCKER_IMAGE_PATH" "oci:$OCI_IMAGE_PATH:default"
+
+		# Allows us to run the bundle natively with runc
+		log "Converting OCI image -> OCI bundle"
+		umoci unpack --image "$OCI_IMAGE_PATH:default" "$OCI_BUNDLE_PATH"
+		;;
+	"oci-bundle")
+		log "Downloading OCI bundle"
+		mkdir "$OCI_BUNDLE_PATH"
+		__DOWNLOAD_CMD__ | tar -x -C "$OCI_BUNDLE_PATH"
+		;;
+	*)
+		log "Unknown build kind"
+		exit 1
+		;;
+esac
+
+# resolv.conf
+#
+# See also rivet-job.conflist in lib/bolt/core/src/dep/terraform/install_scripts/files/nomad.sh
+cat <<EOF > $NOMAD_ALLOC_DIR/resolv.conf
+nameserver 8.8.8.8
+nameserver 8.8.4.4
+nameserver 2001:4860:4860::8888
+nameserver 2001:4860:4860::8844
+options rotate
+options edns0
+options attempts:2
+EOF
+
+# MARK: Config
+#
+# Sanitize the config.json by copying safe properties from the provided bundle in to our base config.
+#
+# This way, we enforce our own capabilities on the container instead of trusting the
+# provided config.json
+log "Templating config.json"
+OVERRIDE_CONFIG="$NOMAD_ALLOC_DIR/oci-bundle-config.overrides.json"
+mv "$OCI_BUNDLE_PATH/config.json" "$OVERRIDE_CONFIG"
+
+# Template new config
+jq "
+.process.args = $(jq '.process.args' $OVERRIDE_CONFIG) |
+.process.env = $(jq '.process.env' $OVERRIDE_CONFIG) + .process.env |
+.process.user = $(jq '.process.user' $OVERRIDE_CONFIG) |
+.process.cwd = $(jq '.process.cwd' $OVERRIDE_CONFIG) |
+.linux.namespaces += [{\"type\": \"network\", \"path\": \"$NETNS_PATH\"}] |
+.mounts += [{
+	\"destination\": \"/etc/resolv.conf\",
+	\"type\": \"bind\",
+	\"source\": \"$NOMAD_ALLOC_DIR/resolv.conf\",
+	\"options\": [\"rbind\", \"rprivate\"]
+}]
+" "$NOMAD_ALLOC_DIR/oci-bundle-config.base.json" > "$OCI_BUNDLE_PATH/config.json"
+
+# Config will be validated in `job-runner`
+
+log "Finished setting up OCI bundle"
+
diff --git a/svc/pkg/ds/ops/server-create/src/seccomp.rs b/svc/pkg/ds/ops/server-create/src/seccomp.rs
new file mode 100644
index 000000000..f3f7fcfec
--- /dev/null
+++ b/svc/pkg/ds/ops/server-create/src/seccomp.rs
@@ -0,0 +1,481 @@
+use chirp_worker::prelude::*;
+use serde_json::json;
+
+pub fn seccomp() -> serde_json::Value {
+	// Copied from the auto-generated containerd config.
+	//
+	// See the comment in super::oci_config::config on how to generate this.
+	json!({
+		"defaultAction": "SCMP_ACT_ERRNO",
+		"architectures": [
+			"SCMP_ARCH_X86_64",
+			"SCMP_ARCH_X86",
+			"SCMP_ARCH_X32"
+		],
+		"syscalls": [
+			{
+				"names": syscall_names(),
+				"action": "SCMP_ACT_ALLOW"
+			},
+			{
+				"names": [
+					"personality"
+				],
+				"action": "SCMP_ACT_ALLOW",
+				"args": [
+					{
+						"index": 0,
+						"value": 0,
+						"op": "SCMP_CMP_EQ"
+					}
+				]
+			},
+			{
+				"names": [
+					"personality"
+				],
+				"action": "SCMP_ACT_ALLOW",
+				"args": [
+					{
+						"index": 0,
+						"value": 8,
+						"op": "SCMP_CMP_EQ"
+					}
+				]
+			},
+			{
+				"names": [
+					"personality"
+				],
+				"action": "SCMP_ACT_ALLOW",
+				"args": [
+					{
+						"index": 0,
+						"value": 131072,
+						"op": "SCMP_CMP_EQ"
+					}
+				]
+			},
+			{
+				"names": [
+					"personality"
+				],
+				"action": "SCMP_ACT_ALLOW",
+				"args": [
+					{
+						"index": 0,
+						"value": 131080,
+						"op": "SCMP_CMP_EQ"
} + ] + }, + { + "names": [ + "personality" + ], + "action": "SCMP_ACT_ALLOW", + "args": [ + { + "index": 0, + "value": 4294967295u32, + "op": "SCMP_CMP_EQ" + } + ] + }, + { + "names": [ + "ptrace" + ], + "action": "SCMP_ACT_ALLOW" + }, + { + "names": [ + "arch_prctl", + "modify_ldt" + ], + "action": "SCMP_ACT_ALLOW" + }, + { + "names": [ + "chroot" + ], + "action": "SCMP_ACT_ALLOW" + }, + { + "names": [ + "clone" + ], + "action": "SCMP_ACT_ALLOW", + "args": [ + { + "index": 0, + "value": 2114060288, + "op": "SCMP_CMP_MASKED_EQ" + } + ] + }, + { + "names": [ + "clone3" + ], + "action": "SCMP_ACT_ERRNO", + "errnoRet": 38 + } + ] + }) +} + +fn syscall_names() -> Vec<&'static str> { + vec![ + "accept", + "accept4", + "access", + "adjtimex", + "alarm", + "bind", + "brk", + "capget", + "capset", + "chdir", + "chmod", + "chown", + "chown32", + "clock_adjtime", + "clock_adjtime64", + "clock_getres", + "clock_getres_time64", + "clock_gettime", + "clock_gettime64", + "clock_nanosleep", + "clock_nanosleep_time64", + "close", + "close_range", + "connect", + "copy_file_range", + "creat", + "dup", + "dup2", + "dup3", + "epoll_create", + "epoll_create1", + "epoll_ctl", + "epoll_ctl_old", + "epoll_pwait", + "epoll_pwait2", + "epoll_wait", + "epoll_wait_old", + "eventfd", + "eventfd2", + "execve", + "execveat", + "exit", + "exit_group", + "faccessat", + "faccessat2", + "fadvise64", + "fadvise64_64", + "fallocate", + "fanotify_mark", + "fchdir", + "fchmod", + "fchmodat", + "fchown", + "fchown32", + "fchownat", + "fcntl", + "fcntl64", + "fdatasync", + "fgetxattr", + "flistxattr", + "flock", + "fork", + "fremovexattr", + "fsetxattr", + "fstat", + "fstat64", + "fstatat64", + "fstatfs", + "fstatfs64", + "fsync", + "ftruncate", + "ftruncate64", + "futex", + "futex_time64", + "futex_waitv", + "futimesat", + "getcpu", + "getcwd", + "getdents", + "getdents64", + "getegid", + "getegid32", + "geteuid", + "geteuid32", + "getgid", + "getgid32", + "getgroups", + "getgroups32", + "getitimer", + "getpeername", + "getpgid", + "getpgrp", + "getpid", + "getppid", + "getpriority", + "getrandom", + "getresgid", + "getresgid32", + "getresuid", + "getresuid32", + "getrlimit", + "get_robust_list", + "getrusage", + "getsid", + "getsockname", + "getsockopt", + "get_thread_area", + "gettid", + "gettimeofday", + "getuid", + "getuid32", + "getxattr", + "inotify_add_watch", + "inotify_init", + "inotify_init1", + "inotify_rm_watch", + "io_cancel", + "ioctl", + "io_destroy", + "io_getevents", + "io_pgetevents", + "io_pgetevents_time64", + "ioprio_get", + "ioprio_set", + "io_setup", + "io_submit", + "io_uring_enter", + "io_uring_register", + "io_uring_setup", + "ipc", + "kill", + "landlock_add_rule", + "landlock_create_ruleset", + "landlock_restrict_self", + "lchown", + "lchown32", + "lgetxattr", + "link", + "linkat", + "listen", + "listxattr", + "llistxattr", + "_llseek", + "lremovexattr", + "lseek", + "lsetxattr", + "lstat", + "lstat64", + "madvise", + "membarrier", + "memfd_create", + "memfd_secret", + "mincore", + "mkdir", + "mkdirat", + "mknod", + "mknodat", + "mlock", + "mlock2", + "mlockall", + "mmap", + "mmap2", + "mprotect", + "mq_getsetattr", + "mq_notify", + "mq_open", + "mq_timedreceive", + "mq_timedreceive_time64", + "mq_timedsend", + "mq_timedsend_time64", + "mq_unlink", + "mremap", + "msgctl", + "msgget", + "msgrcv", + "msgsnd", + "msync", + "munlock", + "munlockall", + "munmap", + "name_to_handle_at", + "nanosleep", + "newfstatat", + "_newselect", + "open", + "openat", + "openat2", + "pause", + "pidfd_open", + 
"pidfd_send_signal", + "pipe", + "pipe2", + "pkey_alloc", + "pkey_free", + "pkey_mprotect", + "poll", + "ppoll", + "ppoll_time64", + "prctl", + "pread64", + "preadv", + "preadv2", + "prlimit64", + "process_mrelease", + "pselect6", + "pselect6_time64", + "pwrite64", + "pwritev", + "pwritev2", + "read", + "readahead", + "readlink", + "readlinkat", + "readv", + "recv", + "recvfrom", + "recvmmsg", + "recvmmsg_time64", + "recvmsg", + "remap_file_pages", + "removexattr", + "rename", + "renameat", + "renameat2", + "restart_syscall", + "rmdir", + "rseq", + "rt_sigaction", + "rt_sigpending", + "rt_sigprocmask", + "rt_sigqueueinfo", + "rt_sigreturn", + "rt_sigsuspend", + "rt_sigtimedwait", + "rt_sigtimedwait_time64", + "rt_tgsigqueueinfo", + "sched_getaffinity", + "sched_getattr", + "sched_getparam", + "sched_get_priority_max", + "sched_get_priority_min", + "sched_getscheduler", + "sched_rr_get_interval", + "sched_rr_get_interval_time64", + "sched_setaffinity", + "sched_setattr", + "sched_setparam", + "sched_setscheduler", + "sched_yield", + "seccomp", + "select", + "semctl", + "semget", + "semop", + "semtimedop", + "semtimedop_time64", + "send", + "sendfile", + "sendfile64", + "sendmmsg", + "sendmsg", + "sendto", + "setfsgid", + "setfsgid32", + "setfsuid", + "setfsuid32", + "setgid", + "setgid32", + "setgroups", + "setgroups32", + "setitimer", + "setpgid", + "setpriority", + "setregid", + "setregid32", + "setresgid", + "setresgid32", + "setresuid", + "setresuid32", + "setreuid", + "setreuid32", + "setrlimit", + "set_robust_list", + "setsid", + "setsockopt", + "set_thread_area", + "set_tid_address", + "setuid", + "setuid32", + "setxattr", + "shmat", + "shmctl", + "shmdt", + "shmget", + "shutdown", + "sigaltstack", + "signalfd", + "signalfd4", + "sigprocmask", + "sigreturn", + "socket", + "socketcall", + "socketpair", + "splice", + "stat", + "stat64", + "statfs", + "statfs64", + "statx", + "symlink", + "symlinkat", + "sync", + "sync_file_range", + "syncfs", + "sysinfo", + "tee", + "tgkill", + "time", + "timer_create", + "timer_delete", + "timer_getoverrun", + "timer_gettime", + "timer_gettime64", + "timer_settime", + "timer_settime64", + "timerfd_create", + "timerfd_gettime", + "timerfd_gettime64", + "timerfd_settime", + "timerfd_settime64", + "times", + "tkill", + "truncate", + "truncate64", + "ugetrlimit", + "umask", + "uname", + "unlink", + "unlinkat", + "utime", + "utimensat", + "utimensat_time64", + "utimes", + "vfork", + "vmsplice", + "wait4", + "waitid", + "waitpid", + "write", + "writev", + ] +} diff --git a/svc/pkg/ds/ops/server-create/src/util_job.rs b/svc/pkg/ds/ops/server-create/src/util_job.rs new file mode 100644 index 000000000..363634442 --- /dev/null +++ b/svc/pkg/ds/ops/server-create/src/util_job.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +pub mod key; + +/// Determines if a Nomad job is dispatched from our run. +/// +/// We use this when monitoring Nomad in order to determine which events to +/// pay attention to. 
+pub fn is_nomad_job_run(job_id: &str) -> bool { + job_id.starts_with("job-") && job_id.contains("/dispatch-") +} + +// Timeout from when `stop_job` is called and the kill signal is sent +pub const JOB_STOP_TIMEOUT: Duration = Duration::from_secs(30); + +pub const TASK_CLEANUP_CPU: i32 = 50; + +// Query Prometheus with: +// +// ``` +// max(nomad_client_allocs_memory_max_usage{ns="prod",exported_job=~"job-.*",task="run-cleanup"}) / 1000 / 1000 +// ``` +// +// 13.5 MB baseline, 29 MB highest peak +pub const TASK_CLEANUP_MEMORY: i32 = 32; + +pub const RUN_MAIN_TASK_NAME: &str = "main"; +pub const RUN_CLEANUP_TASK_NAME: &str = "run-cleanup"; diff --git a/svc/pkg/ds/ops/server-create/src/util_mm/consts.rs b/svc/pkg/ds/ops/server-create/src/util_mm/consts.rs new file mode 100644 index 000000000..10d67ca4a --- /dev/null +++ b/svc/pkg/ds/ops/server-create/src/util_mm/consts.rs @@ -0,0 +1,39 @@ +use rivet_util as util; + +pub const LOBBY_READY_TIMEOUT: i64 = util::duration::minutes(5); +pub const PLAYER_READY_TIMEOUT: i64 = util::duration::minutes(2); +pub const PLAYER_AUTO_REMOVE_TIMEOUT: i64 = util::duration::hours(8); + +pub const MIN_HOST_PORT: u16 = 26000; +pub const MAX_HOST_PORT: u16 = 31999; + +/// Constants used for mocking responses when using dev tokens. +pub const DEV_REGION_ID: &str = "dev-lcl"; +pub const DEV_PROVIDER_NAME: &str = "Development"; +pub const DEV_REGION_NAME: &str = "Local"; + +// Also see svc/mm-lobby-create/src/nomad_job.rs +pub const DEFAULT_ENV_KEYS: &[&str] = &[ + "RIVET_API_ENDPOINT", + "RIVET_CHAT_API_URL", + "RIVET_GROUP_API_URL", + "RIVET_IDENTITY_API_URL", + "RIVET_KV_API_URL", + "RIVET_MATCHMAKER_API_URL", + "RIVET_NAMESPACE_NAME", + "RIVET_NAMESPACE_ID", + "RIVET_VERSION_NAME", + "RIVET_VERSION_ID", + "RIVET_GAME_MODE_ID", + "RIVET_GAME_MODE_NAME", + "RIVET_LOBBY_ID", + "RIVET_TOKEN", + "RIVET_REGION_ID", + "RIVET_REGION_NAME", + "RIVET_MAX_PLAYERS_NORMAL", + "RIVET_MAX_PLAYERS_DIRECT", + "RIVET_MAX_PLAYERS_PARTY", + "RIVET_LOBBY_TOKEN", + "RIVET_LOBBY_GROUP_ID", + "RIVET_LOBBY_GROUP_NAME", +]; diff --git a/svc/pkg/ds/ops/server-create/src/util_mm/defaults.rs b/svc/pkg/ds/ops/server-create/src/util_mm/defaults.rs new file mode 100644 index 000000000..1c85d6df0 --- /dev/null +++ b/svc/pkg/ds/ops/server-create/src/util_mm/defaults.rs @@ -0,0 +1,4 @@ +pub const TIER_NAME_ID: &str = "basic-1d1"; +pub const MAX_PLAYERS_NORMAL: u32 = 32; +pub const MAX_PLAYERS_DIRECT: u32 = 40; +pub const MAX_PLAYERS_PARTY: u32 = 40; diff --git a/svc/pkg/ds/ops/server-create/src/util_mm/key.rs b/svc/pkg/ds/ops/server-create/src/util_mm/key.rs new file mode 100644 index 000000000..aaa3661f0 --- /dev/null +++ b/svc/pkg/ds/ops/server-create/src/util_mm/key.rs @@ -0,0 +1,208 @@ +use uuid::Uuid; + + +/// HASH +pub fn player_config(player_id: Uuid) -> String { + format!("{{global}}:mm:player:{}:config", player_id) +} + +pub mod player_config { + use uuid::Uuid; + + #[derive(Debug, serde::Serialize)] + pub struct Config { + #[serde(rename = "l")] + pub lobby_id: Uuid, + #[serde(rename = "qi")] + pub query_id: Option, + #[serde(rename = "ra")] + pub remote_address: String, + } + + pub const LOBBY_ID: &str = "l"; + pub const QUERY_ID: &str = "qi"; + pub const REMOTE_ADDRESS: &str = "ra"; +} + +/// HASH +pub fn lobby_config(lobby_id: Uuid) -> String { + format!("{{global}}:mm:lobby:{}:config", lobby_id) +} + +/// HASH +pub fn lobby_tags(lobby_id: Uuid) -> String { + format!("{{global}}:mm:lobby:{}:tags", lobby_id) +} + +pub mod lobby_config { + use uuid::Uuid; + + #[derive(Debug, 
serde::Serialize)]
+	pub struct Config {
+		#[serde(rename = "ns")]
+		pub namespace_id: Uuid,
+		#[serde(rename = "r")]
+		pub region_id: Uuid,
+		#[serde(rename = "lg")]
+		pub lobby_group_id: Uuid,
+		#[serde(rename = "mpn")]
+		pub max_players_normal: u32,
+		#[serde(rename = "mpp")]
+		pub max_players_party: u32,
+		#[serde(rename = "mpd")]
+		pub max_players_direct: u32,
+		#[serde(rename = "p")]
+		pub preemptive: bool,
+		#[serde(rename = "rt", skip_serializing_if = "Option::is_none")]
+		pub ready_ts: Option<i64>,
+		#[serde(rename = "c")]
+		pub is_closed: bool,
+		#[serde(rename = "cu")]
+		pub is_custom: bool,
+		#[serde(rename = "st", skip_serializing_if = "Option::is_none")]
+		pub state_json: Option<String>,
+	}
+
+	pub const NAMESPACE_ID: &str = "ns";
+	pub const REGION_ID: &str = "r";
+	pub const LOBBY_GROUP_ID: &str = "lg";
+	pub const MAX_PLAYERS_NORMAL: &str = "mpn";
+	pub const MAX_PLAYERS_PARTY: &str = "mpp";
+	pub const MAX_PLAYERS_DIRECT: &str = "mpd";
+	pub const PREEMPTIVE: &str = "p";
+	pub const READY_TS: &str = "rt";
+	pub const IS_CLOSED: &str = "c";
+	pub const IS_CUSTOM: &str = "cu";
+	pub const STATE_JSON: &str = "st";
+}
+
+/// HASH
+///
+/// Includes the state of all active find queries.
+pub fn find_query_state(query_id: Uuid) -> String {
+	format!("{{global}}:mm:find_query:{}:state", query_id)
+}
+
+pub mod find_query_state {
+	use uuid::Uuid;
+
+	#[derive(Debug, serde::Serialize)]
+	pub struct State {
+		#[serde(rename = "n")]
+		pub namespace_id: Uuid,
+		#[serde(rename = "l", skip_serializing_if = "Option::is_none")]
+		pub lobby_id: Option<Uuid>,
+		#[serde(rename = "lac", skip_serializing_if = "Option::is_none")]
+		pub lobby_auto_created: Option<bool>,
+		#[serde(rename = "s")]
+		pub status: u8,
+	}
+
+	pub const NAMESPACE_ID: &str = "n";
+	pub const PLAYER_IDS: &str = "pl";
+	pub const LOBBY_ID: &str = "l";
+	pub const LOBBY_AUTO_CREATED: &str = "lac";
+	pub const STATUS: &str = "s";
+}
+
+/// SET
+pub fn find_query_player_ids(query_id: Uuid) -> String {
+	format!("{{global}}:mm:find_query:{}:player_ids", query_id)
+}
+
+/// ZSET
+///
+/// Includes all active find queries for a lobby.
+pub fn lobby_find_queries(lobby_id: Uuid) -> String {
+	format!("{{global}}:mm:lobby:{}:find_queries", lobby_id)
+}
+
+/// ZSET
+pub fn ns_player_ids(namespace_id: Uuid) -> String {
+	format!("{{global}}:mm:ns:{}:player_ids", namespace_id)
+}
+
+/// ZSET
+pub fn ns_lobby_ids(namespace_id: Uuid) -> String {
+	format!("{{global}}:mm:ns:{}:lobby_ids", namespace_id)
+}
+
+/// SET
+pub fn ns_remote_address_player_ids(namespace_id: Uuid, remote_address: &str) -> String {
+	format!(
+		"{{global}}:mm:ns:{}:remote_address:{}:player_ids",
+		namespace_id, remote_address
+	)
+}
+
+/// ZSET
+pub fn lobby_player_ids(lobby_id: Uuid) -> String {
+	format!("{{global}}:mm:lobby:{}:player_ids", lobby_id)
+}
+
+/// ZSET
+pub fn lobby_registered_player_ids(lobby_id: Uuid) -> String {
+	format!("{{global}}:mm:lobby:{}:registered_player_ids", lobby_id)
+}
+
+/// ZSET
+pub fn idle_lobby_ids(namespace_id: Uuid, region_id: Uuid, lobby_group_id: Uuid) -> String {
+	format!(
+		"{{global}}:mm:ns:{}:region:{}:lg:{}:idle_lobby_ids",
+		namespace_id, region_id, lobby_group_id
+	)
+}
+
+/// Map containing all idle lobbies and their associated lobby group
+/// IDs.
+///
+/// We limit this to just idle lobbies since we need to iterate over all
+/// the values in this hash in mm-lobby-idle-update, so we want to limit
+/// the values in here as much as possible.
+///
+/// We keep this all in one hash so we only have to lock one key instead
+/// of using `SCAN`.
+///
+/// HASH
+pub fn idle_lobby_lobby_group_ids(namespace_id: Uuid, region_id: Uuid) -> String {
+	format!(
+		"{{global}}:mm:ns:{}:region:{}:lobby:idle:lobby_group_ids",
+		namespace_id, region_id,
+	)
+}
+
+/// ZSET
+pub fn lobby_available_spots(
+	namespace_id: Uuid,
+	region_id: Uuid,
+	lobby_group_id: Uuid,
+	join_kind: super::JoinKind,
+) -> String {
+	format!(
+		"{{global}}:mm:ns:{}:region:{}:lg:{}:lobby:available_spots:{}",
+		namespace_id,
+		region_id,
+		lobby_group_id,
+		join_kind.short()
+	)
+}
+
+/// ZSET
+pub fn lobby_unready() -> String {
+	"{global}:mm:lobby:unready".to_string()
+}
+
+/// ZSET
+pub fn player_unregistered() -> String {
+	"{global}:mm:player:unregistered".to_string()
+}
+
+/// ZSET
+pub fn player_auto_remove() -> String {
+	"{global}:mm:player:auto_remove".to_string()
+}
+
+// Placeholder key
+pub fn empty() -> String {
+	"{global}".to_string()
+}
diff --git a/svc/pkg/ds/ops/server-create/src/util_mm/mod.rs b/svc/pkg/ds/ops/server-create/src/util_mm/mod.rs
new file mode 100644
index 000000000..ac75d4f9f
--- /dev/null
+++ b/svc/pkg/ds/ops/server-create/src/util_mm/mod.rs
@@ -0,0 +1,48 @@
+pub mod consts;
+pub mod defaults;
+pub mod key;
+pub mod test;
+pub mod verification;
+pub mod version_migrations;
+
+pub enum JoinKind {
+	Normal,
+	Party,
+	Direct,
+}
+
+impl JoinKind {
+	pub fn short(self) -> &'static str {
+		match self {
+			JoinKind::Normal => "normal",
+			JoinKind::Party => "party",
+			JoinKind::Direct => "direct",
+		}
+	}
+}
+
+#[derive(Debug, PartialEq, strum::FromRepr)]
+#[repr(u8)]
+pub enum FindQueryStatus {
+	/// Lobby is creating or in between mm-lobby-find and
+	/// mm-lobby-find-try-complete.
+	Pending = 0,
+	/// Find finished and lobby is ready.
+	Complete = 1,
+	/// There was an error.
+	Fail = 2,
+}
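+
+// Illustrative sketch (hypothetical test, not part of this change):
+// `strum::FromRepr` lets the status byte stored in Redis round-trip back into
+// the enum.
+#[cfg(test)]
+mod find_query_status_tests {
+	use super::FindQueryStatus;
+
+	#[test]
+	fn from_repr_round_trip() {
+		assert_eq!(FindQueryStatus::from_repr(1), Some(FindQueryStatus::Complete));
+		assert_eq!(FindQueryStatus::from_repr(42), None);
+	}
+}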
+/// Formats the port label to be used in Nomad.
+///
+/// Prefixing the port label ensures that user-defined port names don't
+/// interfere with other ports.
+pub fn format_nomad_port_label(port_label: &str) -> String {
+	let snake_port_label = heck::SnakeCase::to_snake_case(port_label);
+	format!("game_{snake_port_label}")
+}
+
+pub const RUNC_SETUP_CPU: i32 = 50;
+pub const RUNC_SETUP_MEMORY: i32 = 32;
+pub const RUNC_CLEANUP_CPU: i32 = 50;
+pub const RUNC_CLEANUP_MEMORY: i32 = 32;
diff --git a/svc/pkg/ds/ops/server-create/src/util_mm/test.rs b/svc/pkg/ds/ops/server-create/src/util_mm/test.rs
new file mode 100644
index 000000000..8de746645
--- /dev/null
+++ b/svc/pkg/ds/ops/server-create/src/util_mm/test.rs
@@ -0,0 +1 @@
+pub const TIER_NAME_ID: &str = "basic-1d16";
diff --git a/svc/pkg/ds/ops/server-create/src/util_mm/verification.rs b/svc/pkg/ds/ops/server-create/src/util_mm/verification.rs
new file mode 100644
index 000000000..ae51d87f5
--- /dev/null
+++ b/svc/pkg/ds/ops/server-create/src/util_mm/verification.rs
@@ -0,0 +1,391 @@
+use std::collections::HashMap;
+
+use futures_util::{StreamExt, TryStreamExt};
+use http::StatusCode;
+use proto::backend::{self, pkg::*};
+use rivet_operation::prelude::*;
+use serde::Serialize;
+use uuid::Uuid;
+
+#[derive(Serialize)]
+pub struct ExternalVerificationRequest {
+	pub verification_data: Option<serde_json::Value>,
+	pub game: Game,
+	pub clients: HashMap<String, Client>,
+	pub join_kind: JoinKind,
+	pub kind: ConnectionKind,
+}
+
+#[derive(Serialize)]
+pub struct Game {
+	pub namespace_id: Uuid,
+	pub game_mode_id: Uuid,
+	pub game_mode_name_id: String,
+
+	pub lobby: Option<Lobby>,
+	pub state: Option<serde_json::Value>,
+	pub config: Option<serde_json::Value>,
+	pub tags: HashMap<String, String>,
+	pub dynamic_max_players: Option<u32>,
+}
+
+#[derive(Serialize)]
+pub struct Lobby {
+	pub lobby_id: Uuid,
+	pub region_id: Uuid,
+	pub region_name_id: String,
+	pub create_ts: String,
+	pub is_closed: bool,
+}
+
+#[derive(Serialize)]
+pub struct Client {
+	pub user_agent: Option<String>,
+	pub latitude: Option<f64>,
+	pub longitude: Option<f64>,
+}
+
+#[derive(Serialize)]
+pub enum JoinKind {
+	Normal,
+	Party,
+}
+
+#[derive(Copy, Clone, Serialize)]
+pub enum ConnectionKind {
+	Find,
+	Join,
+	Create,
+}
+
+pub struct VerifyConfigOpts<'a> {
+	pub kind: ConnectionKind,
+	pub namespace_id: Uuid,
+	pub user_id: Option<Uuid>,
+	pub client_info: Vec<backend::net::ClientInfo>,
+	pub tags: &'a HashMap<String, String>,
+	pub dynamic_max_players: Option<u32>,
+
+	pub lobby_groups: &'a [backend::matchmaker::LobbyGroup],
+	pub lobby_group_meta: &'a [backend::matchmaker::LobbyGroupMeta],
+	pub lobby_info: Option<&'a backend::matchmaker::Lobby>,
+	pub lobby_state_json: Option<&'a str>,
+
+	pub verification_data_json: Option<&'a str>,
+	pub lobby_config_json: Option<&'a str>,
+	pub custom_lobby_publicity: Option<backend::matchmaker::lobby::Publicity>,
+}
+
+struct ExternalRequestConfigAndLobby<'a> {
+	pub lobby_group: &'a backend::matchmaker::LobbyGroup,
+	pub lobby_group_meta: &'a backend::matchmaker::LobbyGroupMeta,
+	external_request_config: backend::net::ExternalRequestConfig,
+}
+
+/// Verifies everything required to make a find request or create a custom lobby.
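+///
+/// This covers identity requirements, custom lobby publicity and per-identity
+/// lobby limits, lobby config size, and any external verification endpoint
+/// configured for the lobby group.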
+pub async fn verify_config( + ctx: &OperationContext<()>, + opts: &VerifyConfigOpts<'_>, +) -> GlobalResult<()> { + let mut highest_identity_requirement = backend::matchmaker::IdentityRequirement::None; + let mut external_request_configs = Vec::new(); + + // Collect all external request configs and identity requirement + for (lobby_group, lobby_group_meta) in opts.lobby_groups.iter().zip(opts.lobby_group_meta) { + let (identity_requirement, external_request_config) = match ( + opts.kind, + lobby_group.actions.as_ref().and_then(|a| a.find.as_ref()), + lobby_group.actions.as_ref().and_then(|a| a.join.as_ref()), + lobby_group.actions.as_ref().and_then(|a| a.create.as_ref()), + ) { + (ConnectionKind::Find, Some(find_config), _, _) => { + if !find_config.enabled { + bail_with!(MATCHMAKER_FIND_DISABLED); + } + + ( + unwrap!( + backend::matchmaker::IdentityRequirement::from_i32( + find_config.identity_requirement + ), + "invalid identity requirement variant" + ), + find_config.verification.as_ref().map(|config| { + backend::net::ExternalRequestConfig { + url: config.url.clone(), + method: backend::net::HttpMethod::Post as i32, + headers: config.headers.clone(), + } + }), + ) + } + (ConnectionKind::Join, _, Some(join_config), _) => { + if !join_config.enabled { + bail_with!(MATCHMAKER_JOIN_DISABLED); + } + + ( + unwrap!( + backend::matchmaker::IdentityRequirement::from_i32( + join_config.identity_requirement + ), + "invalid identity requirement variant" + ), + join_config.verification.as_ref().map(|config| { + backend::net::ExternalRequestConfig { + url: config.url.clone(), + method: backend::net::HttpMethod::Post as i32, + headers: config.headers.clone(), + } + }), + ) + } + (ConnectionKind::Create, _, _, Some(create_config)) => { + let publicity = unwrap!(opts.custom_lobby_publicity); + + // Verify publicity + match ( + publicity, + create_config.enable_public, + create_config.enable_private, + ) { + (backend::matchmaker::lobby::Publicity::Public, allowed, _) => { + ensure_with!( + allowed, + MATCHMAKER_CUSTOM_LOBBY_CONFIG_INVALID, + reason = r#""public" publicity not allowed with this custom game mode"# + ); + } + (backend::matchmaker::lobby::Publicity::Private, _, allowed) => { + ensure_with!( + allowed, + MATCHMAKER_CUSTOM_LOBBY_CONFIG_INVALID, + reason = + r#""private" publicity not allowed with this custom game mode"# + ); + } + } + + // Verify lobby count + if let (Some(max_lobbies_per_identity), Some(user_id)) = + (create_config.max_lobbies_per_identity, opts.user_id) + { + let lobbies_res = op!([ctx] mm_lobby_list_for_user_id { + user_ids: vec![user_id.into()], + }) + .await?; + let user = unwrap!(lobbies_res.users.first()); + ensure_with!( + (user.lobby_ids.len() as u64) < max_lobbies_per_identity, + MATCHMAKER_CUSTOM_LOBBY_LIMIT_REACHED + ); + } + + ( + unwrap!( + backend::matchmaker::IdentityRequirement::from_i32( + create_config.identity_requirement + ), + "invalid identity requirement variant" + ), + create_config.verification.as_ref().map(|config| { + backend::net::ExternalRequestConfig { + url: config.url.clone(), + method: backend::net::HttpMethod::Post as i32, + headers: config.headers.clone(), + } + }), + ) + } + (ConnectionKind::Create, _, _, None) => { + bail_with!(MATCHMAKER_CUSTOM_LOBBIES_DISABLED); + } + _ => (backend::matchmaker::IdentityRequirement::None, None), + }; + + // Updated highest requirement + match highest_identity_requirement { + backend::matchmaker::IdentityRequirement::None => { + highest_identity_requirement = identity_requirement; + } + 
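+			// Guest can only be upgraded to Registered; Registered is already
+			// the highest requirement.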
backend::matchmaker::IdentityRequirement::Guest => { + if matches!( + identity_requirement, + backend::matchmaker::IdentityRequirement::Registered + ) { + highest_identity_requirement = identity_requirement; + } + } + backend::matchmaker::IdentityRequirement::Registered => {} + } + + if let Some(external_request_config) = external_request_config { + external_request_configs.push(ExternalRequestConfigAndLobby { + lobby_group, + lobby_group_meta, + external_request_config, + }); + } + } + + // Verify identity requirement + match (highest_identity_requirement, opts.user_id) { + (backend::matchmaker::IdentityRequirement::Registered, Some(user_id)) => { + let user_identities_res = op!([ctx] user_identity_get { + user_ids: vec![user_id.into()], + }) + .await?; + let user = unwrap!( + user_identities_res.users.first(), + "could not find user identities" + ); + let is_registered = !user.identities.is_empty(); + + if !is_registered { + bail_with!(MATCHMAKER_REGISTRATION_REQUIRED); + } + } + ( + backend::matchmaker::IdentityRequirement::Guest + | backend::matchmaker::IdentityRequirement::Registered, + None, + ) => { + bail_with!(MATCHMAKER_IDENTITY_REQUIRED); + } + _ => {} + } + + // Verify lobby config + if let Some(lobby_config_json) = opts.lobby_config_json { + ensure_with!( + lobby_config_json.len() as u64 <= util::file_size::kibibytes(16), + MATCHMAKER_CUSTOM_LOBBY_CONFIG_INVALID, + reason = "too large (> 16KiB)" + ); + } + + // Verify user data externally + for external_request_config_and_lobby in external_request_configs { + let ExternalRequestConfigAndLobby { + lobby_group, + lobby_group_meta, + external_request_config, + } = external_request_config_and_lobby; + + // Build lobby info + let lobby = if let Some(l) = &opts.lobby_info { + // Fetch region data for readable name + let region_id = unwrap!(l.region_id); + let regions_res = op!([ctx] region_get { + region_ids: vec![region_id], + }) + .await?; + let region = unwrap!(regions_res.regions.first()); + + Some(Lobby { + lobby_id: unwrap_ref!(l.lobby_id).as_uuid(), + region_id: region_id.as_uuid(), + region_name_id: region.name_id.clone(), + create_ts: util::timestamp::to_string(l.create_ts)?, + is_closed: l.is_closed, + }) + } else { + None + }; + + // Fetch IP info + let clients = futures_util::stream::iter( + opts.client_info + .iter() + .filter_map(|client_info| { + client_info + .remote_address + .as_ref() + .map(|ip| (ip.clone(), client_info.user_agent.clone())) + }) + .collect::>(), + ) + .map(|(ip, user_agent)| async move { + let ip_res = op!([ctx] ip_info { + ip: ip.clone(), + }) + .await?; + let (latitude, longitude) = ip_res + .ip_info + .and_then(|ip_info| ip_info.coords) + .map(|coords| (coords.latitude, coords.longitude)) + .unzip(); + + GlobalResult::Ok(( + ip.clone(), + Client { + user_agent: user_agent.clone(), + longitude, + latitude, + }, + )) + }) + .buffer_unordered(16) + .try_collect::>() + .await?; + + // Build body + let body = ExternalVerificationRequest { + verification_data: opts + .verification_data_json + .as_ref() + .map(|json| serde_json::from_str::(json)) + .transpose()?, + game: Game { + game_mode_id: unwrap_ref!(lobby_group_meta.lobby_group_id).as_uuid(), + game_mode_name_id: lobby_group.name_id.clone(), + namespace_id: opts.namespace_id, + + lobby, + state: opts + .lobby_state_json + .as_ref() + .map(|json| serde_json::from_str::(json)) + .transpose()?, + config: opts + .lobby_config_json + .as_ref() + .map(|json| serde_json::from_str::(json)) + .transpose()?, + tags: opts.tags.clone(), + 
dynamic_max_players: opts.dynamic_max_players, + }, + clients, + join_kind: JoinKind::Normal, + kind: opts.kind, + }; + + // Send request + let request_id = Uuid::new_v4(); + let external_res = msg!([ctx] external::msg::request_call(request_id) + -> Result + { + request_id: Some(request_id.into()), + config: Some(external_request_config), + timeout: util::duration::seconds(10) as u64, + body: Some(serde_json::to_vec(&body)?), + ..Default::default() + }) + .await?; + + // Handle status code + if let Ok(res) = external_res { + let status = StatusCode::from_u16(res.status_code as u16)?; + + tracing::info!(?status, "user verification response"); + + if !status.is_success() { + bail_with!(MATCHMAKER_VERIFICATION_FAILED); + } + } else { + bail_with!(MATCHMAKER_VERIFICATION_REQUEST_FAILED); + } + } + + Ok(()) +} diff --git a/svc/pkg/ds/ops/server-create/src/util_mm/version_migrations.rs b/svc/pkg/ds/ops/server-create/src/util_mm/version_migrations.rs new file mode 100644 index 000000000..ed2064c68 --- /dev/null +++ b/svc/pkg/ds/ops/server-create/src/util_mm/version_migrations.rs @@ -0,0 +1,8 @@ +use bit_vec::BitVec; + +pub const PORT_RANGE_PROXY_IDX: usize = 0; + +/// The bit flags expected for a game version with all migrations applied. +pub fn all() -> BitVec { + BitVec::from_elem(1, true) +} diff --git a/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs b/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs index 429f76a3c..d162f1d31 100644 --- a/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs +++ b/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs @@ -23,8 +23,12 @@ async fn handle( .collect::>(); if query_ids.len() as isize == MAX_COUNT { - tracing::warn!("too many find queries, short circuiting to prevent bad things from happening"); - return Ok(mm::lobby_find_lobby_query_list::Response { query_ids: Vec::new() }) + tracing::warn!( + "too many find queries, short circuiting to prevent bad things from happening" + ); + return Ok(mm::lobby_find_lobby_query_list::Response { + query_ids: Vec::new(), + }); } Ok(mm::lobby_find_lobby_query_list::Response { query_ids })