Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2537,7 +2537,7 @@ fn is_retryable_ws_error(err: &anyhow::Error) -> bool {
}
}

fn is_ws_hibernate(err: &anyhow::Error) -> bool {
pub fn is_ws_hibernate(err: &anyhow::Error) -> bool {
if let Some(rivet_err) = err.chain().find_map(|x| x.downcast_ref::<RivetError>()) {
rivet_err.group() == "guard" && rivet_err.code() == "websocket_service_hibernate"
} else {
Expand Down
87 changes: 64 additions & 23 deletions engine/packages/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use rivet_guard_core::{
ServiceUnavailable, WebSocketServiceHibernate, WebSocketServiceTimeout,
WebSocketServiceUnavailable,
},
proxy_service::ResponseBody,
proxy_service::{ResponseBody, is_ws_hibernate},
request_context::RequestContext,
websocket_handle::WebSocketReceiver,
};
Expand Down Expand Up @@ -152,6 +152,11 @@ impl CustomServeTrait for PegboardGateway {
.context("failed to read body")?
.to_bytes();

let mut stopped_sub = self
.ctx
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", self.actor_id))
.await?;

// Build subject to publish to
let tunnel_subject =
pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string();
Expand Down Expand Up @@ -212,6 +217,10 @@ impl CustomServeTrait for PegboardGateway {
break;
}
}
_ = stopped_sub.next() => {
tracing::debug!("actor stopped while waiting for request response");
return Err(ServiceUnavailable.build());
}
_ = drop_rx.changed() => {
tracing::warn!("tunnel message timeout");
return Err(ServiceUnavailable.build());
Expand Down Expand Up @@ -278,6 +287,11 @@ impl CustomServeTrait for PegboardGateway {
}
}

let mut stopped_sub = self
.ctx
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", self.actor_id))
.await?;

// Build subject to publish to
let tunnel_subject =
pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string();
Expand Down Expand Up @@ -339,6 +353,10 @@ impl CustomServeTrait for PegboardGateway {
break;
}
}
_ = stopped_sub.next() => {
tracing::debug!("actor stopped while waiting for websocket open");
return Err(WebSocketServiceUnavailable.build());
}
_ = drop_rx.changed() => {
tracing::warn!("websocket open timeout");
return Err(WebSocketServiceUnavailable.build());
Expand All @@ -364,7 +382,7 @@ impl CustomServeTrait for PegboardGateway {
open_msg.can_hibernate
};

// Send reclaimed messages
// Send pending messages
self.shared_state
.resend_pending_websocket_messages(request_id)
.await?;
Expand Down Expand Up @@ -415,6 +433,15 @@ impl CustomServeTrait for PegboardGateway {
return Err(WebSocketServiceHibernate.build());
}
}
_ = stopped_sub.next() => {
tracing::debug!("actor stopped during websocket handler loop");

if can_hibernate {
return Err(WebSocketServiceHibernate.build());
} else {
return Err(WebSocketServiceUnavailable.build());
}
}
_ = drop_rx.changed() => {
tracing::warn!("websocket message timeout");
return Err(WebSocketServiceTimeout.build());
Expand Down Expand Up @@ -532,28 +559,33 @@ impl CustomServeTrait for PegboardGateway {
(res, _) => res,
};

// Send WebSocket close message to runner
let (close_code, close_reason) = match &mut lifecycle_res {
// Taking here because it won't be used again
Ok(LifecycleResult::ClientClose(Some(close))) => {
(close.code, Some(std::mem::take(&mut close.reason)))
}
Ok(_) => (CloseCode::Normal.into(), None),
Err(_) => (CloseCode::Error.into(), Some("ws.downstream_closed".into())),
};
let close_message = protocol::ToClientTunnelMessageKind::ToClientWebSocketClose(
protocol::ToClientWebSocketClose {
code: Some(close_code.into()),
reason: close_reason.map(|x| x.as_str().to_string()),
},
);

if let Err(err) = self
.shared_state
.send_message(request_id, close_message)
.await
// Send close frame to runner if not hibernating
if lifecycle_res
.as_ref()
.map_or_else(is_ws_hibernate, |_| false)
{
tracing::error!(?err, "error sending close message");
let (close_code, close_reason) = match &mut lifecycle_res {
// Taking here because it won't be used again
Ok(LifecycleResult::ClientClose(Some(close))) => {
(close.code, Some(std::mem::take(&mut close.reason)))
}
Ok(_) => (CloseCode::Normal.into(), None),
Err(_) => (CloseCode::Error.into(), Some("ws.downstream_closed".into())),
};
let close_message = protocol::ToClientTunnelMessageKind::ToClientWebSocketClose(
protocol::ToClientWebSocketClose {
code: Some(close_code.into()),
reason: close_reason.map(|x| x.as_str().to_string()),
},
);

if let Err(err) = self
.shared_state
.send_message(request_id, close_message)
.await
{
tracing::error!(?err, "error sending close message");
}
}

// Send WebSocket close message to client
Expand All @@ -579,6 +611,15 @@ impl CustomServeTrait for PegboardGateway {
client_ws: WebSocketHandle,
unique_request_id: Uuid,
) -> Result<HibernationResult> {
// Immediately rewake if we have pending messages
if self
.shared_state
.has_pending_websocket_messages(unique_request_id.into_bytes())
.await?
{
return Ok(HibernationResult::Continue);
}

// Start keepalive task
let ctx = self.ctx.clone();
let actor_id = self.actor_id;
Expand Down
12 changes: 12 additions & 0 deletions engine/packages/pegboard-gateway/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,18 @@ impl SharedState {
Ok(())
}

pub async fn has_pending_websocket_messages(&self, request_id: RequestId) -> Result<bool> {
let Some(req) = self.in_flight_requests.get_async(&request_id).await else {
bail!("request not in flight");
};

if let Some(hs) = &req.hibernation_state {
Ok(!hs.pending_ws_msgs.is_empty())
} else {
Ok(false)
}
}

pub async fn ack_pending_websocket_messages(
&self,
request_id: RequestId,
Expand Down
29 changes: 9 additions & 20 deletions engine/packages/pegboard-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use rivet_guard_core::{
use rivet_runner_protocol as protocol;
use std::time::Duration;
use tokio::sync::watch;
use tokio_tungstenite::tungstenite::protocol::frame::{CloseFrame, coding::CloseCode};
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
use universalpubsub::PublishOpts;
use vbare::OwnedVersionedData;

Expand Down Expand Up @@ -243,30 +243,19 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe {
);
}

// Send WebSocket close messages to all remaining active requests
// Send close messages to all remaining active requests
let active_requests = conn.tunnel_active_requests.lock().await;
for (request_id, req) in &*active_requests {
let close_msg_kind = if req.is_ws {
let (close_code, close_reason) = if lifecycle_res.is_ok() {
(CloseCode::Normal.into(), None)
} else {
(CloseCode::Error.into(), Some("ws.upstream_closed".into()))
};

protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(
protocol::ToServerWebSocketClose {
code: Some(close_code),
reason: close_reason,
hibernate: true,
},
)
} else {
protocol::ToServerTunnelMessageKind::ToServerResponseAbort
};
// Websockets are not ephemeral like requests. If the runner ws closes they are not informed;
// instead they wait for the actor itself to stop.
if req.is_ws {
continue;
}

let close_message = protocol::ToServerTunnelMessage {
request_id: request_id.clone(),
message_id: Uuid::new_v4().into_bytes(),
message_kind: close_msg_kind,
message_kind: protocol::ToServerTunnelMessageKind::ToServerResponseAbort,
};

let msg_serialized = protocol::versioned::ToGateway::wrap_latest(protocol::ToGateway {
Expand Down
27 changes: 1 addition & 26 deletions website/public/llms-full.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions website/public/llms.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading