From f2383b563b7998de6fc1dc97243c448ee0b596eb Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 26 Sep 2025 12:32:32 -0700 Subject: [PATCH] fix(guard): proxy routing errors through ws --- Cargo.lock | 1 + out/errors/guard.target_changed.json | 5 + packages/core/guard/core/Cargo.toml | 1 + packages/core/guard/core/src/errors.rs | 12 +- packages/core/guard/core/src/proxy_service.rs | 256 ++++++++++++------ packages/core/guard/server/src/routing/mod.rs | 1 + 6 files changed, 184 insertions(+), 92 deletions(-) create mode 100644 out/errors/guard.target_changed.json diff --git a/Cargo.lock b/Cargo.lock index 5226e98cc7..5bb4f8e373 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4410,6 +4410,7 @@ dependencies = [ "futures", "futures-util", "gasoline", + "http 1.3.1", "http-body 1.0.1", "http-body-util", "hyper 1.6.0", diff --git a/out/errors/guard.target_changed.json b/out/errors/guard.target_changed.json new file mode 100644 index 0000000000..e3751cbd53 --- /dev/null +++ b/out/errors/guard.target_changed.json @@ -0,0 +1,5 @@ +{ + "code": "target_changed", + "group": "guard", + "message": "WebSocket target changed, retry not possible." +} \ No newline at end of file diff --git a/packages/core/guard/core/Cargo.toml b/packages/core/guard/core/Cargo.toml index 900e7057f5..e71adf9af1 100644 --- a/packages/core/guard/core/Cargo.toml +++ b/packages/core/guard/core/Cargo.toml @@ -17,6 +17,7 @@ futures-util.workspace = true futures.workspace = true http-body-util.workspace = true http-body.workspace = true +http.workspace = true # TODO: Make this use workspace version hyper = { version = "1.6.0", features = ["full", "http1", "http2"] } hyper-tungstenite.workspace = true diff --git a/packages/core/guard/core/src/errors.rs b/packages/core/guard/core/src/errors.rs index 90efaba6b1..d3e905b222 100644 --- a/packages/core/guard/core/src/errors.rs +++ b/packages/core/guard/core/src/errors.rs @@ -56,11 +56,10 @@ pub struct NoRouteTargets; "guard", "retry_attempts_exceeded", "Retry attempts exceeded.", - "All {attempts} retry attempts failed (max: {max_attempts})." + "All {attempts} retry attempts failed." )] pub struct RetryAttemptsExceeded { pub attempts: u32, - pub max_attempts: u32, } #[derive(RivetError, Serialize, Deserialize)] @@ -74,7 +73,14 @@ pub struct ConnectionError { #[error( "guard", "websocket_service_unavailable", - "WebSocket service unavailable.", "WebSocket service unavailable." )] pub struct WebSocketServiceUnavailable; + +#[derive(RivetError, Serialize, Deserialize)] +#[error( + "guard", + "target_changed", + "WebSocket target changed, retry not possible." +)] +pub struct WebSocketTargetChanged; diff --git a/packages/core/guard/core/src/proxy_service.rs b/packages/core/guard/core/src/proxy_service.rs index 58a4d52925..86287f24c5 100644 --- a/packages/core/guard/core/src/proxy_service.rs +++ b/packages/core/guard/core/src/proxy_service.rs @@ -982,7 +982,6 @@ impl ProxyService { // If we get here, all attempts failed return Err(errors::RetryAttemptsExceeded { attempts: max_attempts, - max_attempts, } .build()); } @@ -1047,7 +1046,6 @@ impl ProxyService { // If we get here, all attempts failed return Err(errors::RetryAttemptsExceeded { attempts: max_attempts, - max_attempts, } .build()); } @@ -1170,7 +1168,7 @@ impl ProxyService { // Handle WebSocket upgrade properly with hyper_tungstenite tracing::debug!("Upgrading client connection to WebSocket"); - let (client_response, client_websocket) = match hyper_tungstenite::upgrade(req, None) { + let (client_response, client_ws) = match hyper_tungstenite::upgrade(req, None) { Result::Ok(x) => { tracing::debug!("Client WebSocket upgrade successful"); x @@ -1220,7 +1218,7 @@ impl ProxyService { // First, wait for the client WebSocket to be ready (do this first to avoid race conditions) tracing::debug!("Waiting for client WebSocket to be ready..."); let mut client_ws = - match tokio::time::timeout(timeout_duration, client_websocket).await { + match tokio::time::timeout(timeout_duration, client_ws).await { Result::Ok(Result::Ok(ws)) => { tracing::debug!("Client WebSocket is ready"); ws @@ -1368,21 +1366,21 @@ impl ProxyService { ); let (mut client_sink, _) = client_ws.split(); match client_sink - .send(hyper_tungstenite::tungstenite::Message::Close(Some( - hyper_tungstenite::tungstenite::protocol::CloseFrame { - code: hyper_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Error, - reason: "Failed to connect to upstream server".into(), - }, - ))) - .await - { - Result::Ok(_) => { - tracing::trace!("Successfully sent close message to client") - } - Err(err) => { - tracing::error!(?err, "Failed to send close message to client") - } - }; + .send(to_hyper_close(Some(err_to_close_frame( + errors::RetryAttemptsExceeded { attempts }.build(), + )))) + .await + { + Result::Ok(_) => { + tracing::trace!("Successfully sent close message to client") + } + Err(err) => { + tracing::error!( + ?err, + "Failed to send close message to client" + ) + } + }; match client_sink.flush().await { Result::Ok(_) => { @@ -1435,17 +1433,17 @@ impl ProxyService { ); // Close the WebSocket connection with the response message - let _ = client_ws.close(Some(tokio_tungstenite::tungstenite::protocol::CloseFrame { - code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Error, - reason: response.message.as_ref().into(), - })).await; + let _ = client_ws + .close(Some(str_to_close_frame(response.message.as_ref()))) + .await; return; } Result::Ok(ResolveRouteOutput::CustomServe(_)) => { - let _ = client_ws.close(Some(tokio_tungstenite::tungstenite::protocol::CloseFrame { - code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Error, - reason: "Cannot retry WebSocket with custom serve handler".into(), - })).await; + let _ = client_ws + .close(Some(err_to_close_frame( + errors::WebSocketTargetChanged.build(), + ))) + .await; return; } Err(err) => { @@ -1526,20 +1524,7 @@ impl ProxyService { // Signal shutdown to other direction let _ = shutdown_tx.send(true); - if let Some(frame) = frame { - // Manual conversion to handle different tungstenite versions - let code_num: u16 = frame.code.into(); - let reason = frame.reason.clone(); - - tokio_tungstenite::tungstenite::Message::Close(Some( - tokio_tungstenite::tungstenite::protocol::CloseFrame { - code: code_num.into(), - reason, - }, - )) - } else { - tokio_tungstenite::tungstenite::Message::Close(None) - } + to_hyper_close(frame) }, hyper_tungstenite::tungstenite::Message::Frame(_) => { // Skip frames - they're an implementation detail @@ -1690,20 +1675,7 @@ impl ProxyService { // Signal shutdown to other direction let _ = shutdown_tx.send(true); - if let Some(frame) = frame { - // Manual conversion to handle different tungstenite versions - let code_num: u16 = frame.code.into(); - let reason = frame.reason.clone(); - - hyper_tungstenite::tungstenite::Message::Close(Some( - hyper_tungstenite::tungstenite::protocol::CloseFrame { - code: code_num.into(), - reason, - }, - )) - } else { - hyper_tungstenite::tungstenite::Message::Close(None) - } + to_hyper_close(frame) }, tokio_tungstenite::tungstenite::Message::Frame(_) => { // Skip frames - they're an implementation detail @@ -1773,10 +1745,7 @@ impl ProxyService { // Try to send a close frame - ignore errors as the connection might already be closed tracing::trace!("Attempting to send close message to client"); - match sink - .send(hyper_tungstenite::tungstenite::Message::Close(None)) - .await - { + match sink.send(to_hyper_close(None)).await { Result::Ok(_) => { tracing::trace!("Close message sent to client successfully") } @@ -1815,7 +1784,6 @@ impl ProxyService { tokio::spawn( async move { let mut attempts = 0u32; - let mut client_ws = client_websocket; let ws_handle = WebSocketHandle::new(client_ws); @@ -1833,12 +1801,12 @@ impl ProxyService { tracing::debug!("websocket closed"); // Send graceful close - ws_handle.send(hyper_tungstenite::tungstenite::Message::Close(Some( - hyper_tungstenite::tungstenite::protocol::CloseFrame { - code: hyper_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Normal, - reason: format!("Closed").into(), - }, - ))).await?; + ws_handle + .send(to_hyper_close(Some(CloseFrame { + code: CloseCode::Normal, + reason: "".into(), + }))) + .await?; break; } @@ -1847,11 +1815,9 @@ impl ProxyService { if attempts > max_attempts || !is_retryable_ws_error(&err) { // Close WebSocket with error ws_handle - .accept_and_send( - hyper_tungstenite::tungstenite::Message::Close( - Some(err_to_close_frame(err)), - ), - ) + .accept_and_send(to_hyper_close(Some( + err_to_close_frame(err), + ))) .await?; break; @@ -1880,32 +1846,27 @@ impl ProxyService { } Result::Ok(ResolveRouteOutput::Response(response)) => { ws_handle - .accept_and_send(hyper_tungstenite::tungstenite::Message::Close(Some( - hyper_tungstenite::tungstenite::protocol::CloseFrame { - code: hyper_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Error, - reason: response.message.as_ref().into(), - }, + .accept_and_send(to_hyper_close(Some( + str_to_close_frame( + response.message.as_ref(), + ), ))) .await; } Result::Ok(ResolveRouteOutput::Target(_)) => { ws_handle - .accept_and_send(hyper_tungstenite::tungstenite::Message::Close(Some( - hyper_tungstenite::tungstenite::protocol::CloseFrame { - code: hyper_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Error, - reason: "Cannot retry WebSocket with non-custom serve route".into(), - }, + .accept_and_send(to_hyper_close(Some( + err_to_close_frame( + errors::WebSocketTargetChanged.build(), + ), ))) .await; break; } - Err(res_err) => { + Err(err) => { ws_handle - .accept_and_send(hyper_tungstenite::tungstenite::Message::Close(Some( - hyper_tungstenite::tungstenite::protocol::CloseFrame { - code: hyper_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Error, - reason: format!("Routing error: {}", res_err).into(), - }, + .accept_and_send(to_hyper_close(Some( + err_to_close_frame(err), ))) .await; break; @@ -2021,6 +1982,21 @@ impl ProxyService { "Request received" ); + let is_websocket = hyper_tungstenite::is_upgrade_request(&req); + + // Used for ws error proxying later + let mut mock_req_builder = Request::builder() + .method(req.method().clone()) + .uri(req.uri().clone()) + .version(req.version().clone()); + if let Some(headers) = mock_req_builder.headers_mut() { + *headers = req.headers().clone(); + } + if let Some(extensions) = mock_req_builder.extensions_mut() { + *extensions = req.extensions().clone(); + } + let mock_req = mock_req_builder.body(())?; + // Process the request let res = match self .handle_request(req, start_time, &mut request_context) @@ -2034,10 +2010,85 @@ impl ProxyService { metrics::PROXY_REQUEST_ERROR .add(1, &[KeyValue::new("error_type", err.to_string())]); - err_into_response(err)? + // If we receive an error during a websocket request, we attempt to open the websocket anyway + // so we can send the error via websocket instead of http. Most websocket clients don't handle + // HTTP errors in a meaningful way for the user resulting in unhelpful errors + if is_websocket { + tracing::debug!("Upgrading client connection to WebSocket for error proxy"); + match hyper_tungstenite::upgrade(mock_req, None) { + Result::Ok((client_response, client_ws)) => { + tracing::debug!("Client WebSocket upgrade for error proxy successful"); + + tokio::spawn(async move { + let ws_handle = WebSocketHandle::new(client_ws); + let frame = err_to_close_frame(err); + + // Manual conversion to handle different tungstenite versions + let code_num: u16 = frame.code.into(); + let reason = frame.reason.clone(); + + ws_handle + .accept_and_send( + tokio_tungstenite::tungstenite::Message::Close(Some( + tokio_tungstenite::tungstenite::protocol::CloseFrame { + code: code_num.into(), + reason, + }, + )), + ) + .await; + }); + + // Return the response that will upgrade the client connection + // For proper WebSocket handshaking, we need to preserve the original response + // structure but convert it to our expected return type without modifying its content + tracing::debug!( + "Returning WebSocket upgrade response for error proxy to client" + ); + // Extract the parts from the response but preserve all headers and status + let (mut parts, _) = client_response.into_parts(); + + // Add Sec-WebSocket-Protocol header to the response + // Many WebSocket clients (e.g. node-ws & Cloudflare) require a protocol in the response + parts.headers.insert( + "sec-websocket-protocol", + hyper::header::HeaderValue::from_static("rivet"), + ); + + // Create a new response with an empty body - WebSocket upgrades don't need a body + Response::from_parts( + parts, + ResponseBody::Full(Full::::new(Bytes::new())), + ) + } + Err(err) => { + tracing::error!( + ?err, + "Failed to upgrade client WebSocket for error proxy" + ); + + err_into_response( + errors::ConnectionError { + error_message: format!( + "Failed to upgrade client WebSocket for error proxy: {}", + err + ), + remote_addr: self.remote_addr.to_string(), + } + .build(), + )? + } + } + } else { + err_into_response(err)? + } } }; + if is_websocket && res.status() != StatusCode::SWITCHING_PROTOCOLS { + tracing::debug!("returned non-101 response to websocket"); + } + let status = res.status().as_u16(); // Update request context with response details @@ -2255,7 +2306,17 @@ fn is_retryable_ws_error(err: &anyhow::Error) -> bool { } } -pub fn err_to_close_frame(err: anyhow::Error) -> CloseFrame { +fn str_to_close_frame(err: &str) -> CloseFrame { + // NOTE: reason cannot be more than 123 bytes as per the WS protocol spec + let reason = rivet_util::safe_slice(err, 0, 123).into(); + + CloseFrame { + code: CloseCode::Error, + reason, + } +} + +fn err_to_close_frame(err: anyhow::Error) -> CloseFrame { let rivet_err = err .chain() .find_map(|x| x.downcast_ref::()) @@ -2277,3 +2338,20 @@ pub fn err_to_close_frame(err: anyhow::Error) -> CloseFrame { CloseFrame { code, reason } } + +fn to_hyper_close(frame: Option) -> hyper_tungstenite::tungstenite::Message { + if let Some(frame) = frame { + // Manual conversion to handle different tungstenite versions + let code_num: u16 = frame.code.into(); + let reason = frame.reason.clone(); + + tokio_tungstenite::tungstenite::Message::Close(Some( + tokio_tungstenite::tungstenite::protocol::CloseFrame { + code: code_num.into(), + reason, + }, + )) + } else { + tokio_tungstenite::tungstenite::Message::Close(None) + } +} diff --git a/packages/core/guard/server/src/routing/mod.rs b/packages/core/guard/server/src/routing/mod.rs index ac61338bd1..057a54e54f 100644 --- a/packages/core/guard/server/src/routing/mod.rs +++ b/packages/core/guard/server/src/routing/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::*; use gas::prelude::*; use hyper::header::HeaderName; +use hyper::{Request, body::Incoming as BodyIncoming}; use rivet_guard_core::RoutingFn; use crate::{errors, shared_state::SharedState};