Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions packages/edge/infra/guard/core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ impl ProxyService {
async fn handle_websocket_upgrade(
&self,
req: Request<BodyIncoming>,
target: RouteTarget,
mut target: RouteTarget,
) -> GlobalResult<Response<Full<Bytes>>> {
// Get actor and server IDs for metrics and middleware
let actor_id = target.actor_id;
Expand All @@ -978,6 +978,19 @@ impl ProxyService {
// Start timing the request (metrics already incremented in handle_request)
let start_time = Instant::now();

// Parsed for retries later
let req_host = req
.headers()
.get(hyper::header::HOST)
.and_then(|h| h.to_str().ok())
.unwrap_or("unknown")
.to_string();
let req_path = req
.uri()
.path_and_query()
.map(|x| x.to_string())
.unwrap_or_else(|| req.uri().path().to_string());

// Log request details
info!("WebSocket upgrade request for path: {}, target host: {}:{}, actor_id: {}, server_id: {}",
target.path, target.host, target.port, actor_id_str, server_id_str);
Expand Down Expand Up @@ -1044,11 +1057,8 @@ impl ProxyService {
}
}

// Now we need to connect to the upstream WebSocket server
let target_url = format!("ws://{}:{}{}", target.host, target.port, target.path);
info!("Target upstream WebSocket URL: {}", target_url);

// Clone needed values for the spawned task
let state = self.state.clone();
let actor_id_str_clone = actor_id_str.clone();
let server_id_str_clone = server_id_str.clone();
let path = target.path.clone();
Expand Down Expand Up @@ -1110,12 +1120,11 @@ impl ProxyService {
};

// Now attempt to connect to the upstream server
info!(
"Attempting to connect to upstream WebSocket at {}",
target_url
);
info!("Attempting connect to upstream WebSocket");
while attempts < max_attempts {
attempts += 1;

let target_url = format!("ws://{}:{}{}", target.host, target.port, target.path);
info!(
"WebSocket request attempt {}/{} to {}",
attempts, max_attempts, target_url
Expand Down Expand Up @@ -1196,7 +1205,23 @@ impl ProxyService {
// Use backoff for the next attempt
let backoff = Self::calculate_backoff(attempts, initial_interval);
info!("Waiting for {:?} before next connection attempt", backoff);
tokio::time::sleep(backoff).await;

let (_, new_target) = tokio::join!(
tokio::time::sleep(backoff),
// Resolve target again, this time ignoring cache. This makes sure
// we always re-fetch the route on error
state.resolve_route(&req_host, &req_path, state.port_type.clone(), true,),
);

match new_target {
Ok(ResolveRouteOutput::Target(new_target)) => {
target = new_target;
}
Ok(ResolveRouteOutput::Response(_response)) => {
error!("Expected target, got response")
}
Err(e) => error!("Routing error: {}", e),
}
Comment on lines +1216 to +1224
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Should propagate routing errors to client with appropriate close code rather than just logging

Suggested change
match new_target {
Ok(ResolveRouteOutput::Target(new_target)) => {
target = new_target;
}
Ok(ResolveRouteOutput::Response(_response)) => {
error!("Expected target, got response")
}
Err(e) => error!("Routing error: {}", e),
}
match new_target {
Ok(ResolveRouteOutput::Target(new_target)) => {
target = new_target;
}
Ok(ResolveRouteOutput::Response(_response)) => {
error!("Expected target, got response");
// Send close with internal error code
let _ = client_sink.send(hyper_tungstenite::tungstenite::Message::Close(Some(
hyper_tungstenite::tungstenite::protocol::CloseFrame {
code: 1011.into(), // Internal error
reason: "Routing error: unexpected response".into(),
}
))).await;
return;
}
Err(e) => {
error!("Routing error: {}", e);
// Send close with internal error code
let _ = client_sink.send(hyper_tungstenite::tungstenite::Message::Close(Some(
hyper_tungstenite::tungstenite::protocol::CloseFrame {
code: 1011.into(), // Internal error
reason: format!("Routing error: {}", e).into(),
}
))).await;
return;
}
}

}

// If we couldn't connect to the upstream server, exit the task
Expand Down
4 changes: 2 additions & 2 deletions packages/edge/infra/guard/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ fn create_middleware_function(
amount: 20, // 20 concurrent requests
},
retry: RetryConfig {
max_attempts: 4, // 3 retry attempts
initial_interval: 250, // 100ms initial interval
max_attempts: 7,
initial_interval: 150,
},
timeout: TimeoutConfig {
request_timeout: 30, // 30 seconds for requests
Expand Down
Loading