Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ regex = "1.4"
rstest = "0.26.1"
rustls-pemfile = "2.2.0"
rustyline = "15.0.0"
scc = "3.3.2"
serde_bare = "0.5.0"
serde_html_form = "0.2.7"
serde_yaml = "0.9.34"
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 6 additions & 9 deletions engine/packages/epoxy/src/ops/kv/get_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,12 @@ pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Out
let packed_key = packed_key.clone();
let kv_key = kv_key.clone();
async move {
(async move {
let value = tx.get(&packed_key, Serializable).await?;
if let Some(v) = value {
Ok(Some(kv_key.deserialize(&v)?))
} else {
Ok(None)
}
})
.await
let value = tx.get(&packed_key, Serializable).await?;
if let Some(v) = value {
Ok(Some(kv_key.deserialize(&v)?))
} else {
Ok(None)
}
}
})
.custom_instrument(tracing::info_span!("get_local_tx"))
Expand Down
49 changes: 22 additions & 27 deletions engine/packages/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,26 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
let kv_key = kv_key.clone();
let cache_key = cache_key.clone();
async move {
(async move {
let (value, cache_value) = tokio::try_join!(
async {
let v = tx.get(&packed_key, Serializable).await?;
if let Some(ref bytes) = v {
Ok(Some(kv_key.deserialize(bytes)?))
} else {
Ok(None)
}
},
async {
let v = tx.get(&packed_cache_key, Serializable).await?;
if let Some(ref bytes) = v {
Ok(Some(cache_key.deserialize(bytes)?))
} else {
Ok(None)
}
let (value, cache_value) = tokio::try_join!(
async {
let v = tx.get(&packed_key, Serializable).await?;
if let Some(ref bytes) = v {
Ok(Some(kv_key.deserialize(bytes)?))
} else {
Ok(None)
}
)?;
},
async {
let v = tx.get(&packed_cache_key, Serializable).await?;
if let Some(ref bytes) = v {
Ok(Some(cache_key.deserialize(bytes)?))
} else {
Ok(None)
}
}
)?;

Ok(value.or(cache_value))
})
.await
Ok(value.or(cache_value))
}
})
.custom_instrument(tracing::info_span!("get_optimistic_tx"))
Expand Down Expand Up @@ -134,13 +131,11 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
let packed_cache_key = packed_cache_key.clone();
let cache_key = cache_key.clone();
let value_to_cache = value.clone();

async move {
(async move {
let serialized = cache_key.serialize(value_to_cache)?;
tx.set(&packed_cache_key, &serialized);
Ok(())
})
.await
let serialized = cache_key.serialize(value_to_cache)?;
tx.set(&packed_cache_key, &serialized);
Ok(())
}
})
.custom_instrument(tracing::info_span!("cache_value_tx"))
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/gasoline/src/ctx/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ impl StandaloneCtx {
) -> WorkflowResult<Self> {
let ts = rivet_util::timestamp::now();

let span = tracing::Span::current();
span.record("req_id", req_id.to_string());
span.record("ray_id", ray_id.to_string());
tracing::Span::current()
.record("req_id", req_id.to_string())
.record("ray_id", ray_id.to_string());

let msg_ctx = MessageCtx::new(&config, &pools, &cache, ray_id)?;

Expand Down
3 changes: 3 additions & 0 deletions engine/packages/guard-core/src/custom_serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use http_body_util::Full;
use hyper::{Request, Response};
use uuid::Uuid;

use crate::WebSocketHandle;
use crate::proxy_service::ResponseBody;
Expand All @@ -25,5 +26,7 @@ pub trait CustomServeTrait: Send + Sync {
headers: &hyper::HeaderMap,
path: &str,
request_context: &mut RequestContext,
// Identifies the websocket across retries.
unique_request_id: Uuid,
) -> Result<()>;
}
106 changes: 68 additions & 38 deletions engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tokio_tungstenite::tungstenite::{
};
use tracing::Instrument;
use url::Url;
use uuid::Uuid;

use crate::{
WebSocketHandle, custom_serve::CustomServeTrait, errors, metrics,
Expand Down Expand Up @@ -1171,7 +1172,7 @@ impl ProxyService {
}

// Handle WebSocket upgrade properly with hyper_tungstenite
tracing::debug!("Upgrading client connection to WebSocket");
tracing::debug!(%req_path, "Upgrading client connection to WebSocket");
let (client_response, client_ws) = match hyper_tungstenite::upgrade(req, None) {
Ok(x) => {
tracing::debug!("Client WebSocket upgrade successful");
Expand Down Expand Up @@ -1782,18 +1783,20 @@ impl ProxyService {
}
ResolveRouteOutput::Response(_) => unreachable!(),
ResolveRouteOutput::CustomServe(mut handlers) => {
tracing::debug!("Spawning task to handle WebSocket communication");
tracing::debug!(%req_path, "Spawning task to handle WebSocket communication");
let mut request_context = request_context.clone();
let req_headers = req_headers.clone();
let req_path = req_path.clone();
let req_host = req_host.clone();

// TODO: Handle errors here, the error message is lost
tokio::spawn(
async move {
let request_id = Uuid::new_v4();
let mut attempts = 0u32;

let ws_handle = WebSocketHandle::new(client_ws);
let ws_handle = WebSocketHandle::new(client_ws)
.await
.context("failed initiating websocket handle")?;

loop {
match handlers
Expand All @@ -1802,6 +1805,7 @@ impl ProxyService {
&req_headers,
&req_path,
&mut request_context,
request_id,
)
.await
{
Expand All @@ -1825,13 +1829,17 @@ impl ProxyService {
break;
}
Err(err) => {
tracing::debug!(?err, "websocket handler error");

attempts += 1;
if attempts > max_attempts || !is_retryable_ws_error(&err) {
tracing::debug!(?attempts, "WebSocket failed to reconnect");

// Close WebSocket with error
ws_handle
.accept_and_send(to_hyper_close(Some(
err_to_close_frame(err, ray_id),
)))
.send(to_hyper_close(Some(err_to_close_frame(
err, ray_id,
))))
.await?;

// Flush to ensure close frame is sent
Expand All @@ -1846,6 +1854,13 @@ impl ProxyService {
attempts,
initial_interval,
);
let backoff = Duration::from_millis(100);

tracing::debug!(
?backoff,
"WebSocket attempt {attempts} failed (service unavailable)"
);

tokio::time::sleep(backoff).await;

match state
Expand All @@ -1864,11 +1879,9 @@ impl ProxyService {
}
Ok(ResolveRouteOutput::Response(response)) => {
ws_handle
.accept_and_send(to_hyper_close(Some(
str_to_close_frame(
response.message.as_ref(),
),
)))
.send(to_hyper_close(Some(str_to_close_frame(
response.message.as_ref(),
))))
.await?;

// Flush to ensure close frame is sent
Expand All @@ -1879,12 +1892,10 @@ impl ProxyService {
}
Ok(ResolveRouteOutput::Target(_)) => {
ws_handle
.accept_and_send(to_hyper_close(Some(
err_to_close_frame(
errors::WebSocketTargetChanged.build(),
ray_id,
),
)))
.send(to_hyper_close(Some(err_to_close_frame(
errors::WebSocketTargetChanged.build(),
ray_id,
))))
.await?;

// Flush to ensure close frame is sent
Expand All @@ -1897,9 +1908,9 @@ impl ProxyService {
}
Err(err) => {
ws_handle
.accept_and_send(to_hyper_close(Some(
err_to_close_frame(err, ray_id),
)))
.send(to_hyper_close(Some(err_to_close_frame(
err, ray_id,
))))
.await?;

// Flush to ensure close frame is sent
Expand Down Expand Up @@ -1947,13 +1958,17 @@ impl ProxyService {

impl ProxyService {
// Process an individual request
#[tracing::instrument(name = "guard_request", skip_all)]
#[tracing::instrument(name = "guard_request", skip_all, fields(ray_id, req_id))]
pub async fn process(&self, mut req: Request<BodyIncoming>) -> Result<Response<ResponseBody>> {
let start_time = Instant::now();

let request_ids = RequestIds::new(self.state.config.dc_label());
req.extensions_mut().insert(request_ids);

tracing::Span::current()
.record("req_id", request_ids.req_id.to_string())
.record("ray_id", request_ids.ray_id.to_string());

// Create request context for analytics tracking
let mut request_context =
RequestContext::new(self.state.clickhouse_inserter.clone(), request_ids);
Expand Down Expand Up @@ -2063,35 +2078,50 @@ impl ProxyService {

// If we receive an error during a websocket request, we attempt to open the websocket anyway
// so we can send the error via websocket instead of http. Most websocket clients don't handle
// HTTP errors in a meaningful way for the user resulting in unhelpful errors
// HTTP errors in a meaningful way resulting in unhelpful errors for the user
if is_websocket {
tracing::debug!("Upgrading client connection to WebSocket for error proxy");
match hyper_tungstenite::upgrade(mock_req, None) {
Ok((client_response, client_ws)) => {
tracing::debug!("Client WebSocket upgrade for error proxy successful");

tokio::spawn(async move {
let ws_handle = WebSocketHandle::new(client_ws);
let frame = err_to_close_frame(err, Some(request_ids.ray_id));
tokio::spawn(
async move {
let ws_handle = match WebSocketHandle::new(client_ws).await {
Ok(ws_handle) => ws_handle,
Err(err) => {
tracing::debug!(
?err,
"failed initiating websocket handle for error proxy"
);
return;
}
};
let frame = err_to_close_frame(err, Some(request_ids.ray_id));

// Manual conversion to handle different tungstenite versions
let code_num: u16 = frame.code.into();
let reason = frame.reason.clone();
// Manual conversion to handle different tungstenite versions
let code_num: u16 = frame.code.into();
let reason = frame.reason.clone();

if let Err(err) = ws_handle
.accept_and_send(
tokio_tungstenite::tungstenite::Message::Close(Some(
if let Err(err) = ws_handle
.send(tokio_tungstenite::tungstenite::Message::Close(Some(
tokio_tungstenite::tungstenite::protocol::CloseFrame {
code: code_num.into(),
reason,
},
)),
)
.await
{
tracing::debug!(?err, "failed sending error proxy");
)))
.await
{
tracing::debug!(
?err,
"failed sending websocket error proxy"
);
}
}
});
.instrument(
tracing::info_span!("ws_error_proxy_task", ?request_ids.ray_id),
),
);

// Return the response that will upgrade the client connection
// For proper WebSocket handshaking, we need to preserve the original response
Expand Down
Loading
Loading