diff --git a/Cargo.lock b/Cargo.lock index 53adbdee20..f2902f9c71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,17 +39,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" -[[package]] -name = "aes" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac1f845298e95f983ff1944b728ae08b8cebab80d684f0a832ed0fc74dfa27e2" -dependencies = [ - "cfg-if", - "cipher", - "cpufeatures", -] - [[package]] name = "ahash" version = "0.7.8" @@ -119,7 +108,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "220044e6a1bb31ddee4e3db724d29767f352de47445a6cd75e1a173142136c83" dependencies = [ "nom", - "vte", + "vte 0.10.1", ] [[package]] @@ -1325,6 +1314,18 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-lock" version = "3.4.0" @@ -2126,6 +2127,56 @@ dependencies = [ "cipher", ] +[[package]] +name = "bollard" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97ccca1260af6a459d75994ad5acc1651bcabcbdbc41467cc9786519ab854c30" +dependencies = [ + "base64 0.22.1", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "home", + "http 1.1.0", + "http-body-util", + "hyper 1.6.0", + "hyper-named-pipe", + "hyper-rustls 0.27.3", + "hyper-util", + "hyperlocal", + "log", + "pin-project-lite", + "rustls 0.23.25", + "rustls-native-certs 0.8.1", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror 2.0.12", + "tokio", + "tokio-util 0.7.12", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.47.1-rc.27.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f179cfbddb6e77a5472703d4b30436bff32929c0aa8a9008ecf23d1d3cdd0da" +dependencies = [ + "serde", + "serde_repr", + "serde_with 3.12.0", +] + [[package]] name = "bstr" version = "1.11.0" @@ -4297,6 +4348,17 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "docker_credential" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8" +dependencies = [ + "base64 0.21.7", + "serde", + "serde_json", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -6718,28 +6780,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ "base64 0.21.7", - "pem 1.1.1", + "pem", "ring 0.16.20", "serde", "serde_json", "simple_asn1", ] -[[package]] -name = "jsonwebtoken" -version = "9.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" -dependencies = [ - "base64 0.22.1", - "js-sys", - "pem 3.0.5", - "ring 0.17.8", - "serde", - "serde_json", - "simple_asn1", -] - [[package]] name = "kqueue" version = "1.0.8" @@ -7118,16 +7165,6 @@ dependencies = [ "twox-hash", ] -[[package]] -name = "lzma-rs" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "297e814c836ae64db86b36cf2a557ba54368d03f6afcd7d947c266692f71115e" -dependencies = [ - "byteorder", - "crc", -] - [[package]] name = "maplit" version = "1.0.2" @@ -7881,17 +7918,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "libc", -] - [[package]] name = "nix" version = "0.29.0" @@ -7912,7 +7938,7 @@ checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ "bitflags 2.6.0", "cfg-if", - "cfg_aliases 0.2.1", + "cfg_aliases", "libc", ] @@ -8558,6 +8584,31 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parse-display" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" +dependencies = [ + "parse-display-derive", + "regex", + "regex-syntax 0.8.5", +] + +[[package]] +name = "parse-display-derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax 0.8.5", + "structmeta", + "syn 2.0.90", +] + [[package]] name = "paste" version = "1.0.15" @@ -8570,16 +8621,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" -[[package]] -name = "pbkdf2" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" -dependencies = [ - "digest 0.10.7", - "hmac 0.12.1", -] - [[package]] name = "pegboard" version = "25.5.2" @@ -8649,11 +8690,14 @@ dependencies = [ name = "pegboard-config" version = "25.5.2" dependencies = [ + "anyhow", "ipnet", "pegboard", "rivet-util", "schemars", "serde", + "serde_json", + "tokio-util 0.7.12", "url", "uuid", ] @@ -8666,6 +8710,7 @@ dependencies = [ "nix 0.30.1", "portpicker", "rivet-logs", + "rivet-util", "serde", "serde_json", "signal-hook", @@ -8677,12 +8722,14 @@ dependencies = [ name = "pegboard-echo-server" version = "0.0.1" dependencies = [ + "anyhow", "bytes", "futures-util", "http 0.2.12", + "serde", "serde_json", "tokio", - "tokio-tungstenite 0.23.1", + "tokio-util 0.7.12", "uuid", "warp", ] @@ -8692,13 +8739,11 @@ name = "pegboard-manager" version = "25.5.2" dependencies = [ "anyhow", - "base64 0.22.1", "bytes", "futures-util", "hyper 0.14.31", "indoc 2.0.5", "json5", - "jsonwebtoken 9.3.1", "lazy_static", "nix 0.30.1", "notify", @@ -8709,7 +8754,6 @@ dependencies = [ "rand 0.8.5", "rand_chacha 0.3.1", "reqwest 0.12.12", - "ring 0.17.8", "rivet-logs", "rivet-util", "serde", @@ -8780,16 +8824,6 @@ dependencies = [ "base64 0.13.1", ] -[[package]] -name = "pem" -version = "3.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" -dependencies = [ - "base64 0.22.1", - "serde", -] - [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -10002,7 +10036,7 @@ version = "25.5.2" dependencies = [ "base64 0.13.1", "global-error", - "jsonwebtoken 8.3.0", + "jsonwebtoken", "lazy_static", "prost 0.10.4", "rivet-util", @@ -10831,7 +10865,6 @@ dependencies = [ "async-trait", "bcrypt", "chrono", - "fdb-util", "formatted-error", "futures-util", "global-error", @@ -10843,6 +10876,7 @@ dependencies = [ "reqwest 0.12.12", "rivet-config", "rivet-metrics", + "rivet-util-id", "rivet-util-macros", "serde", "serde_json", @@ -10874,6 +10908,17 @@ dependencies = [ name = "rivet-util-cdn" version = "25.5.2" +[[package]] +name = "rivet-util-id" +version = "25.4.2" +dependencies = [ + "fdb-util", + "serde", + "sqlx", + "thiserror 1.0.69", + "uuid", +] + [[package]] name = "rivet-util-job" version = "25.5.2" @@ -11077,6 +11122,18 @@ dependencies = [ "security-framework 2.11.1", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.2.0", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -11753,12 +11810,6 @@ dependencies = [ "outref 0.1.0", ] -[[package]] -name = "simd-adler32" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" - [[package]] name = "simple_asn1" version = "0.6.2" @@ -12732,6 +12783,35 @@ dependencies = [ "sha2 0.9.9", ] +[[package]] +name = "testcontainers" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23bb7577dca13ad86a78e8271ef5d322f37229ec83b8d98da6d996c588a1ddb1" +dependencies = [ + "async-trait", + "bollard", + "bollard-stubs", + "bytes", + "docker_credential", + "either", + "etcetera 0.10.0", + "futures", + "log", + "memchr", + "parse-display", + "pin-project-lite", + "serde", + "serde_json", + "serde_with 3.12.0", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-tar", + "tokio-util 0.7.12", + "url", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -12870,7 +12950,7 @@ dependencies = [ "chirp-client", "chirp-worker", "chrono", - "jsonwebtoken 8.3.0", + "jsonwebtoken", "lazy_static", "prost 0.10.4", "rivet-claims", @@ -14952,20 +15032,6 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.90", -] [[package]] name = "zerovec" diff --git a/docker/dev-full/rivet-edge-server/config.jsonc b/docker/dev-full/rivet-edge-server/config.jsonc index 1619be1563..41338cdc69 100644 --- a/docker/dev-full/rivet-edge-server/config.jsonc +++ b/docker/dev-full/rivet-edge-server/config.jsonc @@ -8,7 +8,7 @@ "cluster_id": "11ca8960-acab-4963-909c-99d72af3e1cb", "datacenter_id": "f288913c-735d-4188-bf9b-2fcf6eac7b9c", "server_id": "174aca2a-98b7-462c-9ad9-3835094a9a10", - "intercom_endpoint": "http://rivet-server:6421" + "intercom_address": "http://rivet-server:6421" }, "guard": { // TLS not configured for local development diff --git a/docker/dev-full/rivet-guard/config.jsonc b/docker/dev-full/rivet-guard/config.jsonc index 7e56b42f5d..e16e619a82 100644 --- a/docker/dev-full/rivet-guard/config.jsonc +++ b/docker/dev-full/rivet-guard/config.jsonc @@ -20,7 +20,7 @@ "cluster_id": "11ca8960-acab-4963-909c-99d72af3e1cb", "datacenter_id": "f288913c-735d-4188-bf9b-2fcf6eac7b9c", "server_id": "174aca2a-98b7-462c-9ad9-3835094a9a10", - "intercom_endpoint": "http://rivet-server:6421" + "intercom_address": "http://rivet-server:6421" }, "guard": { // TLS not configured for local development diff --git a/docker/universal/Dockerfile b/docker/universal/Dockerfile index 7e542bf207..290fefaf45 100644 --- a/docker/universal/Dockerfile +++ b/docker/universal/Dockerfile @@ -67,7 +67,9 @@ RUN apt-get update -y && \ redis-tools \ postgresql-client \ gpg \ - dirmngr && \ + dirmngr \ + gcc \ + wget && \ curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg && \ echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb stable main" | tee /etc/apt/sources.list.d/clickhouse.list && \ apt-get update -y && \ @@ -76,7 +78,13 @@ RUN apt-get update -y && \ mv migrate /usr/local/bin/migrate && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* && \ - curl -Lf -o /lib/libfdb_c.so "https://github.com/apple/foundationdb/releases/download/7.1.60/libfdb_c.x86_64.so" + curl -Lf -o /lib/libfdb_c.so "https://github.com/apple/foundationdb/releases/download/7.1.60/libfdb_c.x86_64.so" && \ + # Install go and usql + wget https://go.dev/dl/go1.24.4.linux-amd64.tar.gz && \ + tar -C /usr/local -xzf go1.24.4.linux-amd64.tar.gz && \ + export PATH=$PATH:/usr/local/go/bin && \ + export PATH="$PATH:$(go env GOPATH)/bin" && \ + go install github.com/xo/usql@latest # MARK: Server (Full) FROM server-full-base AS server-full diff --git a/examples/system-test-actor/tests/client.ts b/examples/system-test-actor/tests/client.ts index 68773ed22e..220bb7e647 100644 --- a/examples/system-test-actor/tests/client.ts +++ b/examples/system-test-actor/tests/client.ts @@ -44,13 +44,13 @@ async function run() { guard: {}, }, }, - udp: { - protocol: "udp", - // internalPort: 80, - routing: { - host: {}, - }, - }, + // udp: { + // protocol: "udp", + // // internalPort: 80, + // routing: { + // host: {}, + // }, + // }, }, }, runtime: { diff --git a/package.json b/package.json index 0c305673cc..2fc86da6a5 100644 --- a/package.json +++ b/package.json @@ -30,4 +30,4 @@ "esbuild": "^0.25.5", "actor-core": "file:./frontend/packages/actor-core.tgz" } -} \ No newline at end of file +} diff --git a/packages/common/config/src/config/server/rivet/mod.rs b/packages/common/config/src/config/server/rivet/mod.rs index 1a413834f9..d67f30e389 100644 --- a/packages/common/config/src/config/server/rivet/mod.rs +++ b/packages/common/config/src/config/server/rivet/mod.rs @@ -843,7 +843,8 @@ pub struct Edge { pub cluster_id: Uuid, pub datacenter_id: Uuid, pub server_id: Uuid, - pub intercom_endpoint: Url, + /// Url of the core cluster. + pub intercom_address: Url, /// This API address will be used if there are no worker servers listed in the cluster package #[serde(default)] pub api_lan_address: Option<(String, u16)>, diff --git a/packages/common/util/id/src/lib.rs b/packages/common/util/id/src/lib.rs index a0a9b5e17c..3abef043a7 100644 --- a/packages/common/util/id/src/lib.rs +++ b/packages/common/util/id/src/lib.rs @@ -36,8 +36,6 @@ pub enum Id { V1([u8; 18]), } - - impl Id { /// Construct V0 from uuid. pub fn new_v0() -> Self { @@ -241,20 +239,41 @@ impl TuplePack for Id { w: &mut W, tuple_depth: TupleDepth, ) -> std::io::Result { + let mut size = 1; + w.write_all(&[fdb_util::codes::ID])?; + + // IMPORTANT: While the normal bytes representation of a v0 ID doesn't include the version, we write + // it here so that we can unpack without a terminating NIL. + if let Id::V0(_) = self { + w.write_all(&[0])?; + size += 1; + } + let bytes = self.as_bytes(); + let len = u32::try_from(bytes.len()) .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?; + size += len; + w.write_all(&bytes)?; - Ok(VersionstampOffset::None { size: 1 + len }) + Ok(VersionstampOffset::None { size }) } } impl<'de> TupleUnpack<'de> for Id { fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { let input = fdb_util::parse_code(input, fdb_util::codes::ID)?; - let (input, slice) = fdb_util::parse_bytes(input, 16)?; + let (input2, version) = fdb_util::parse_byte(input)?; + + let (input, slice) = if version == 0 { + // Parse 16 bytes after version + fdb_util::parse_bytes(input2, 16)? + } else { + // Parse 19 bytes including version + fdb_util::parse_bytes(input, 19)? + }; let v = Id::from_bytes(slice) .map_err(|err| PackError::Message(format!("bad id format: {err}").into()))?; diff --git a/packages/core/infra/server/src/run_config.rs b/packages/core/infra/server/src/run_config.rs index 24520df06f..f713eb69c7 100644 --- a/packages/core/infra/server/src/run_config.rs +++ b/packages/core/infra/server/src/run_config.rs @@ -227,6 +227,13 @@ pub fn config(rivet_config: rivet_config::Config) -> Result { ), db_name: "db_pegboard_runner_log", }, + SqlService { + kind: SqlServiceKind::ClickHouse, + migrations: include_dir!( + "$CARGO_MANIFEST_DIR/../../../edge/services/pegboard/db/runner" + ), + db_name: "db_pegboard_runner", + }, SqlService { kind: SqlServiceKind::CockroachDB, migrations: include_dir!( diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/rivet/guard.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/rivet/guard.rs index e6bef8ea9b..cfe0a13343 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/rivet/guard.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/rivet/guard.rs @@ -58,7 +58,7 @@ pub fn configure(config: &rivet_config::Config) -> GlobalResult { datacenter_id: Uuid::nil(), server_id: Uuid::nil(), api_lan_address: None, - intercom_endpoint: Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}"))?, + intercom_address: Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}"))?, redirect_logs_dir: Some(PathBuf::from("/var/log/rivet-guard")), }), status: server_config.rivet.status.clone(), diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/rivet/worker.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/rivet/worker.rs index acdaf30566..b21277f169 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/rivet/worker.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/rivet/worker.rs @@ -72,7 +72,7 @@ pub fn configure(config: &rivet_config::Config) -> GlobalResult { datacenter_id: Uuid::nil(), server_id: Uuid::nil(), api_lan_address: None, - intercom_endpoint: Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}"))?, + intercom_address: Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}"))?, redirect_logs_dir: Some(PathBuf::from("/var/log/rivet-edge-server")), }), ..Default::default() diff --git a/packages/edge/infra/client/container-runner/src/log_shipper.rs b/packages/edge/infra/client/container-runner/src/log_shipper.rs index 7701d0f596..8dabafbd39 100644 --- a/packages/edge/infra/client/container-runner/src/log_shipper.rs +++ b/packages/edge/infra/client/container-runner/src/log_shipper.rs @@ -38,7 +38,6 @@ pub struct LogShipper { pub vector_socket_addr: String, pub runner_id: String, - pub actor_id: Option, pub env_id: Uuid, } @@ -93,39 +92,8 @@ impl LogShipper { println!("Log shipper connected"); while let Result::Ok(message) = self.msg_rx.recv() { - // // If actor id is not provided, extract from logs - // let actor_id = if self.actor_id.is_some() { - // self.actor_id.as_deref() - // } else { - // if let Some(start_idx) = message.message.find("actor_") { - // let start_idx = start_idx + 6; - - // // Look for next non alphanum (end of actor id) - // let end_idx = if let Some(end_idx) = - // message.message[start_idx..].find(|c: char| !c.is_ascii_alphanumeric()) - // { - // start_idx + end_idx - // } else { - // message.message.len() - // }; - - // let actor_id = &message.message[start_idx..end_idx]; - - // // Check if valid id - // rivet_util::Id::parse(actor_id).is_ok().then_some(actor_id) - // } else { - // None - // } - // }; - - // // Cannot determine actor id, ignore log - // let Some(actor_id) = actor_id else { - // continue; - // }; - - let vector_message = VectorMessage::Actors { + let vector_message = VectorMessage::Runners { runner_id: self.runner_id.as_str(), - actor_id: self.actor_id.as_deref(), env_id: self.env_id, stream_type: message.stream_type as u8, ts: message.ts, @@ -146,10 +114,9 @@ impl LogShipper { #[derive(Serialize)] #[serde(tag = "source")] enum VectorMessage<'a> { - #[serde(rename = "actors")] - Actors { + #[serde(rename = "runners")] + Runners { runner_id: &'a str, - actor_id: Option<&'a str>, env_id: Uuid, stream_type: u8, ts: u64, diff --git a/packages/edge/infra/client/container-runner/src/main.rs b/packages/edge/infra/client/container-runner/src/main.rs index 20a66a5d7c..8f6eb3906f 100644 --- a/packages/edge/infra/client/container-runner/src/main.rs +++ b/packages/edge/infra/client/container-runner/src/main.rs @@ -13,8 +13,8 @@ mod utils; const MAX_LINE_BYTES: usize = 1024; /// Maximum number of bytes to buffer before dropping logs const MAX_BUFFER_BYTES: usize = 1024 * 1024; -// 7 day logs retention -const LOGS_RETENTION: Duration = Duration::from_secs(7 * 24 * 60 * 60); +// 1 day logs retention +const LOGS_RETENTION: Duration = Duration::from_secs(1 * 24 * 60 * 60); fn main() -> Result<()> { let mut args = std::env::args().skip(1); @@ -37,8 +37,6 @@ fn main() -> Result<()> { .transpose() .context("failed to parse vector socket addr")?; let runner_id = var("RUNNER_ID")?; - // Only set if this is a single allocation runner (one actor running on it) - let actor_id = var("ACTOR_ID").ok(); let env_id = Uuid::parse_str(&var("ENVIRONMENT_ID")?)?; println!("Starting runner_id={runner_id} env_id={env_id} vector_socket_addr={} root_user_enabled={root_user_enabled}", vector_socket_addr.as_ref().map(|x| x.as_str()).unwrap_or("?")); @@ -53,7 +51,6 @@ fn main() -> Result<()> { msg_rx, vector_socket_addr, runner_id, - actor_id, env_id, }; let log_shipper_thread = log_shipper.spawn(); diff --git a/packages/edge/infra/client/manager/src/actor/mod.rs b/packages/edge/infra/client/manager/src/actor/mod.rs index fa48d3d057..f8fcac6d84 100644 --- a/packages/edge/infra/client/manager/src/actor/mod.rs +++ b/packages/edge/infra/client/manager/src/actor/mod.rs @@ -121,15 +121,9 @@ impl Actor { .context("should have runner config")? { protocol::ActorRunner::New { .. } => { - let actor_id = matches!( - self.runner.config().image.allocation_type, - protocol::ImageAllocationType::Single - ) - .then_some(self.actor_id); - // Because the runner is not already started we can get the ports here instead of reading from // sqlite - let ports = self.runner.start(ctx, actor_id).await?; + let ports = self.runner.start(ctx).await?; let pid = self.runner.pid().await?; diff --git a/packages/edge/infra/client/manager/src/main.rs b/packages/edge/infra/client/manager/src/main.rs index 38dcf6277a..cb9754f95b 100644 --- a/packages/edge/infra/client/manager/src/main.rs +++ b/packages/edge/infra/client/manager/src/main.rs @@ -190,7 +190,7 @@ async fn run(init: Init, first: bool) -> Result<()> { let ctx = Ctx::new(init.config, init.system, init.pool, tx); tokio::try_join!( - async { metrics_task.await.map_err(Into::::into) }, + async { metrics_task.await.map_err(Into::::into)? }, ctx.run(rx), )?; diff --git a/packages/edge/infra/client/manager/src/runner/mod.rs b/packages/edge/infra/client/manager/src/runner/mod.rs index 827f05f950..5257a1952c 100644 --- a/packages/edge/infra/client/manager/src/runner/mod.rs +++ b/packages/edge/infra/client/manager/src/runner/mod.rs @@ -251,12 +251,9 @@ impl Runner { Ok(()) } - // `actor_id` is set if this runner has a single allocation type which means there is only one actor - // runner on it pub async fn start( self: &Arc, ctx: &Arc, - actor_id: Option, ) -> Result> { tracing::info!(runner_id=?self.runner_id, "starting"); @@ -311,7 +308,7 @@ impl Runner { let self2 = self.clone(); let ctx2 = ctx.clone(); tokio::spawn(async move { - match self2.run(&ctx2, actor_id).await { + match self2.run(&ctx2).await { Ok(_) => { if let Err(err) = self2.observe(&ctx2, false).await { tracing::error!(runner_id=?self2.runner_id, ?err, "observe failed"); @@ -331,7 +328,7 @@ impl Runner { Ok(proxied_ports) } - async fn run(&self, ctx: &Ctx, actor_id: Option) -> Result<()> { + async fn run(&self, ctx: &Ctx) -> Result<()> { // NOTE: This is the env that goes to the container-runner process, NOT the env that is inserted into // the container. let mut runner_env = vec![ @@ -351,10 +348,6 @@ impl Runner { ), ]; - if let Some(actor_id) = actor_id { - runner_env.push(("ACTOR_ID", actor_id.to_string())); - } - if let Some(vector) = &ctx.config().vector { runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string())); } diff --git a/packages/edge/infra/client/manager/tests/vector.json b/packages/edge/infra/client/manager/tests/vector.json index 2bf90d83b6..730cbc0cf8 100644 --- a/packages/edge/infra/client/manager/tests/vector.json +++ b/packages/edge/infra/client/manager/tests/vector.json @@ -18,7 +18,7 @@ } }, "transforms": { - "actors": { + "runners": { "type": "filter", "inputs": [ "vector", @@ -26,19 +26,19 @@ ], "condition": { "type": "vrl", - "source": ".source == \"actors\"" + "source": ".source == \"runners\"" } }, "add_prefix": { "type": "remap", "inputs": [ - "actors" + "runners" ], "source": ".message, err = \"\u001b[2m\" + \"runner_id=\" + .runner_id + \"\u001b[0m \" + .message" } }, "sinks": { - "actor_logs": { + "runner_logs": { "type": "console", "inputs": [ "add_prefix" diff --git a/packages/edge/infra/guard/server/src/routing/api.rs b/packages/edge/infra/guard/server/src/routing/api.rs index 4b07e2d10e..2fb3ffc60e 100644 --- a/packages/edge/infra/guard/server/src/routing/api.rs +++ b/packages/edge/infra/guard/server/src/routing/api.rs @@ -10,9 +10,6 @@ use service_discovery::ServiceDiscovery; use url::Url; use uuid::Uuid; -// TODO: Copied from cluster/src/workflows/server/install/install_scripts/components/rivet/mod.rs -const TUNNEL_API_EDGE_PORT: u16 = 5010; - /// Route requests to the API service #[tracing::instrument(skip_all)] pub async fn route_api_request( @@ -49,7 +46,7 @@ pub async fn route_api_request( let edge = ctx.config().server()?.rivet.edge()?; let url = Url::parse(&format!( "{}provision/datacenters/{dc_id}/servers?pools=worker", - edge.intercom_endpoint + edge.intercom_address ))?; let sd = ServiceDiscovery::new(url); let servers = sd.fetch().await?; diff --git a/packages/edge/services/pegboard/db/analytics/migrations/20250619144812_add_runner_id.down.sql b/packages/edge/services/pegboard/db/analytics/migrations/20250619144812_add_runner_id.down.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/edge/services/pegboard/db/analytics/migrations/20250619144812_add_runner_id.up.sql b/packages/edge/services/pegboard/db/analytics/migrations/20250619144812_add_runner_id.up.sql new file mode 100644 index 0000000000..c14763842c --- /dev/null +++ b/packages/edge/services/pegboard/db/analytics/migrations/20250619144812_add_runner_id.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE actors +ADD COLUMN runner_id UUID; diff --git a/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.up.sql b/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.up.sql index f9b2d5dd6a..222f78c8d2 100644 --- a/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.up.sql +++ b/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.up.sql @@ -1,6 +1,6 @@ CREATE TABLE IF NOT EXISTS runner_logs ( + namespace LowCardinality(String), runner_id UUID, - actor_id String, stream_type UInt8, -- pegboard::types::LogsStreamType ts DateTime64 (9), message String @@ -8,9 +8,10 @@ CREATE TABLE IF NOT EXISTS runner_logs ( PARTITION BY toStartOfHour (ts) ORDER BY ( + namespace, runner_id, toUnixTimestamp (ts), stream_type ) -TTL toDate (ts + toIntervalDay (3)) +TTL toDate (ts + toIntervalDay (14)) SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; diff --git a/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.down.sql b/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.down.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql b/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql new file mode 100644 index 0000000000..342ed6c2a0 --- /dev/null +++ b/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql @@ -0,0 +1,18 @@ +CREATE TABLE IF NOT EXISTS actor_runners ( + actor_id UUID, + generation UInt32, + runner_id UUID, + started_at DateTime64 (9), + finished_at DateTime64 (9) +) ENGINE = ReplicatedMergeTree () +PARTITION BY + toStartOfHour (started_at) +ORDER BY ( + actor_id, + generation, + runner_id, + toUnixTimestamp (started_at), + toUnixTimestamp (finished_at) +) +TTL toDate (started_at + toIntervalDay (30)) +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; diff --git a/packages/edge/services/pegboard/src/keys/client.rs b/packages/edge/services/pegboard/src/keys/client.rs index 60cc6c09e8..34bf77d62f 100644 --- a/packages/edge/services/pegboard/src/keys/client.rs +++ b/packages/edge/services/pegboard/src/keys/client.rs @@ -346,8 +346,8 @@ impl Actor2Key { Actor2SubspaceKey::new(client_id) } - pub fn entire_subspace() -> ActorSubspaceKey { - ActorSubspaceKey::entire() + pub fn entire_subspace() -> Actor2SubspaceKey { + Actor2SubspaceKey::entire() } } diff --git a/packages/edge/services/pegboard/src/workflows/actor/analytics.rs b/packages/edge/services/pegboard/src/workflows/actor/analytics.rs index 3d3f29e197..708fea1dd0 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/analytics.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/analytics.rs @@ -94,7 +94,6 @@ struct StateRow { selected_resources_cpu_millicores: Option, selected_resources_memory_mib: Option, client_id: Option, - client_workflow_id: Option, client_wan_hostname: Option, lifecycle_kill_timeout_ms: i64, lifecycle_durable: bool, @@ -142,7 +141,6 @@ pub async fn insert_clickhouse( selected_resources_cpu_millicores, selected_resources_memory_mib, client_id, - client_workflow_id, client_wan_hostname, lifecycle_kill_timeout_ms, lifecycle_durable, diff --git a/packages/edge/services/pegboard/src/workflows/actor2/analytics.rs b/packages/edge/services/pegboard/src/workflows/actor2/analytics.rs index 36c684c82d..00ed54cc5f 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/analytics.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/analytics.rs @@ -25,6 +25,7 @@ pub struct ActorClickHouseRow { network_ports_proxied: HashMap, client_id: Uuid, client_wan_hostname: String, + runner_id: Uuid, selected_cpu_millicores: u32, selected_memory_mib: u32, root_user_enabled: bool, @@ -94,8 +95,8 @@ struct StateRow { selected_resources_cpu_millicores: Option, selected_resources_memory_mib: Option, client_id: Option, - client_workflow_id: Option, client_wan_hostname: Option, + runner_id: Option, lifecycle_kill_timeout_ms: i64, lifecycle_durable: bool, create_ts: i64, @@ -142,8 +143,8 @@ pub async fn insert_clickhouse( selected_resources_cpu_millicores, selected_resources_memory_mib, client_id, - client_workflow_id, client_wan_hostname, + runner_id, lifecycle_kill_timeout_ms, lifecycle_durable, create_ts, @@ -271,6 +272,7 @@ pub async fn insert_clickhouse( network_ports_proxied: proxied_ports, client_id: state_row.client_id.unwrap_or_default(), client_wan_hostname: state_row.client_wan_hostname.unwrap_or_default(), + runner_id: state_row.runner_id.unwrap_or_default(), selected_cpu_millicores: state_row .selected_resources_cpu_millicores .unwrap_or_default() as u32, diff --git a/packages/edge/services/pegboard/src/workflows/actor2/migrations.rs b/packages/edge/services/pegboard/src/workflows/actor2/migrations.rs index 50f8a839fe..2923d488e4 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/migrations.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/migrations.rs @@ -20,9 +20,12 @@ async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalRes [ctx, @tx &mut tx] " CREATE TABLE state ( + -- Updated later + project_id BLOB NOT NULL DEFAULT X'00000000000000000000000000000000', -- UUID + env_id BLOB NOT NULL, -- UUID tags BLOB NOT NULL, -- JSONB, map - + resources_cpu_millicores INT, resources_memory_mib INT, @@ -30,6 +33,7 @@ async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalRes selected_resources_cpu_millicores INT, selected_resources_memory_mib INT, + old_runner_id BLOB, -- UUID runner_id BLOB, -- UUID client_id BLOB, -- UUID client_workflow_id BLOB, -- UUID @@ -47,7 +51,12 @@ async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalRes image_id BLOB NOT NULL, -- UUID args BLOB NOT NULL, -- JSONB, list network_mode INT NOT NULL, -- pegboard::types::NetworkMode - environment BLOB NOT NULL -- JSONB, map + environment BLOB NOT NULL, -- JSONB, map + + -- Updated later + root_user_enabled INT NOT NULL DEFAULT false, + build_kind INT NOT NULL DEFAULT -1, + build_compression INT NOT NULL DEFAULT -1 ) STRICT; CREATE TABLE ports_ingress ( diff --git a/packages/edge/services/pegboard/src/workflows/actor2/mod.rs b/packages/edge/services/pegboard/src/workflows/actor2/mod.rs index 9c15240156..1eb00861bd 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/mod.rs @@ -259,7 +259,11 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu protocol::ActorState::Starting => { state.gc_timeout_ts = None; - ctx.activity(runtime::SetStartedInput {}).await?; + ctx.activity(runtime::SetStartedInput { + actor_id: input.actor_id, + generation: state.generation, + }) + .await?; } protocol::ActorState::Running { ports, .. } => { ctx.join(( diff --git a/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs index 29e6040493..f295065192 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs @@ -92,19 +92,20 @@ async fn update_client_and_runner( .await?; sql_execute!( - [ctx, pool] + [ctx, &pool] " UPDATE state SET client_id = ?1, client_workflow_id = ?2, - runner_id = ?3, - client_wan_hostname = ?4 + client_wan_hostname = ?3, + runner_id = ?4, + old_runner_id = runner_id ", input.client_id, input.client_workflow_id, - input.runner_id, &client_wan_hostname, + input.runner_id, ) .await?; @@ -576,27 +577,76 @@ pub async fn update_image(ctx: &ActivityCtx, input: &UpdateImageInput) -> Global } #[derive(Debug, Serialize, Deserialize, Hash)] -pub struct SetStartedInput {} +pub struct SetStartedInput { + pub actor_id: util::Id, + pub generation: u32, +} + +#[derive(Serialize)] +pub struct ActorRunnerClickhouseRow { + actor_id: String, + generation: u32, + runner_id: Uuid, + started_at: i64, + finished_at: i64, +} #[activity(SetStarted)] pub async fn set_started(ctx: &ActivityCtx, input: &SetStartedInput) -> GlobalResult<()> { let pool = ctx.sqlite().await?; let start_ts = util::timestamp::now(); - let row = sql_fetch_optional!( - [ctx, (i64,), pool] + let (create_ts, old_start_ts, runner_id, old_runner_id) = sql_fetch_one!( + [ctx, (i64, Option, Uuid, Option), &pool] " - UPDATE state - SET start_ts = ? - WHERE start_ts IS NULL - RETURNING create_ts + SELECT create_ts, start_ts, runner_id, old_runner_id + FROM state + ", + start_ts, + ) + .await?; + + sql_execute!( + [ctx, &pool] + " + UPDATE state SET start_ts = ?1 ", start_ts, ) .await?; - // Add start duration if this is the first start - if let Some((create_ts,)) = row { + let inserter = ctx.clickhouse_inserter().await?; + + // Set old alloc as finished + if let (Some(old_start_ts), Some(old_runner_id)) = (old_start_ts, old_runner_id) { + inserter.insert( + "db_pegboard_runner", + "actor_runners", + ActorRunnerClickhouseRow { + actor_id: input.actor_id.to_string(), + generation: input.generation, + runner_id: old_runner_id, + started_at: old_start_ts * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) + finished_at: start_ts * 1_000_000, + }, + )?; + } + + // Insert new alloc + inserter.insert( + "db_pegboard_runner", + "actor_runners", + ActorRunnerClickhouseRow { + actor_id: input.actor_id.to_string(), + generation: input.generation, + runner_id, + started_at: start_ts * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) + finished_at: 0, + }, + )?; + + // Add start metric for first start + if old_start_ts.is_none() { let dt = (start_ts - create_ts) as f64 / 1000.0; metrics::ACTOR_START_DURATION .with_label_values(&[]) diff --git a/packages/edge/services/pegboard/src/workflows/actor2/setup.rs b/packages/edge/services/pegboard/src/workflows/actor2/setup.rs index 3b52269027..b915aade1a 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/setup.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/setup.rs @@ -544,16 +544,16 @@ async fn allocate_ingress_ports( } #[derive(Debug, Clone, Serialize, Deserialize, Hash)] -struct InsertIngressPortsInput { +struct InsertPortsInput { actor_id: util::Id, network_ports: util::serde::HashableMap, ingress_ports: Vec<(GameGuardProtocol, Vec)>, } -#[activity(InsertIngressPorts)] -async fn insert_ingress_ports( +#[activity(InsertPorts)] +async fn insert_ports( ctx: &ActivityCtx, - input: &InsertIngressPortsInput, + input: &InsertPortsInput, ) -> GlobalResult<()> { let pool = ctx.sqlite().await?; let mut conn = pool.conn().await?; @@ -820,7 +820,7 @@ pub async fn setup( }) .await?; - ctx.activity(InsertIngressPortsInput { + ctx.activity(InsertPortsInput { actor_id: input.actor_id, network_ports, ingress_ports: ingress_ports_res.ports, diff --git a/packages/edge/services/pegboard/src/workflows/client/mod.rs b/packages/edge/services/pegboard/src/workflows/client/mod.rs index c17c61d343..980274ce64 100644 --- a/packages/edge/services/pegboard/src/workflows/client/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/client/mod.rs @@ -528,7 +528,7 @@ async fn publish_registered(ctx: &ActivityCtx, input: &PublishRegisteredInput) - let config = Configuration { client: rivet_pools::reqwest::client().await?, - base_path: util::url::to_string_without_slash(&edge.intercom_endpoint), + base_path: util::url::to_string_without_slash(&edge.intercom_address), bearer_access_token: Some(token), ..Default::default() }; diff --git a/site/src/content/docs/cloud/self-hosting/server-spec.json b/site/src/content/docs/cloud/self-hosting/server-spec.json index d3dc074c24..514d122369 100644 --- a/site/src/content/docs/cloud/self-hosting/server-spec.json +++ b/site/src/content/docs/cloud/self-hosting/server-spec.json @@ -1045,7 +1045,7 @@ "required": [ "cluster_id", "datacenter_id", - "intercom_endpoint", + "intercom_address", "server_id" ], "properties": { @@ -1074,7 +1074,7 @@ "type": "string", "format": "uuid" }, - "intercom_endpoint": { + "intercom_address": { "type": "string", "format": "uri" }, diff --git a/yarn.lock b/yarn.lock index 13ef711a8f..931bbd374d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7491,9 +7491,9 @@ __metadata: languageName: node linkType: hard -"actor-core@portal:../../actor-core/packages/actor-core::locator=rivet%40workspace%3A.": - version: 0.0.0-use.local - resolution: "actor-core@portal:../../actor-core/packages/actor-core::locator=rivet%40workspace%3A." +"actor-core@npm:0.9.0-rc.1": + version: 0.9.0-rc.1 + resolution: "actor-core@npm:0.9.0-rc.1" dependencies: cbor-x: "npm:^1.6.0" hono: "npm:^4.7.0"