diff --git a/Cargo.lock b/Cargo.lock index c710bd0f1a..d36b635755 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3307,6 +3307,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-tungstenite", + "tracing", "universalpubsub", "vbare", ] diff --git a/frontend/src/app/data-providers/engine-data-provider.tsx b/frontend/src/app/data-providers/engine-data-provider.tsx index 2f4642fc0a..d737caf0bf 100644 --- a/frontend/src/app/data-providers/engine-data-provider.tsx +++ b/frontend/src/app/data-providers/engine-data-provider.tsx @@ -15,6 +15,7 @@ import { type DefaultDataProvider, RECORDS_PER_PAGE, } from "./default-data-provider"; +import { getConfig } from "@/components/lib/config"; export type CreateNamespace = { displayName: string; @@ -29,7 +30,7 @@ export type Namespace = { }; export function createClient( - baseUrl = engineEnv().VITE_APP_API_URL, + baseUrl = getConfig().apiUrl, opts: { token: (() => string) | string }, ) { return new RivetClient({ @@ -42,7 +43,7 @@ export function createClient( export const createGlobalContext = (opts: { engineToken: (() => string) | string; }) => { - const client = createClient(engineEnv().VITE_APP_API_URL, { + const client = createClient(getConfig().apiUrl, { token: opts.engineToken, }); return { diff --git a/packages/core/guard/core/src/proxy_service.rs b/packages/core/guard/core/src/proxy_service.rs index 86287f24c5..92f32a731f 100644 --- a/packages/core/guard/core/src/proxy_service.rs +++ b/packages/core/guard/core/src/proxy_service.rs @@ -1798,7 +1798,7 @@ impl ProxyService { .await { Result::Ok(()) => { - tracing::debug!("websocket closed"); + tracing::debug!("websocket handler complete, closing"); // Send graceful close ws_handle diff --git a/packages/core/pegboard-gateway/Cargo.toml b/packages/core/pegboard-gateway/Cargo.toml index 4447b19c0f..1f54b69134 100644 --- a/packages/core/pegboard-gateway/Cargo.toml +++ b/packages/core/pegboard-gateway/Cargo.toml @@ -23,5 +23,6 @@ rivet-util.workspace = true thiserror.workspace = true tokio-tungstenite.workspace = true tokio.workspace = true +tracing.workspace = true universalpubsub.workspace = true vbare.workspace = true diff --git a/packages/core/pegboard-gateway/src/lib.rs b/packages/core/pegboard-gateway/src/lib.rs index f8482008a0..84ce838920 100644 --- a/packages/core/pegboard-gateway/src/lib.rs +++ b/packages/core/pegboard-gateway/src/lib.rs @@ -21,7 +21,7 @@ use crate::shared_state::{SharedState, TunnelMessageData}; pub mod shared_state; -const UPS_REQ_TIMEOUT: Duration = Duration::from_secs(2); +const TUNNEL_ACK_TIMEOUT: Duration = Duration::from_secs(2); const SEC_WEBSOCKET_PROTOCOL: HeaderName = HeaderName::from_static("sec-websocket-protocol"); const WS_PROTOCOL_ACTOR: &str = "rivet_actor."; @@ -33,6 +33,7 @@ pub struct PegboardGateway { } impl PegboardGateway { + #[tracing::instrument(skip_all, fields(?actor_id, ?runner_id))] pub fn new(ctx: StandaloneCtx, shared_state: SharedState, runner_id: Id, actor_id: Id) -> Self { Self { ctx, @@ -45,6 +46,7 @@ impl PegboardGateway { #[async_trait] impl CustomServeTrait for PegboardGateway { + #[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id))] async fn handle_request( &self, req: Request>, @@ -67,6 +69,7 @@ impl CustomServeTrait for PegboardGateway { } } + #[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id))] async fn handle_websocket( &self, client_ws: WebSocketHandle, @@ -91,6 +94,7 @@ impl CustomServeTrait for PegboardGateway { } impl PegboardGateway { + #[tracing::instrument(skip_all)] async fn handle_request_inner( &self, req: Request>, @@ -155,9 +159,16 @@ impl PegboardGateway { self.shared_state.send_message(request_id, message).await?; // Wait for response - tracing::debug!("starting response handler task"); + tracing::debug!("gateway waiting for response from tunnel"); let response_start = loop { - let Some(msg) = msg_rx.recv().await else { + let Some(msg) = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, msg_rx.recv()) + .await + .map_err(|_| { + tracing::warn!("timed out waiting for tunnel ack"); + + RequestError::ServiceUnavailable + })? + else { tracing::warn!("received no message response"); return Err(RequestError::ServiceUnavailable.into()); }; @@ -195,6 +206,7 @@ impl PegboardGateway { Ok(response) } + #[tracing::instrument(skip_all)] async fn handle_websocket_inner( &self, client_ws: WebSocketHandle, @@ -247,34 +259,42 @@ impl PegboardGateway { .send_message(request_id, open_message) .await?; + tracing::debug!("gateway waiting for websocket open from tunnel"); + // Wait for WebSocket open acknowledgment - let open_ack_received = loop { - let Some(msg) = msg_rx.recv().await else { - bail!("received no websocket open response"); + loop { + let Some(msg) = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, msg_rx.recv()) + .await + .map_err(|_| { + tracing::warn!("timed out waiting for tunnel ack"); + + RequestError::ServiceUnavailable + })? + else { + tracing::warn!("received no message response"); + return Err(RequestError::ServiceUnavailable.into()); }; match msg { TunnelMessageData::Message( protocol::ToServerTunnelMessageKind::ToServerWebSocketOpen, ) => { - break true; + break; } TunnelMessageData::Message( protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(close), ) => { - bail!("websocket closed before opening: {close:?}"); + tracing::warn!(?close, "websocket closed before opening"); + return Err(RequestError::ServiceUnavailable.into()); } TunnelMessageData::Timeout => { - bail!("websocket open timeout"); + tracing::warn!("websocket open timeout"); + return Err(RequestError::ServiceUnavailable.into()); } _ => { tracing::warn!("received unexpected message while waiting for websocket open"); } } - }; - - if !open_ack_received { - bail!("failed to open websocket"); } // Accept the WebSocket diff --git a/packages/core/pegboard-runner/src/client_to_pubsub_task.rs b/packages/core/pegboard-runner/src/client_to_pubsub_task.rs index 6f1b4efd3e..90004ab6c8 100644 --- a/packages/core/pegboard-runner/src/client_to_pubsub_task.rs +++ b/packages/core/pegboard-runner/src/client_to_pubsub_task.rs @@ -16,6 +16,7 @@ use crate::{ utils::{self}, }; +#[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))] pub async fn task(ctx: StandaloneCtx, conn: Arc, ws_rx: WebSocketReceiver) { match task_inner(ctx, conn, ws_rx).await { Ok(_) => {} @@ -25,6 +26,7 @@ pub async fn task(ctx: StandaloneCtx, conn: Arc, ws_rx: WebSocketReceiver) } } +#[tracing::instrument(skip_all)] async fn task_inner( ctx: StandaloneCtx, conn: Arc, @@ -77,6 +79,7 @@ async fn task_inner( Ok(()) } +#[tracing::instrument(skip_all)] async fn handle_message( ctx: &StandaloneCtx, conn: &Arc, @@ -354,6 +357,7 @@ async fn handle_message( Ok(()) } +#[tracing::instrument(skip_all)] async fn handle_tunnel_message( ctx: &StandaloneCtx, conn: &Arc, diff --git a/packages/core/pegboard-runner/src/pubsub_to_client_task.rs b/packages/core/pegboard-runner/src/pubsub_to_client_task.rs index 640cb9e5cf..3e7bd4c256 100644 --- a/packages/core/pegboard-runner/src/pubsub_to_client_task.rs +++ b/packages/core/pegboard-runner/src/pubsub_to_client_task.rs @@ -12,6 +12,7 @@ use crate::{ utils, }; +#[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))] pub async fn task(ctx: StandaloneCtx, conn: Arc, sub: Subscriber) { match task_inner(ctx, conn, sub).await { Ok(_) => {} @@ -21,6 +22,7 @@ pub async fn task(ctx: StandaloneCtx, conn: Arc, sub: Subscriber) { } } +#[tracing::instrument(skip_all)] async fn task_inner(ctx: StandaloneCtx, conn: Arc, mut sub: Subscriber) -> Result<()> { while let Result::Ok(NextOutput::Message(ups_msg)) = sub.next().await { tracing::debug!( @@ -37,6 +39,7 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc, mut sub: Subscriber) -> continue; } }; + let is_close = utils::is_to_client_close(&msg); // Handle tunnel messages if let protocol::ToClient::ToClientTunnelMessage(tunnel_msg) = &mut msg { @@ -57,11 +60,17 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc, mut sub: Subscriber) -> tracing::error!(?e, "failed to send message to WebSocket"); break; } + + if is_close { + tracing::debug!("manually closing websocket"); + break; + } } Ok(()) } +#[tracing::instrument(skip_all)] async fn handle_tunnel_message(conn: &Arc, msg: &mut protocol::ToClientTunnelMessage) { // Save active request // diff --git a/packages/core/pegboard-runner/src/utils.rs b/packages/core/pegboard-runner/src/utils.rs index aab6f7623c..a1c731163f 100644 --- a/packages/core/pegboard-runner/src/utils.rs +++ b/packages/core/pegboard-runner/src/utils.rs @@ -4,10 +4,6 @@ use hyper_tungstenite::tungstenite::Message as WsMessage; use hyper_util::rt::TokioIo; use rivet_error::*; use rivet_runner_protocol as protocol; -use tokio_tungstenite::{ - WebSocketStream, - tungstenite::protocol::frame::{CloseFrame, coding::CloseCode}, -}; #[derive(Clone)] pub struct UrlData { @@ -73,3 +69,10 @@ pub fn is_to_client_tunnel_message_kind_request_close( _ => false, } } + +pub fn is_to_client_close(kind: &protocol::ToClient) -> bool { + match kind { + protocol::ToClient::ToClientClose => true, + _ => false, + } +} diff --git a/packages/services/namespace/src/ops/resolve_for_name_global.rs b/packages/services/namespace/src/ops/resolve_for_name_global.rs index c7d85d5fd1..51d451f563 100644 --- a/packages/services/namespace/src/ops/resolve_for_name_global.rs +++ b/packages/services/namespace/src/ops/resolve_for_name_global.rs @@ -31,7 +31,11 @@ pub async fn namespace_resolve_for_name_global( let client = client.clone(); async move { let url = leader_dc.api_peer_url.join(&format!("/namespaces"))?; - let res = client.get(url).query(&[("name", &input.name)]).send().await?; + let res = client + .get(url) + .query(&[("name", &input.name)]) + .send() + .await?; let res = rivet_api_util::parse_response::< rivet_api_types::namespaces::list::ListResponse, diff --git a/sdks/typescript/runner/src/mod.ts b/sdks/typescript/runner/src/mod.ts index a6499c09c4..dd3444b3d3 100644 --- a/sdks/typescript/runner/src/mod.ts +++ b/sdks/typescript/runner/src/mod.ts @@ -538,7 +538,8 @@ export class Runner { } else if (message.tag === "ToClientTunnelMessage") { this.#tunnel?.handleTunnelMessage(message.val); } else if (message.tag === "ToClientClose") { - // TODO: Close ws + this.#tunnel.shutdown(); + ws.close(1000, "manual closure"); } else { unreachable(message); } diff --git a/site/public/llms-full.txt b/site/public/llms-full.txt index d61f289948..317ea63fe0 100644 --- a/site/public/llms-full.txt +++ b/site/public/llms-full.txt @@ -5838,12 +5838,26 @@ Rivet Engine supports scaling transparently across multiple regions. Railway provides a simple platform for deploying Rivet Engine with automatic scaling and managed infrastructure. +## Video Tutorial + ## Quick Deploy -[![Deploy on Railway](https://railway.com/button.svg)](https://railway.com/deploy/rivet?referralCode=RC7bza&utm_medium=integration&utm_source=template&utm_campaign=generic) +Choose the template that best fits your needs: + +| **Rivet Template** | **Rivet Starter** | +|-------------------|-------------------| +| [![Deploy on Railway](https://railway.com/button.svg)](https://railway.com/deploy/rivet?referralCode=RC7bza&utm_medium=integration&utm_source=template&utm_campaign=generic) | [![Deploy Rivet Starter](https://railway.com/button.svg)](https://railway.com/deploy/rivet-starter) | +| **Blank template** to start fresh | **Complete example** with chat app | +| - Rivet Engine | - Pre-configured Rivet Engine | +| - PostgreSQL database | - Example chat application with Actors | +| - Basic configuration | - PostgreSQL database | +| - Manual setup required | - Rivet Inspector for debugging | +| | - Ready to run immediately | You can also use the [Rivet Railway template](https://github.com/rivet-gg/template-railway) as a starting point for your application. +After deploying either template, you can find the `RIVET__AUTH__ADMIN_TOKEN` under the **Variables** tab in the Railway dashboard. This token is required to access the Rivet Inspector. + ## Manual Deployment ### Prerequisites diff --git a/site/public/llms.txt b/site/public/llms.txt index d3445f5383..41197f1b2a 100644 --- a/site/public/llms.txt +++ b/site/public/llms.txt @@ -20,6 +20,7 @@ https://rivet.gg/blog/2025-09-14-weekly-updates https://rivet.gg/blog/2025-09-21-weekly-updates https://rivet.gg/blog/2025-09-24-vbare-simple-schema-evolution-with-maximum-performance https://rivet.gg/blog/2025-1-12-rivet-inspector +https://rivet.gg/blog/2025-10-01-railway-selfhost https://rivet.gg/blog/godot-multiplayer-compared-to-unity https://rivet.gg/changelog https://rivet.gg/changelog.json @@ -40,6 +41,7 @@ https://rivet.gg/changelog/2025-09-14-weekly-updates https://rivet.gg/changelog/2025-09-21-weekly-updates https://rivet.gg/changelog/2025-09-24-vbare-simple-schema-evolution-with-maximum-performance https://rivet.gg/changelog/2025-1-12-rivet-inspector +https://rivet.gg/changelog/2025-10-01-railway-selfhost https://rivet.gg/changelog/godot-multiplayer-compared-to-unity https://rivet.gg/cloud https://rivet.gg/docs/actors