Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions frontend/src/app/data-providers/engine-data-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
type DefaultDataProvider,
RECORDS_PER_PAGE,
} from "./default-data-provider";
import { getConfig } from "@/components/lib/config";

export type CreateNamespace = {
displayName: string;
Expand All @@ -29,7 +30,7 @@ export type Namespace = {
};

export function createClient(
baseUrl = engineEnv().VITE_APP_API_URL,
baseUrl = getConfig().apiUrl,
opts: { token: (() => string) | string },
) {
return new RivetClient({
Expand All @@ -42,7 +43,7 @@ export function createClient(
export const createGlobalContext = (opts: {
engineToken: (() => string) | string;
}) => {
const client = createClient(engineEnv().VITE_APP_API_URL, {
const client = createClient(getConfig().apiUrl, {
token: opts.engineToken,
});
return {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/guard/core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ impl ProxyService {
.await
{
Result::Ok(()) => {
tracing::debug!("websocket closed");
tracing::debug!("websocket handler complete, closing");

// Send graceful close
ws_handle
Expand Down
1 change: 1 addition & 0 deletions packages/core/pegboard-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ rivet-util.workspace = true
thiserror.workspace = true
tokio-tungstenite.workspace = true
tokio.workspace = true
tracing.workspace = true
universalpubsub.workspace = true
vbare.workspace = true
46 changes: 33 additions & 13 deletions packages/core/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::shared_state::{SharedState, TunnelMessageData};

pub mod shared_state;

const UPS_REQ_TIMEOUT: Duration = Duration::from_secs(2);
const TUNNEL_ACK_TIMEOUT: Duration = Duration::from_secs(2);
const SEC_WEBSOCKET_PROTOCOL: HeaderName = HeaderName::from_static("sec-websocket-protocol");
const WS_PROTOCOL_ACTOR: &str = "rivet_actor.";

Expand All @@ -33,6 +33,7 @@ pub struct PegboardGateway {
}

impl PegboardGateway {
#[tracing::instrument(skip_all, fields(?actor_id, ?runner_id))]
pub fn new(ctx: StandaloneCtx, shared_state: SharedState, runner_id: Id, actor_id: Id) -> Self {
Self {
ctx,
Expand All @@ -45,6 +46,7 @@ impl PegboardGateway {

#[async_trait]
impl CustomServeTrait for PegboardGateway {
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id))]
async fn handle_request(
&self,
req: Request<Full<Bytes>>,
Expand All @@ -67,6 +69,7 @@ impl CustomServeTrait for PegboardGateway {
}
}

#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id))]
async fn handle_websocket(
&self,
client_ws: WebSocketHandle,
Expand All @@ -91,6 +94,7 @@ impl CustomServeTrait for PegboardGateway {
}

impl PegboardGateway {
#[tracing::instrument(skip_all)]
async fn handle_request_inner(
&self,
req: Request<Full<Bytes>>,
Expand Down Expand Up @@ -155,9 +159,16 @@ impl PegboardGateway {
self.shared_state.send_message(request_id, message).await?;

// Wait for response
tracing::debug!("starting response handler task");
tracing::debug!("gateway waiting for response from tunnel");
let response_start = loop {
let Some(msg) = msg_rx.recv().await else {
let Some(msg) = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, msg_rx.recv())
.await
.map_err(|_| {
tracing::warn!("timed out waiting for tunnel ack");

RequestError::ServiceUnavailable
})?
else {
tracing::warn!("received no message response");
return Err(RequestError::ServiceUnavailable.into());
};
Expand Down Expand Up @@ -195,6 +206,7 @@ impl PegboardGateway {
Ok(response)
}

#[tracing::instrument(skip_all)]
async fn handle_websocket_inner(
&self,
client_ws: WebSocketHandle,
Expand Down Expand Up @@ -247,34 +259,42 @@ impl PegboardGateway {
.send_message(request_id, open_message)
.await?;

tracing::debug!("gateway waiting for websocket open from tunnel");

// Wait for WebSocket open acknowledgment
let open_ack_received = loop {
let Some(msg) = msg_rx.recv().await else {
bail!("received no websocket open response");
loop {
let Some(msg) = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, msg_rx.recv())
.await
.map_err(|_| {
tracing::warn!("timed out waiting for tunnel ack");

RequestError::ServiceUnavailable
})?
else {
tracing::warn!("received no message response");
return Err(RequestError::ServiceUnavailable.into());
};

match msg {
TunnelMessageData::Message(
protocol::ToServerTunnelMessageKind::ToServerWebSocketOpen,
) => {
break true;
break;
}
TunnelMessageData::Message(
protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(close),
) => {
bail!("websocket closed before opening: {close:?}");
tracing::warn!(?close, "websocket closed before opening");
return Err(RequestError::ServiceUnavailable.into());
}
TunnelMessageData::Timeout => {
bail!("websocket open timeout");
tracing::warn!("websocket open timeout");
return Err(RequestError::ServiceUnavailable.into());
}
_ => {
tracing::warn!("received unexpected message while waiting for websocket open");
}
}
};

if !open_ack_received {
bail!("failed to open websocket");
}

// Accept the WebSocket
Expand Down
4 changes: 4 additions & 0 deletions packages/core/pegboard-runner/src/client_to_pubsub_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
utils::{self},
};

#[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))]
pub async fn task(ctx: StandaloneCtx, conn: Arc<Conn>, ws_rx: WebSocketReceiver) {
match task_inner(ctx, conn, ws_rx).await {
Ok(_) => {}
Expand All @@ -25,6 +26,7 @@ pub async fn task(ctx: StandaloneCtx, conn: Arc<Conn>, ws_rx: WebSocketReceiver)
}
}

#[tracing::instrument(skip_all)]
async fn task_inner(
ctx: StandaloneCtx,
conn: Arc<Conn>,
Expand Down Expand Up @@ -77,6 +79,7 @@ async fn task_inner(
Ok(())
}

#[tracing::instrument(skip_all)]
async fn handle_message(
ctx: &StandaloneCtx,
conn: &Arc<Conn>,
Expand Down Expand Up @@ -354,6 +357,7 @@ async fn handle_message(
Ok(())
}

#[tracing::instrument(skip_all)]
async fn handle_tunnel_message(
ctx: &StandaloneCtx,
conn: &Arc<Conn>,
Expand Down
9 changes: 9 additions & 0 deletions packages/core/pegboard-runner/src/pubsub_to_client_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
utils,
};

#[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))]
pub async fn task(ctx: StandaloneCtx, conn: Arc<Conn>, sub: Subscriber) {
match task_inner(ctx, conn, sub).await {
Ok(_) => {}
Expand All @@ -21,6 +22,7 @@ pub async fn task(ctx: StandaloneCtx, conn: Arc<Conn>, sub: Subscriber) {
}
}

#[tracing::instrument(skip_all)]
async fn task_inner(ctx: StandaloneCtx, conn: Arc<Conn>, mut sub: Subscriber) -> Result<()> {
while let Result::Ok(NextOutput::Message(ups_msg)) = sub.next().await {
tracing::debug!(
Expand All @@ -37,6 +39,7 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc<Conn>, mut sub: Subscriber) ->
continue;
}
};
let is_close = utils::is_to_client_close(&msg);

// Handle tunnel messages
if let protocol::ToClient::ToClientTunnelMessage(tunnel_msg) = &mut msg {
Expand All @@ -57,11 +60,17 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc<Conn>, mut sub: Subscriber) ->
tracing::error!(?e, "failed to send message to WebSocket");
break;
}

if is_close {
tracing::debug!("manually closing websocket");
break;
}
}

Ok(())
}

#[tracing::instrument(skip_all)]
async fn handle_tunnel_message(conn: &Arc<Conn>, msg: &mut protocol::ToClientTunnelMessage) {
// Save active request
//
Expand Down
11 changes: 7 additions & 4 deletions packages/core/pegboard-runner/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ use hyper_tungstenite::tungstenite::Message as WsMessage;
use hyper_util::rt::TokioIo;
use rivet_error::*;
use rivet_runner_protocol as protocol;
use tokio_tungstenite::{
WebSocketStream,
tungstenite::protocol::frame::{CloseFrame, coding::CloseCode},
};

#[derive(Clone)]
pub struct UrlData {
Expand Down Expand Up @@ -73,3 +69,10 @@ pub fn is_to_client_tunnel_message_kind_request_close(
_ => false,
}
}

pub fn is_to_client_close(kind: &protocol::ToClient) -> bool {
match kind {
protocol::ToClient::ToClientClose => true,
_ => false,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ pub async fn namespace_resolve_for_name_global(
let client = client.clone();
async move {
let url = leader_dc.api_peer_url.join(&format!("/namespaces"))?;
let res = client.get(url).query(&[("name", &input.name)]).send().await?;
let res = client
.get(url)
.query(&[("name", &input.name)])
.send()
.await?;

let res = rivet_api_util::parse_response::<
rivet_api_types::namespaces::list::ListResponse,
Expand Down
3 changes: 2 additions & 1 deletion sdks/typescript/runner/src/mod.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion site/public/llms-full.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions site/public/llms.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading