From d382606085d72068062dccf9111df5af30600b94 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 23 May 2025 23:22:17 +0000 Subject: [PATCH] fix(pb): get new actor ids working e2e --- .../src/ops/datacenter/get_for_label.rs | 2 +- .../install/install_scripts/components/mod.rs | 6 +- packages/edge/infra/client/echo/Cargo.toml | 4 +- packages/edge/infra/client/echo/src/main.rs | 159 ++++++++++-------- .../infra/client/manager/src/utils/mod.rs | 5 +- packages/edge/infra/guard/core/src/lib.rs | 1 - packages/edge/infra/guard/core/src/util.rs | 23 --- packages/edge/infra/guard/server/src/tls.rs | 13 +- packages/edge/services/pegboard/src/util.rs | 8 +- 9 files changed, 107 insertions(+), 114 deletions(-) delete mode 100644 packages/edge/infra/guard/core/src/util.rs diff --git a/packages/core/services/cluster/src/ops/datacenter/get_for_label.rs b/packages/core/services/cluster/src/ops/datacenter/get_for_label.rs index f3c894275d..267d107365 100644 --- a/packages/core/services/cluster/src/ops/datacenter/get_for_label.rs +++ b/packages/core/services/cluster/src/ops/datacenter/get_for_label.rs @@ -57,7 +57,7 @@ async fn get_dcs(ctx: OperationCtx, labels: Vec) -> GlobalResult>(), + labels.into_iter().map(|x| x.to_be_bytes()).collect::>(), ) .await?; diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs index b04439ef07..ae059946d8 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs @@ -233,7 +233,7 @@ pub mod lz4 { indoc!( r#" echo 'Downloading lz4' - curl -L https://releases.rivet.gg/tools/lz4/1.10.0/debian11-amd64/lz4 -o /usr/local/bin/lz4 + curl -Lfo /usr/local/bin/lz4 https://releases.rivet.gg/tools/lz4/1.10.0/debian11-amd64/lz4 chmod +x /usr/local/bin/lz4 "# ) @@ -286,7 +286,7 @@ pub mod umoci { indoc!( r#" echo 'Downloading umoci' - curl -Lf -o /usr/bin/umoci "https://github.com/opencontainers/umoci/releases/download/v0.4.7/umoci.amd64" + curl -Lfo /usr/bin/umoci "https://github.com/opencontainers/umoci/releases/download/v0.4.7/umoci.amd64" chmod +x /usr/bin/umoci "# ).to_string() @@ -300,7 +300,7 @@ pub mod cni { indoc!( r#" echo 'Downloading cnitool' - curl -Lf -o /usr/bin/cnitool "https://github.com/rivet-gg/cni/releases/download/v1.1.2-build3/cnitool" + curl -Lfo /usr/bin/cnitool "https://github.com/rivet-gg/cni/releases/download/v1.1.2-build3/cnitool" chmod +x /usr/bin/cnitool "# ).to_string() diff --git a/packages/edge/infra/client/echo/Cargo.toml b/packages/edge/infra/client/echo/Cargo.toml index 5527ff3f5f..09522fd2ec 100644 --- a/packages/edge/infra/client/echo/Cargo.toml +++ b/packages/edge/infra/client/echo/Cargo.toml @@ -6,11 +6,13 @@ authors = ["Rivet Gaming, LLC "] license = "Apache-2.0" [dependencies] +anyhow = "1.0" bytes = "1.0" futures-util = "0.3" http = "0.2" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1.40", features = ["full",] } -tokio-tungstenite = "0.23.1" +tokio-util = "0.7" uuid = { version = "1", features = ["v4", "serde"] } warp = "0.3.7" diff --git a/packages/edge/infra/client/echo/src/main.rs b/packages/edge/infra/client/echo/src/main.rs index 7f628935cb..e3788241d3 100644 --- a/packages/edge/infra/client/echo/src/main.rs +++ b/packages/edge/infra/client/echo/src/main.rs @@ -1,10 +1,12 @@ -use std::{env, net::SocketAddr, sync::Arc, time::Duration}; +use std::{env, io::Cursor, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use anyhow::*; +use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; +use serde::{de::DeserializeOwned, Serialize}; use serde_json::json; -use tokio::sync::Mutex; -use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -use uuid::Uuid; +use tokio::{net::UnixStream, sync::Mutex}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; use warp::Filter; const PING_INTERVAL: Duration = Duration::from_secs(1); @@ -18,9 +20,9 @@ async fn main() { } // Get manager connection details from env vars - let manager_ip = env::var("RIVET_MANAGER_IP").expect("RIVET_MANAGER_IP not set"); - let manager_port = env::var("RIVET_MANAGER_PORT").expect("RIVET_MANAGER_PORT not set"); - let manager_addr = format!("ws://{}:{}", manager_ip, manager_port); + let manager_socket_path = PathBuf::from( + env::var("RIVET_MANAGER_SOCKET_PATH").expect("RIVET_MANAGER_SOCKET_PATH not set"), + ); // Get HTTP server port from env var or use default let http_port = env::var("PORT_MAIN") @@ -28,10 +30,10 @@ async fn main() { .parse::() .expect("bad PORT_MAIN"); - // Spawn the WebSocket client + // Spawn the unix socket client tokio::spawn(async move { - if let Err(e) = run_websocket_client(&manager_addr).await { - eprintln!("WebSocket client error: {}", e); + if let Err(e) = run_socket_client(manager_socket_path).await { + eprintln!("Socket client error: {}", e); } }); @@ -53,25 +55,28 @@ async fn main() { warp::serve(echo).run(http_addr).await; } -async fn run_websocket_client(url: &str) -> Result<(), Box> { - println!("Connecting to WebSocket at {}", url); +async fn run_socket_client(socket_path: PathBuf) -> Result<()> { + println!("Connecting to socket at {}", socket_path.display()); - // Connect to the WebSocket server - let (ws_stream, _) = connect_async(url).await?; - println!("WebSocket connection established"); + // Connect to the socket server + let stream = UnixStream::connect(socket_path).await?; + println!("Socket connection established"); - // Split the stream - let (mut write, mut read) = ws_stream.split(); + let codec = LengthDelimitedCodec::builder() + .length_field_type::() + .length_field_length(4) + // No offset + .length_field_offset(0) + // Header length is not included in the length calculation + .length_adjustment(4) + // header is included in the returned bytes + .num_skip(0) + .new_codec(); - let payload = json!({ - "init": { - "access_token": env::var("RIVET_ACCESS_TOKEN").expect("RIVET_ACCESS_TOKEN not set"), - }, - }); + let framed = Framed::new(stream, codec); - let data = serde_json::to_vec(&payload)?; - write.send(Message::Binary(data)).await?; - println!("Sent init message"); + // Split the stream + let (write, mut read) = framed.split(); // Ping thread let write = Arc::new(Mutex::new(write)); @@ -80,10 +85,14 @@ async fn run_websocket_client(url: &str) -> Result<(), Box Result<(), Box match msg { - Message::Pong(_) => {} - Message::Binary(buf) => { - let packet = serde_json::from_slice::(&buf)?; - println!("Received packet: {packet:?}"); - - if let Some(packet) = packet.get("start_actor") { - let payload = json!({ - "actor_state_update": { - "actor_id": packet["actor_id"], - "generation": packet["generation"], - "state": { - "running": null, - }, - }, - }); - - let data = serde_json::to_vec(&payload)?; - write.lock().await.send(Message::Binary(data)).await?; - } else if let Some(packet) = packet.get("signal_actor") { - let payload = json!({ - "actor_state_update": { - "actor_id": packet["actor_id"], - "generation": packet["generation"], - "state": { - "exited": { - "exit_code": null, - }, - }, - }, - }); - - let data = serde_json::to_vec(&payload)?; - write.lock().await.send(Message::Binary(data)).await?; - } - } - msg => eprintln!("Unexpected message: {msg:?}"), - }, - Err(e) => { - eprintln!("Error reading message: {}", e); - break; - } + while let Some(frame) = read.next().await.transpose()? { + let (_, packet) = decode_frame::(&frame.freeze())?; + println!("Received packet: {packet:?}"); + + if let Some(packet) = packet.get("start_actor") { + let payload = json!({ + "actor_state_update": { + "actor_id": packet["actor_id"], + "generation": packet["generation"], + "state": { + "running": null, + }, + }, + }); + + write.lock().await.send(encode_frame(&payload)?).await?; + } else if let Some(packet) = packet.get("signal_actor") { + let payload = json!({ + "actor_state_update": { + "actor_id": packet["actor_id"], + "generation": packet["generation"], + "state": { + "exited": { + "exit_code": null, + }, + }, + }, + }); + + write.lock().await.send(encode_frame(&payload)?).await?; } } - println!("WebSocket connection closed"); + println!("Socket connection closed"); Ok(()) } + +fn decode_frame(frame: &Bytes) -> Result<([u8; 4], T)> { + ensure!(frame.len() >= 4, "Frame too short"); + + // Extract the header (first 4 bytes) + let header = [frame[0], frame[1], frame[2], frame[3]]; + + // Deserialize the rest of the frame (payload after the header) + let payload = serde_json::from_slice(&frame[4..])?; + + Ok((header, payload)) +} + +fn encode_frame(payload: &T) -> Result { + let mut buf = Vec::with_capacity(4); + buf.extend_from_slice(&[0u8; 4]); // header (currently unused) + + let mut cursor = Cursor::new(&mut buf); + serde_json::to_writer(&mut cursor, payload)?; + + Ok(buf.into()) +} diff --git a/packages/edge/infra/client/manager/src/utils/mod.rs b/packages/edge/infra/client/manager/src/utils/mod.rs index 1aaa71f403..d7cf48ba81 100644 --- a/packages/edge/infra/client/manager/src/utils/mod.rs +++ b/packages/edge/infra/client/manager/src/utils/mod.rs @@ -128,6 +128,8 @@ async fn build_sqlite_pool(db_url: &str) -> Result { .busy_timeout(Duration::from_secs(5)) // Enable foreign key constraint enforcement .foreign_keys(true) + // Increases write performance + .journal_mode(SqliteJournalMode::Wal) // Enable auto vacuuming and set it to incremental mode for gradual space reclaiming .auto_vacuum(SqliteAutoVacuum::Incremental) // Set synchronous mode to NORMAL for performance and data safety balance @@ -241,7 +243,8 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { generation INTEGER NOT NULL, config BLOB NOT NULL, -- JSONB - runner_id NOT NULL, -- Already exists in `config`, set here for ease of querying + -- Already exists in `config`, set here for ease of querying + runner_id BLOB NOT NULL, -- UUID start_ts INTEGER NOT NULL, running_ts INTEGER, diff --git a/packages/edge/infra/guard/core/src/lib.rs b/packages/edge/infra/guard/core/src/lib.rs index a773ec3796..966b14d0ce 100644 --- a/packages/edge/infra/guard/core/src/lib.rs +++ b/packages/edge/infra/guard/core/src/lib.rs @@ -5,7 +5,6 @@ pub mod proxy_service; pub mod request_context; mod server; pub mod types; -pub mod util; pub use cert_resolver::CertResolverFn; pub use proxy_service::{MiddlewareFn, ProxyService, ProxyState, RouteTarget, RoutingFn}; diff --git a/packages/edge/infra/guard/core/src/util.rs b/packages/edge/infra/guard/core/src/util.rs deleted file mode 100644 index dc00c3d2e8..0000000000 --- a/packages/edge/infra/guard/core/src/util.rs +++ /dev/null @@ -1,23 +0,0 @@ -use global_error::prelude::*; - -use crate::types::{EndpointType, GameGuardProtocol}; - -/// Build a hostname or path string for the given endpoint type -pub fn build_actor_hostname_and_path( - endpoint_type: EndpointType, - actor_id: &rivet_util::Id, - port_name: &str, - guard_hostname: &str, - _protocol: GameGuardProtocol, -) -> GlobalResult { - match endpoint_type { - EndpointType::Hostname => { - // For hostname, we create: {actor_id}-{port_name}.{guard_hostname} - Ok(format!("{}-{}.{}", actor_id, port_name, guard_hostname)) - } - EndpointType::Path => { - // For path, we create: /{actor_id}-{port_name} - Ok(format!("/{}-{}", actor_id, port_name)) - } - } -} diff --git a/packages/edge/infra/guard/server/src/tls.rs b/packages/edge/infra/guard/server/src/tls.rs index fa11f1c3a2..69f6748684 100644 --- a/packages/edge/infra/guard/server/src/tls.rs +++ b/packages/edge/infra/guard/server/src/tls.rs @@ -157,14 +157,11 @@ pub async fn create_cert_resolver( } Ok(None) => { tracing::warn!( - "Could not build dynamic hostname actor routing regex - pattern will be skipped" - ); + "Could not build dynamic hostname actor routing regex - pattern will be skipped" + ); None } - Err(err) => bail!( - "Failed to build dynamic hostname actor routing regex: {}", - err - ), + Err(e) => bail!("Failed to build dynamic hostname actor routing regex: {}", e), }; let actor_hostname_regex_static = match build_actor_hostname_and_path_regex(EndpointType::Path, guard_hostname) { @@ -178,9 +175,7 @@ pub async fn create_cert_resolver( ); None } - Err(e) => { - bail!("Failed to build static path actor routing regex: {}", e); - } + Err(e) => bail!("Failed to build static path actor routing regex: {}", e), }; // Create resolver function that matches the routing logic diff --git a/packages/edge/services/pegboard/src/util.rs b/packages/edge/services/pegboard/src/util.rs index 44255390ad..31c2bc4c95 100644 --- a/packages/edge/services/pegboard/src/util.rs +++ b/packages/edge/services/pegboard/src/util.rs @@ -5,7 +5,7 @@ use regex::Regex; use crate::types::{EndpointType, GameGuardProtocol}; // Constants for regex patterns -const UUID_PATTERN: &str = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; +const ID_PATTERN: &str = r"[a-zA-Z0-9-]+"; const PORT_NAME_PATTERN: &str = r"[a-zA-Z0-9-_]+"; pub fn build_actor_hostname_and_path( @@ -59,7 +59,7 @@ pub fn build_actor_hostname_and_path_regex( // server in the subdomain is a convenience (EndpointType::Hostname, GuardPublicHostname::DnsParent(dns_parent)) => { let hostname_regex = Regex::new(&format!( - r"^(?P{UUID_PATTERN})-(?P{PORT_NAME_PATTERN})\.actor\.{}$", + r"^(?P{ID_PATTERN})-(?P{PORT_NAME_PATTERN})\.actor\.{}$", regex::escape(dns_parent.as_str()) ))?; Ok(Some((hostname_regex, None))) @@ -81,7 +81,7 @@ pub fn build_actor_hostname_and_path_regex( ))?; let path_regex = Regex::new(&format!( - r"^/(?P{UUID_PATTERN})-(?P{PORT_NAME_PATTERN})(?:/.*)?$" + r"^/(?P{ID_PATTERN})-(?P{PORT_NAME_PATTERN})(?:/.*)?$" ))?; Ok(Some((hostname_regex, Some(path_regex)))) @@ -91,7 +91,7 @@ pub fn build_actor_hostname_and_path_regex( let hostname_regex = Regex::new(&format!(r"^{}$", regex::escape(static_.as_str())))?; let path_regex = Regex::new(&format!( - r"^/(?P{UUID_PATTERN})-(?P{PORT_NAME_PATTERN})(?:/.*)?$" + r"^/(?P{ID_PATTERN})-(?P{PORT_NAME_PATTERN})(?:/.*)?$" ))?; Ok(Some((hostname_regex, Some(path_regex))))