From 6ffb5fb2c49d93d4440f27d932498a2a730cb8d8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 17 Oct 2025 15:36:27 +0000 Subject: [PATCH 1/5] feat(pyth-lazer-agent): add HTTP proxy support for WebSocket connections Add proxy_url configuration option to support connecting through HTTP/HTTPS proxies. Implements manual HTTP CONNECT handshake with Basic authentication support and TLS upgrade for secure WebSocket connections. - Add proxy_url: Option to Config struct - Implement connect_through_proxy function with HTTP CONNECT method - Support Basic authentication via proxy URL credentials - Add tokio-native-tls dependency for TLS support - Update README with proxy configuration examples - Bump version from 0.6.1 to 0.7.0 Co-Authored-By: Mike Rolish --- Cargo.lock | 3 +- apps/pyth-lazer-agent/Cargo.toml | 3 +- apps/pyth-lazer-agent/README.md | 4 + apps/pyth-lazer-agent/src/config.rs | 1 + apps/pyth-lazer-agent/src/jrpc_handle.rs | 1 + apps/pyth-lazer-agent/src/lazer_publisher.rs | 2 + apps/pyth-lazer-agent/src/relayer_session.rs | 144 +++++++++++++++++-- 7 files changed, 143 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b1412cab5..a39fc3e90f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5656,7 +5656,7 @@ dependencies = [ [[package]] name = "pyth-lazer-agent" -version = "0.6.1" +version = "0.7.0" dependencies = [ "anyhow", "backoff", @@ -5683,6 +5683,7 @@ dependencies = [ "solana-keypair", "tempfile", "tokio", + "tokio-native-tls", "tokio-tungstenite 0.26.2", "tokio-util", "tracing", diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index dc1a1d28d6..8724d50f2f 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-agent" -version = "0.6.1" +version = "0.7.0" edition = "2024" description = "Pyth Lazer Agent" license = "Apache-2.0" @@ -32,6 +32,7 @@ serde_json = "1.0.140" soketto = { version = "0.8.1", features = ["http"] } solana-keypair = "2.2.1" tokio = { version = "1.44.1", features = ["full"] } +tokio-native-tls = "0.3.1" tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] } tokio-util = { version = "0.7.14", features = ["compat"] } tracing = "0.1.41" diff --git a/apps/pyth-lazer-agent/README.md b/apps/pyth-lazer-agent/README.md index b067cf8112..dac27a3401 100644 --- a/apps/pyth-lazer-agent/README.md +++ b/apps/pyth-lazer-agent/README.md @@ -50,6 +50,9 @@ authorization_token = "your_token" listen_address = "0.0.0.0:8910" publish_interval_duration = "25ms" enable_update_deduplication = false +# Optional proxy configuration +# proxy_url = "http://proxy.example.com:8080" +# proxy_url = "http://username:password@proxy.example.com:8080" # With authentication ``` - `relayers_urls`: The Lazer team will provide these. @@ -58,3 +61,4 @@ enable_update_deduplication = false - `listen_address`: The local port the agent will be listening on; can be anything you want. - `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here. - `enable_update_deduplication`: The agent will deduplicate updates based inside each batch before sending it to Lazer. +- `proxy_url` (optional): HTTP/HTTPS proxy URL for WebSocket connections. Supports Basic authentication via URL credentials (e.g., `http://user:pass@proxy:port`). diff --git a/apps/pyth-lazer-agent/src/config.rs b/apps/pyth-lazer-agent/src/config.rs index 5012a45647..f9929e16da 100644 --- a/apps/pyth-lazer-agent/src/config.rs +++ b/apps/pyth-lazer-agent/src/config.rs @@ -23,6 +23,7 @@ pub struct Config { pub enable_update_deduplication: bool, #[serde(with = "humantime_serde", default = "default_update_deduplication_ttl")] pub update_deduplication_ttl: Duration, + pub proxy_url: Option, } #[derive(Deserialize, Derivative, Clone, PartialEq)] diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index 895ae368ee..a5f46fd9ff 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -312,6 +312,7 @@ pub mod tests { history_service_url: None, enable_update_deduplication: false, update_deduplication_ttl: Default::default(), + proxy_url: None, }; println!("{:?}", get_metadata(config).await.unwrap()); diff --git a/apps/pyth-lazer-agent/src/lazer_publisher.rs b/apps/pyth-lazer-agent/src/lazer_publisher.rs index 58d15e00b1..53ffb2a22c 100644 --- a/apps/pyth-lazer-agent/src/lazer_publisher.rs +++ b/apps/pyth-lazer-agent/src/lazer_publisher.rs @@ -80,6 +80,7 @@ impl LazerPublisher { token: authorization_token.clone(), receiver: relayer_sender.subscribe(), is_ready: is_ready.clone(), + proxy_url: config.proxy_url.clone(), }; tokio::spawn(async move { task.run().await }); } @@ -301,6 +302,7 @@ mod tests { history_service_url: None, enable_update_deduplication: false, update_deduplication_ttl: Default::default(), + proxy_url: None, }; let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY); diff --git a/apps/pyth-lazer-agent/src/relayer_session.rs b/apps/pyth-lazer-agent/src/relayer_session.rs index 8a26e13fb2..c0115742bb 100644 --- a/apps/pyth-lazer-agent/src/relayer_session.rs +++ b/apps/pyth-lazer-agent/src/relayer_session.rs @@ -1,6 +1,7 @@ -use anyhow::{Result, bail}; +use anyhow::{Context, Result, bail}; use backoff::ExponentialBackoffBuilder; use backoff::backoff::Backoff; +use base64::Engine; use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{SinkExt, StreamExt}; use http::HeaderValue; @@ -9,12 +10,13 @@ use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::select; use tokio::sync::broadcast; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::{ - MaybeTlsStream, WebSocketStream, connect_async_with_config, + MaybeTlsStream, WebSocketStream, client_async, connect_async_with_config, tungstenite::Message as TungsteniteMessage, }; use url::Url; @@ -22,19 +24,137 @@ use url::Url; type RelayerWsSender = SplitSink>, TungsteniteMessage>; type RelayerWsReceiver = SplitStream>>; -async fn connect_to_relayer(url: Url, token: &str) -> Result<(RelayerWsSender, RelayerWsReceiver)> { - tracing::info!("connecting to the relayer at {}", url); - let mut req = url.clone().into_client_request()?; +async fn connect_through_proxy( + proxy_url: &Url, + target_url: &Url, + token: &str, +) -> Result<(RelayerWsSender, RelayerWsReceiver)> { + tracing::info!( + "connecting to the relayer at {} via proxy {}", + target_url, + proxy_url + ); + + let proxy_host = proxy_url.host_str().context("Proxy URL must have a host")?; + let proxy_port = proxy_url + .port() + .unwrap_or(if proxy_url.scheme() == "https" { + 443 + } else { + 80 + }); + + let proxy_addr = format!("{}:{}", proxy_host, proxy_port); + let mut stream = TcpStream::connect(&proxy_addr) + .await + .context(format!("Failed to connect to proxy at {}", proxy_addr))?; + + let target_host = target_url + .host_str() + .context("Target URL must have a host")?; + let target_port = target_url + .port() + .unwrap_or(if target_url.scheme() == "wss" { + 443 + } else { + 80 + }); + + let mut connect_request = format!( + "CONNECT {}:{} HTTP/1.1\r\nHost: {}:{}\r\n", + target_host, target_port, target_host, target_port + ); + + let username = proxy_url.username(); + if !username.is_empty() { + let password = proxy_url.password().unwrap_or(""); + let credentials = format!("{}:{}", username, password); + let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes()); + connect_request = format!( + "{}Proxy-Authorization: Basic {}\r\n", + connect_request, encoded + ); + } + + connect_request = format!("{}\r\n", connect_request); + + stream + .write_all(connect_request.as_bytes()) + .await + .context("Failed to send CONNECT request to proxy")?; + + let mut response = vec![0u8; 1024]; + let n = stream + .read(&mut response) + .await + .context("Failed to read CONNECT response from proxy")?; + + let response_str = String::from_utf8_lossy(&response[..n]); + + if !response_str.starts_with("HTTP/1.1 200") && !response_str.starts_with("HTTP/1.0 200") { + bail!( + "Proxy CONNECT failed: {}", + response_str.lines().next().unwrap_or("Unknown error") + ); + } + + tracing::info!("Successfully connected through proxy"); + + let mut req = target_url.clone().into_client_request()?; let headers = req.headers_mut(); headers.insert( "Authorization", - HeaderValue::from_str(&format!("Bearer {token}"))?, + HeaderValue::from_str(&format!("Bearer {}", token))?, + ); + + let maybe_tls_stream = if target_url.scheme() == "wss" { + let tls_connector = tokio_native_tls::native_tls::TlsConnector::builder() + .build() + .context("Failed to build TLS connector")?; + let tokio_connector = tokio_native_tls::TlsConnector::from(tls_connector); + let domain = target_host; + let tls_stream = tokio_connector + .connect(domain, stream) + .await + .context("Failed to establish TLS connection")?; + + MaybeTlsStream::NativeTls(tls_stream) + } else { + MaybeTlsStream::Plain(stream) + }; + + let (ws_stream, _) = client_async(req, maybe_tls_stream) + .await + .context("Failed to complete WebSocket handshake")?; + + tracing::info!( + "WebSocket connection established to relayer at {}", + target_url ); - let (ws_stream, _) = connect_async_with_config(req, None, true).await?; - tracing::info!("connected to the relayer at {}", url); Ok(ws_stream.split()) } +async fn connect_to_relayer( + url: Url, + token: &str, + proxy_url: Option<&Url>, +) -> Result<(RelayerWsSender, RelayerWsReceiver)> { + if let Some(proxy) = proxy_url { + connect_through_proxy(proxy, &url, token).await + } else { + tracing::info!("connecting to the relayer at {}", url); + let mut req = url.clone().into_client_request()?; + let headers = req.headers_mut(); + headers.insert( + "Authorization", + HeaderValue::from_str(&format!("Bearer {token}"))?, + ); + let (ws_stream, _) = connect_async_with_config(req, None, true).await?; + tracing::info!("connected to the relayer at {}", url); + Ok(ws_stream.split()) + } +} + struct RelayerWsSession { ws_sender: RelayerWsSender, } @@ -58,11 +178,11 @@ impl RelayerWsSession { } pub struct RelayerSessionTask { - // connection state pub url: Url, pub token: String, pub receiver: broadcast::Receiver, pub is_ready: Arc, + pub proxy_url: Option, } impl RelayerSessionTask { @@ -108,10 +228,8 @@ impl RelayerSessionTask { } pub async fn run_relayer_connection(&mut self) -> Result<()> { - // Establish relayer connection - // Relayer will drop the connection if no data received in 5s let (relayer_ws_sender, mut relayer_ws_receiver) = - connect_to_relayer(self.url.clone(), &self.token).await?; + connect_to_relayer(self.url.clone(), &self.token, self.proxy_url.as_ref()).await?; let mut relayer_ws_session = RelayerWsSession { ws_sender: relayer_ws_sender, }; @@ -236,11 +354,11 @@ mod tests { let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); let mut relayer_session_task = RelayerSessionTask { - // connection state url: Url::parse("ws://127.0.0.1:12346").unwrap(), token: "token1".to_string(), receiver: relayer_receiver, is_ready: Arc::new(AtomicBool::new(false)), + proxy_url: None, }; tokio::spawn(async move { relayer_session_task.run().await }); tokio::time::sleep(std::time::Duration::from_millis(1000)).await; From 7c4415c8cda9a02f2927f71431037f53770fc417 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 17 Oct 2025 15:41:02 +0000 Subject: [PATCH 2/5] fix: address clippy warnings in proxy implementation - Use inline format args for cleaner string formatting - Replace slice indexing with safe .get() method - All clippy checks now pass Co-Authored-By: Mike Rolish --- apps/pyth-lazer-agent/src/relayer_session.rs | 27 ++++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/apps/pyth-lazer-agent/src/relayer_session.rs b/apps/pyth-lazer-agent/src/relayer_session.rs index c0115742bb..70c918b19b 100644 --- a/apps/pyth-lazer-agent/src/relayer_session.rs +++ b/apps/pyth-lazer-agent/src/relayer_session.rs @@ -44,10 +44,10 @@ async fn connect_through_proxy( 80 }); - let proxy_addr = format!("{}:{}", proxy_host, proxy_port); + let proxy_addr = format!("{proxy_host}:{proxy_port}"); let mut stream = TcpStream::connect(&proxy_addr) .await - .context(format!("Failed to connect to proxy at {}", proxy_addr))?; + .context(format!("Failed to connect to proxy at {proxy_addr}"))?; let target_host = target_url .host_str() @@ -60,23 +60,18 @@ async fn connect_through_proxy( 80 }); - let mut connect_request = format!( - "CONNECT {}:{} HTTP/1.1\r\nHost: {}:{}\r\n", - target_host, target_port, target_host, target_port - ); + let mut connect_request = + format!("CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n"); let username = proxy_url.username(); if !username.is_empty() { let password = proxy_url.password().unwrap_or(""); - let credentials = format!("{}:{}", username, password); + let credentials = format!("{username}:{password}"); let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes()); - connect_request = format!( - "{}Proxy-Authorization: Basic {}\r\n", - connect_request, encoded - ); + connect_request = format!("{connect_request}Proxy-Authorization: Basic {encoded}\r\n"); } - connect_request = format!("{}\r\n", connect_request); + connect_request = format!("{connect_request}\r\n"); stream .write_all(connect_request.as_bytes()) @@ -89,7 +84,11 @@ async fn connect_through_proxy( .await .context("Failed to read CONNECT response from proxy")?; - let response_str = String::from_utf8_lossy(&response[..n]); + let response_str = String::from_utf8_lossy( + response + .get(..n) + .context("Invalid response slice range")?, + ); if !response_str.starts_with("HTTP/1.1 200") && !response_str.starts_with("HTTP/1.0 200") { bail!( @@ -104,7 +103,7 @@ async fn connect_through_proxy( let headers = req.headers_mut(); headers.insert( "Authorization", - HeaderValue::from_str(&format!("Bearer {}", token))?, + HeaderValue::from_str(&format!("Bearer {token}"))?, ); let maybe_tls_stream = if target_url.scheme() == "wss" { From 90cdadf9fbccb28f6b5a95d6a8967fa68465e01d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 17 Oct 2025 15:43:29 +0000 Subject: [PATCH 3/5] fix: apply rustfmt formatting Co-Authored-By: Mike Rolish --- apps/pyth-lazer-agent/src/relayer_session.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/apps/pyth-lazer-agent/src/relayer_session.rs b/apps/pyth-lazer-agent/src/relayer_session.rs index 70c918b19b..e425e45441 100644 --- a/apps/pyth-lazer-agent/src/relayer_session.rs +++ b/apps/pyth-lazer-agent/src/relayer_session.rs @@ -60,8 +60,9 @@ async fn connect_through_proxy( 80 }); - let mut connect_request = - format!("CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n"); + let mut connect_request = format!( + "CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n" + ); let username = proxy_url.username(); if !username.is_empty() { @@ -84,11 +85,8 @@ async fn connect_through_proxy( .await .context("Failed to read CONNECT response from proxy")?; - let response_str = String::from_utf8_lossy( - response - .get(..n) - .context("Invalid response slice range")?, - ); + let response_str = + String::from_utf8_lossy(response.get(..n).context("Invalid response slice range")?); if !response_str.starts_with("HTTP/1.1 200") && !response_str.starts_with("HTTP/1.0 200") { bail!( From 6cb811183d73a408a22f76614dd70d87fa7a8e82 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 17 Oct 2025 19:49:09 +0000 Subject: [PATCH 4/5] feat(pyth-lazer-agent): add proxy URL to logging statements for debugging - Add proxy URL to error messages when sending/reading CONNECT requests - Add proxy URL to success log after proxy connection - Add proxy URL to final WebSocket success log for better traceability Co-Authored-By: Mike Rolish --- apps/pyth-lazer-agent/src/relayer_session.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/apps/pyth-lazer-agent/src/relayer_session.rs b/apps/pyth-lazer-agent/src/relayer_session.rs index e425e45441..97bef61504 100644 --- a/apps/pyth-lazer-agent/src/relayer_session.rs +++ b/apps/pyth-lazer-agent/src/relayer_session.rs @@ -77,13 +77,14 @@ async fn connect_through_proxy( stream .write_all(connect_request.as_bytes()) .await - .context("Failed to send CONNECT request to proxy")?; + .context(format!( + "Failed to send CONNECT request to proxy at {proxy_url}" + ))?; let mut response = vec![0u8; 1024]; - let n = stream - .read(&mut response) - .await - .context("Failed to read CONNECT response from proxy")?; + let n = stream.read(&mut response).await.context(format!( + "Failed to read CONNECT response from proxy at {proxy_url}" + ))?; let response_str = String::from_utf8_lossy(response.get(..n).context("Invalid response slice range")?); @@ -95,7 +96,7 @@ async fn connect_through_proxy( ); } - tracing::info!("Successfully connected through proxy"); + tracing::info!("Successfully connected through proxy at {}", proxy_url); let mut req = target_url.clone().into_client_request()?; let headers = req.headers_mut(); @@ -125,8 +126,9 @@ async fn connect_through_proxy( .context("Failed to complete WebSocket handshake")?; tracing::info!( - "WebSocket connection established to relayer at {}", - target_url + "WebSocket connection established to relayer at {} via proxy {}", + target_url, + proxy_url ); Ok(ws_stream.split()) } From df4d388e1e550bd23dc35fd386187659d0725acb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 23 Oct 2025 21:43:05 +0000 Subject: [PATCH 5/5] refactor(pyth-lazer-agent): improve proxy CONNECT implementation - Replace manual string construction with proper HTTP request building - Add robust HTTP response parsing with status code extraction - Read response headers until complete (detect \r\n\r\n terminator) - Provide detailed error messages with status codes and descriptions - Add Proxy-Connection: Keep-Alive header for better compatibility - Fix clippy warnings for inline format args and safe slice access - Improve error handling for edge cases (empty response, invalid status) Co-Authored-By: Mike Rolish --- apps/pyth-lazer-agent/src/relayer_session.rs | 72 ++++++++++++++++---- 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/apps/pyth-lazer-agent/src/relayer_session.rs b/apps/pyth-lazer-agent/src/relayer_session.rs index 97bef61504..1dc016bc73 100644 --- a/apps/pyth-lazer-agent/src/relayer_session.rs +++ b/apps/pyth-lazer-agent/src/relayer_session.rs @@ -60,19 +60,23 @@ async fn connect_through_proxy( 80 }); - let mut connect_request = format!( - "CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n" - ); + let target_authority = format!("{target_host}:{target_port}"); + let mut request_parts = vec![format!("CONNECT {target_authority} HTTP/1.1")]; + request_parts.push(format!("Host: {target_authority}")); let username = proxy_url.username(); if !username.is_empty() { let password = proxy_url.password().unwrap_or(""); let credentials = format!("{username}:{password}"); let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes()); - connect_request = format!("{connect_request}Proxy-Authorization: Basic {encoded}\r\n"); + request_parts.push(format!("Proxy-Authorization: Basic {encoded}")); } - connect_request = format!("{connect_request}\r\n"); + request_parts.push("Proxy-Connection: Keep-Alive".to_string()); + request_parts.push(String::new()); // Empty line to end headers + request_parts.push(String::new()); // CRLF to end request + + let connect_request = request_parts.join("\r\n"); stream .write_all(connect_request.as_bytes()) @@ -81,18 +85,58 @@ async fn connect_through_proxy( "Failed to send CONNECT request to proxy at {proxy_url}" ))?; - let mut response = vec![0u8; 1024]; - let n = stream.read(&mut response).await.context(format!( - "Failed to read CONNECT response from proxy at {proxy_url}" - ))?; + let mut response_buffer = Vec::new(); + let mut temp_buf = [0u8; 1024]; + let mut headers_complete = false; + + while !headers_complete { + let n = stream.read(&mut temp_buf).await.context(format!( + "Failed to read CONNECT response from proxy at {proxy_url}" + ))?; + + if n == 0 { + bail!("Proxy closed connection before sending complete response"); + } + + response_buffer.extend_from_slice(temp_buf.get(..n).context("Invalid buffer slice")?); - let response_str = - String::from_utf8_lossy(response.get(..n).context("Invalid response slice range")?); + if response_buffer.windows(4).any(|w| w == b"\r\n\r\n") { + headers_complete = true; + } + } + + let response_str = String::from_utf8_lossy(&response_buffer); + + let status_line = response_str + .lines() + .next() + .context("Empty response from proxy")?; + + let parts: Vec<&str> = status_line.split_whitespace().collect(); + if parts.len() < 2 { + bail!( + "Invalid HTTP response from proxy at {}: {}", + proxy_url, + status_line + ); + } - if !response_str.starts_with("HTTP/1.1 200") && !response_str.starts_with("HTTP/1.0 200") { + let status_code = parts + .get(1) + .context("Missing status code in proxy response")? + .parse::() + .context("Invalid status code in proxy response")?; + + if status_code != 200 { + let status_text = parts + .get(2..) + .map(|s| s.join(" ")) + .unwrap_or_else(|| "Unknown".to_string()); bail!( - "Proxy CONNECT failed: {}", - response_str.lines().next().unwrap_or("Unknown error") + "Proxy CONNECT failed with status {} {}: {}", + status_code, + status_text, + status_line ); }