From 1d5abac4162e4cda62ef9f96aed5b4abeafda2df Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Fri, 5 Sep 2025 12:51:47 +0100 Subject: [PATCH 1/4] refactor(lazer): rename PythLazerClient to PythLazerStreamClient --- .../client/examples/subscribe_price_feeds.rs | 4 +-- lazer/sdk/rust/client/src/client.rs | 30 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs b/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs index 9087450f2b..878b3212a9 100644 --- a/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs +++ b/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs @@ -2,7 +2,7 @@ use std::time::Duration; use base64::Engine; use pyth_lazer_client::backoff::PythLazerExponentialBackoffBuilder; -use pyth_lazer_client::client::PythLazerClientBuilder; +use pyth_lazer_client::client::PythLazerStreamClientBuilder; use pyth_lazer_client::ws_connection::AnyResponse; use pyth_lazer_protocol::api::{ Channel, DeliveryFormat, Format, JsonBinaryEncoding, SubscriptionParams, SubscriptionParamsRepr, @@ -36,7 +36,7 @@ async fn main() -> anyhow::Result<()> { .init(); // Create and start the client - let mut client = PythLazerClientBuilder::new(get_lazer_access_token()) + let mut client = PythLazerStreamClientBuilder::new(get_lazer_access_token()) // Optionally override the default endpoints .with_endpoints(vec![ "wss://pyth-lazer-0.dourolabs.app/v1/stream".parse()?, diff --git a/lazer/sdk/rust/client/src/client.rs b/lazer/sdk/rust/client/src/client.rs index 15755e9483..18fc868227 100644 --- a/lazer/sdk/rust/client/src/client.rs +++ b/lazer/sdk/rust/client/src/client.rs @@ -15,12 +15,12 @@ //! ## Basic Usage //! //! ```rust,ignore -//! use pyth_lazer_client::PythLazerClientBuilder; +//! use pyth_lazer_client::PythLazerStreamClientBuilder; //! use pyth_lazer_protocol::subscription::SubscribeRequest; //! //! #[tokio::main] //! async fn main() -> anyhow::Result<()> { -//! let mut client = PythLazerClientBuilder::new("your_access_token".to_string()) +//! let mut client = PythLazerStreamClientBuilder::new("your_access_token".to_string()) //! .with_num_connections(2) //! .build()?; //! @@ -69,7 +69,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); /// A high-performance client for connecting to Pyth Lazer data streams. /// -/// The `PythLazerClient` maintains multiple WebSocket connections to Pyth Lazer endpoints +/// The `PythLazerStreamClient` maintains multiple WebSocket connections to Pyth Lazer endpoints /// for redundancy. It automatically handles connection management, /// message deduplication, and provides a unified stream of price updates. /// @@ -79,7 +79,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); /// - Uses a TTL cache for deduplicating messages across connections /// - Provides a single channel for consuming deduplicated messages /// - Handles connection failures with exponential backoff -pub struct PythLazerClient { +pub struct PythLazerStreamClient { endpoints: Vec, access_token: String, num_connections: usize, @@ -89,10 +89,10 @@ pub struct PythLazerClient { channel_capacity: usize, } -impl PythLazerClient { +impl PythLazerStreamClient { /// Creates a new Pyth Lazer client instance. /// - /// This is a low-level constructor. Consider using [`PythLazerClientBuilder`] for a more + /// This is a low-level constructor. Consider using [`PythLazerStreamClientBuilder`] for a more /// convenient way to create clients with sensible defaults. /// /// # Arguments @@ -106,7 +106,7 @@ impl PythLazerClient { /// /// # Returns /// - /// Returns `Ok(PythLazerClient)` on success, or an error if the configuration is invalid. + /// Returns `Ok(PythLazerStreamClient)` on success, or an error if the configuration is invalid. /// /// # Errors /// @@ -247,7 +247,7 @@ impl PythLazerClient { } } -/// A builder for creating [`PythLazerClient`] instances with customizable configuration. +/// A builder for creating [`PythLazerStreamClient`] instances with customizable configuration. /// /// The builder provides a convenient way to configure a Pyth Lazer client with sensible /// defaults while allowing customization of all parameters. It follows the builder pattern @@ -261,7 +261,7 @@ impl PythLazerClient { /// - **Backoff**: Exponential backoff with default settings /// - **Channel Capacity**: Uses the default 1000 /// -pub struct PythLazerClientBuilder { +pub struct PythLazerStreamClientBuilder { endpoints: Vec, access_token: String, num_connections: usize, @@ -270,7 +270,7 @@ pub struct PythLazerClientBuilder { channel_capacity: usize, } -impl PythLazerClientBuilder { +impl PythLazerStreamClientBuilder { /// Creates a new builder with default configuration. /// /// This initializes the builder with sensible defaults for production use: @@ -368,15 +368,15 @@ impl PythLazerClientBuilder { self } - /// Builds the configured [`PythLazerClient`] instance. + /// Builds the configured [`PythLazerStreamClient`] instance. /// /// This consumes the builder and creates a new client with the specified /// configuration. The client is ready to use but connections are not - /// established until [`PythLazerClient::start`] is called. + /// established until [`PythLazerStreamClient::start`] is called. /// /// # Returns /// - /// Returns `Ok(PythLazerClient)` on success, or an error if the configuration + /// Returns `Ok(PythLazerStreamClient)` on success, or an error if the configuration /// is invalid. /// /// # Errors @@ -385,8 +385,8 @@ impl PythLazerClientBuilder { /// - No endpoints are configured /// - Any configuration parameter is invalid /// - pub fn build(self) -> Result { - PythLazerClient::new( + pub fn build(self) -> Result { + PythLazerStreamClient::new( self.endpoints, self.access_token, self.num_connections, From a869cf8af78b175963d2b9f0a1480c43b4a06263 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Fri, 5 Sep 2025 12:53:01 +0100 Subject: [PATCH 2/4] refactor(lazer): rename stream client module --- lazer/sdk/rust/client/examples/subscribe_price_feeds.rs | 2 +- lazer/sdk/rust/client/src/lib.rs | 2 +- lazer/sdk/rust/client/src/{client.rs => stream_client.rs} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename lazer/sdk/rust/client/src/{client.rs => stream_client.rs} (100%) diff --git a/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs b/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs index 878b3212a9..24eac797ae 100644 --- a/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs +++ b/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs @@ -2,7 +2,7 @@ use std::time::Duration; use base64::Engine; use pyth_lazer_client::backoff::PythLazerExponentialBackoffBuilder; -use pyth_lazer_client::client::PythLazerStreamClientBuilder; +use pyth_lazer_client::stream_client::PythLazerStreamClientBuilder; use pyth_lazer_client::ws_connection::AnyResponse; use pyth_lazer_protocol::api::{ Channel, DeliveryFormat, Format, JsonBinaryEncoding, SubscriptionParams, SubscriptionParamsRepr, diff --git a/lazer/sdk/rust/client/src/lib.rs b/lazer/sdk/rust/client/src/lib.rs index c62eab1ff4..d800768829 100644 --- a/lazer/sdk/rust/client/src/lib.rs +++ b/lazer/sdk/rust/client/src/lib.rs @@ -1,6 +1,6 @@ const CHANNEL_CAPACITY: usize = 1000; pub mod backoff; -pub mod client; pub mod resilient_ws_connection; +pub mod stream_client; pub mod ws_connection; diff --git a/lazer/sdk/rust/client/src/client.rs b/lazer/sdk/rust/client/src/stream_client.rs similarity index 100% rename from lazer/sdk/rust/client/src/client.rs rename to lazer/sdk/rust/client/src/stream_client.rs From 20252f83940b9b8b5f96a091632165eeec44b834 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Tue, 9 Sep 2025 15:11:56 +0100 Subject: [PATCH 3/4] feat(lazer): add history client for fetching symbols --- Cargo.lock | 39 +- lazer/sdk/rust/client/Cargo.toml | 10 +- lazer/sdk/rust/client/examples/symbols.rs | 28 ++ lazer/sdk/rust/client/src/history_client.rs | 401 ++++++++++++++++++++ lazer/sdk/rust/client/src/lib.rs | 1 + 5 files changed, 469 insertions(+), 10 deletions(-) create mode 100644 lazer/sdk/rust/client/examples/symbols.rs create mode 100644 lazer/sdk/rust/client/src/history_client.rs diff --git a/Cargo.lock b/Cargo.lock index 54f5bce96f..b3f8301e96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -417,6 +417,12 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "argus" version = "0.1.0" @@ -791,6 +797,17 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "atomicwrites" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef1bb8d1b645fe38d51dfc331d720fb5fc2c94b440c76cc79c80ff265ca33e3" +dependencies = [ + "rustix 0.38.44", + "tempfile", + "windows-sys 0.52.0", +] + [[package]] name = "atty" version = "0.2.14" @@ -5659,7 +5676,7 @@ dependencies = [ "protobuf", "pyth-lazer-protocol 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "pyth-lazer-publisher-sdk 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "reqwest 0.12.22", + "reqwest 0.12.23", "serde", "serde_json", "soketto", @@ -5675,20 +5692,26 @@ dependencies = [ [[package]] name = "pyth-lazer-client" -version = "6.0.0" +version = "7.0.0" dependencies = [ "alloy-primitives 0.8.25", "anyhow", + "arc-swap", + "atomicwrites", "backoff", "base64 0.22.1", "bincode 1.3.3", "bs58", "derive_more 1.0.0", "ed25519-dalek 2.1.1", + "fs-err", + "futures", "futures-util", "hex", + "humantime-serde", "libsecp256k1 0.7.2", "pyth-lazer-protocol 0.14.0", + "reqwest 0.12.23", "serde", "serde_json", "tokio", @@ -6263,9 +6286,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.22" +version = "0.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" +checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ "async-compression", "base64 0.22.1", @@ -6318,7 +6341,7 @@ dependencies = [ "anyhow", "async-trait", "http 1.3.1", - "reqwest 0.12.22", + "reqwest 0.12.23", "serde", "thiserror 1.0.69", "tower-service", @@ -8082,7 +8105,7 @@ dependencies = [ "crossbeam-channel", "gethostname", "log", - "reqwest 0.12.22", + "reqwest 0.12.23", "solana-cluster-type", "solana-sha256-hasher", "solana-time-utils", @@ -8570,7 +8593,7 @@ dependencies = [ "futures", "indicatif", "log", - "reqwest 0.12.22", + "reqwest 0.12.23", "reqwest-middleware", "semver 1.0.26", "serde", @@ -8605,7 +8628,7 @@ checksum = "362b2f6fd7ea69f9bb87fc3e2da6faf4a8f530011f1c5edad2454a1a79b59ada" dependencies = [ "anyhow", "jsonrpc-core", - "reqwest 0.12.22", + "reqwest 0.12.23", "reqwest-middleware", "serde", "serde_derive", diff --git a/lazer/sdk/rust/client/Cargo.toml b/lazer/sdk/rust/client/Cargo.toml index 7476c8263f..029bb7d3f0 100644 --- a/lazer/sdk/rust/client/Cargo.toml +++ b/lazer/sdk/rust/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-client" -version = "6.0.0" +version = "7.0.0" edition = "2021" description = "A Rust client for Pyth Lazer" license = "Apache-2.0" @@ -15,10 +15,16 @@ serde_json = "1.0" base64 = "0.22.1" anyhow = "1.0" tracing = "0.1" -url = "2.4" +url = { version = "2.4", features = ["serde"] } derive_more = { version = "1.0.0", features = ["from"] } backoff = { version = "0.4.0", features = ["futures", "tokio"] } ttl_cache = "0.5.1" +reqwest = { version = "0.12.23", features = ["json"] } +arc-swap = "1.7.1" +futures = "0.3.31" +humantime-serde = "1.1.1" +fs-err = "3.1.1" +atomicwrites = "0.4.4" [dev-dependencies] diff --git a/lazer/sdk/rust/client/examples/symbols.rs b/lazer/sdk/rust/client/examples/symbols.rs new file mode 100644 index 0000000000..b201e26753 --- /dev/null +++ b/lazer/sdk/rust/client/examples/symbols.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +use pyth_lazer_client::history_client::{PythLazerHistoryClient, PythLazerHistoryClientConfig}; +use pyth_lazer_protocol::PriceFeedId; +use tokio::time::sleep; +use url::Url; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let urls = std::env::args() + .skip(1) + .map(|s| Url::parse(&s)) + .collect::, _>>()?; + + let client = PythLazerHistoryClient::new(PythLazerHistoryClientConfig { + urls, + update_interval: Duration::from_secs(5), + ..Default::default() + }); + let feeds = client.all_symbols_metadata_handle().await?; + + loop { + println!("feeds len: {}", feeds.symbols().len()); + println!("feed 1: {:?}", feeds.symbols().get(&PriceFeedId(1))); + sleep(Duration::from_secs(15)).await; + } +} diff --git a/lazer/sdk/rust/client/src/history_client.rs b/lazer/sdk/rust/client/src/history_client.rs new file mode 100644 index 0000000000..ee6eeffcc2 --- /dev/null +++ b/lazer/sdk/rust/client/src/history_client.rs @@ -0,0 +1,401 @@ +use std::{ + collections::HashMap, + io::Write, + path::{Path, PathBuf}, + sync::{Arc, Weak}, + time::Duration, +}; + +use anyhow::{bail, Context as _}; +use arc_swap::ArcSwap; +use atomicwrites::replace_atomic; +use backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock}; +use futures::{stream::FuturesUnordered, StreamExt}; +use pyth_lazer_protocol::{jrpc::SymbolMetadata, PriceFeedId}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::{sync::mpsc, time::sleep}; +use tracing::{info, warn}; +use url::Url; + +const DEFAULT_URLS: &[&str] = &["https://history.pyth-lazer.dourolabs.app/"]; +const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(30); +const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); + +/// Configuration for the history client. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct PythLazerHistoryClientConfig { + /// URLs of the history services. + #[serde(default = "default_urls")] + pub urls: Vec, + /// Interval of queries to the history services. + /// Note: if the request fails, it will be retried using exponential backoff regardless of this setting. + #[serde(with = "humantime_serde", default = "default_update_interval")] + pub update_interval: Duration, + /// Timeout of an individual request. + #[serde(with = "humantime_serde", default = "default_request_timeout")] + pub request_timeout: Duration, + /// Path to the cache directory that can be used to provide latest data if history service is unavailable. + pub cache_dir: Option, + /// Capacity of communication channels created by this client. + #[serde(default = "default_channel_capacity")] + pub channel_capacity: usize, +} + +fn default_urls() -> Vec { + DEFAULT_URLS + .iter() + .map(|url| Url::parse(url).unwrap()) + .collect() +} + +fn default_update_interval() -> Duration { + Duration::from_secs(30) +} + +fn default_request_timeout() -> Duration { + Duration::from_secs(15) +} + +fn default_channel_capacity() -> usize { + 1000 +} + +impl Default for PythLazerHistoryClientConfig { + fn default() -> Self { + Self { + urls: default_urls(), + update_interval: default_update_interval(), + request_timeout: default_request_timeout(), + cache_dir: None, + channel_capacity: default_channel_capacity(), + } + } +} + +/// Client to the history service API. +#[derive(Debug, Clone)] +pub struct PythLazerHistoryClient { + config: Arc, + client: reqwest::Client, +} + +impl PythLazerHistoryClient { + pub fn new(config: PythLazerHistoryClientConfig) -> Self { + Self { + config: Arc::new(config), + client: reqwest::Client::builder() + .timeout(DEFAULT_REQUEST_TIMEOUT) + .build() + .expect("failed to initialize reqwest"), + } + } + + fn symbols_cache_file_path(&self) -> Option { + self.config + .cache_dir + .as_ref() + .map(|path| path.join("symbols_v1.json")) + } + + /// Fetch current metadata for all symbols. + pub async fn all_symbols_metadata(&self) -> anyhow::Result> { + self.fetch_symbols_initial().await + } + + /// Fetch metadata for all symbols as an auto-updating handle. + /// + /// Returns an error if the initial fetch failed. + /// The returned `SymbolMetadataHandle` will be updated by a background task when the data changes. + pub async fn all_symbols_metadata_handle(&self) -> anyhow::Result { + let symbols = Arc::new( + self.fetch_symbols_initial() + .await? + .into_iter() + .map(|f| (f.pyth_lazer_id, f)) + .collect::>(), + ); + let previous_symbols = symbols.clone(); + let handle = Arc::new(ArcSwap::new(symbols)); + let client = self.clone(); + let weak_handle = Arc::downgrade(&handle); + tokio::spawn(async move { + client + .update_symbols_handle(weak_handle, previous_symbols) + .await; + }); + Ok(SymbolMetadataHandle(handle)) + } + + /// Fetch metadata for all symbols as an auto-updating handle. + /// + /// The returned `SymbolMetadataHandle` will be updated by a background task when the data changes. + /// If the initial fetch failed, the handle will initially contain an empty hashmap. + pub async fn all_symbols_metadata_fault_tolerant_handle(&self) -> SymbolMetadataHandle { + let initial_result = self.fetch_symbols_initial().await; + let symbols = match initial_result { + Ok(data) => data + .into_iter() + .map(|f| (f.pyth_lazer_id, f)) + .collect::>(), + Err(err) => { + warn!( + ?err, + "failed to fetch symbols, proceeding with empty symbol list" + ); + HashMap::new() + } + }; + let symbols = Arc::new(symbols); + let previous_symbols = symbols.clone(); + let handle = Arc::new(ArcSwap::new(symbols)); + let weak_handle = Arc::downgrade(&handle); + let client = self.clone(); + tokio::spawn(async move { + client + .update_symbols_handle(weak_handle, previous_symbols) + .await; + }); + SymbolMetadataHandle(handle) + } + + /// Fetch metadata for all symbols as a receiver. + /// + /// Returns an error if the initial fetch failed. + /// On a successful return, the channel will always contain the initial data that can be fetched + /// immediately from the returned receiver. + /// You should continuously poll the receiver to receive updates. + /// + /// Panics if the buffer capacity is 0. + pub async fn all_symbols_metadata_stream( + &self, + channel_capacity: usize, + ) -> anyhow::Result>> { + let symbols = self.fetch_symbols_initial().await?; + let (sender, receiver) = mpsc::channel(channel_capacity); + + let previous_symbols = symbols.clone(); + sender + .send(symbols) + .await + .expect("send to new channel failed"); + let client = self.clone(); + tokio::spawn(async move { + client.update_symbols_stream(sender, previous_symbols).await; + }); + Ok(receiver) + } + + async fn update_symbols_handle( + &self, + handle: Weak>>, + mut previous_symbols: Arc>, + ) { + info!("starting background task for updating symbols"); + loop { + sleep(DEFAULT_UPDATE_INTERVAL).await; + if handle.upgrade().is_none() { + info!("symbols handle dropped, stopping background task"); + return; + } + match self.fetch_symbols().await { + Ok(new_symbols) => { + let new_symbols = new_symbols + .into_iter() + .map(|f| (f.pyth_lazer_id, f)) + .collect::>(); + if *previous_symbols != new_symbols { + let Some(handle) = handle.upgrade() else { + info!("symbols handle dropped, stopping background task"); + return; + }; + info!("symbols changed"); + if let Some(cache_file_path) = self.symbols_cache_file_path() { + if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) { + warn!(?err, ?cache_file_path, "failed to save data to cache file"); + } + } + let new_symbols = Arc::new(new_symbols); + previous_symbols = new_symbols.clone(); + handle.store(new_symbols); + } + } + Err(err) => { + warn!(?err, "failed to fetch symbols"); + } + } + } + } + + async fn update_symbols_stream( + &self, + handle: mpsc::Sender>, + mut previous_symbols: Vec, + ) { + info!("starting background task for updating symbols"); + loop { + sleep(DEFAULT_UPDATE_INTERVAL).await; + if handle.is_closed() { + info!("symbols channel closed, stopping background task"); + return; + } + match self.fetch_symbols().await { + Ok(new_symbols) => { + if *previous_symbols != new_symbols { + info!("symbols changed"); + if let Some(cache_file_path) = self.symbols_cache_file_path() { + if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) { + warn!(?err, ?cache_file_path, "failed to save data to cache file"); + } + } + previous_symbols = new_symbols.clone(); + if handle.send(new_symbols).await.is_err() { + info!("symbols channel closed, stopping background task"); + return; + } + } + } + Err(err) => { + warn!(?err, "failed to fetch symbols"); + } + } + } + } + + async fn fetch_symbols_initial(&self) -> anyhow::Result> { + let result = self.fetch_symbols().await; + match result { + Ok(data) => { + info!("fetched initial symbols from history service"); + if let Some(cache_file_path) = self.symbols_cache_file_path() { + if let Err(err) = atomic_save_file(&cache_file_path, &data) { + warn!(?err, ?cache_file_path, "failed to save data to cache file"); + } + } + Ok(data) + } + Err(err) => match self.symbols_cache_file_path() { + Some(cache_file_path) => match load_file::>(&cache_file_path) { + Ok(Some(data)) => { + info!(?err, "failed to fetch initial symbols from history service, but fetched last known symbols from cache"); + Ok(data) + } + Ok(None) => Err(err), + Err(cache_err) => { + warn!(?cache_err, "failed to fetch data from cache"); + Err(err) + } + }, + None => Err(err), + }, + } + } + + async fn fetch_symbols(&self) -> anyhow::Result> { + if self.config.urls.is_empty() { + bail!("no history urls provided"); + } + let mut futures = self + .config + .urls + .iter() + .map(|url| Box::pin(self.fetch_symbols_single(url))) + .collect::>(); + while let Some(result) = futures.next().await { + match result { + Ok(output) => return Ok(output), + Err(err) => { + warn!("failed to fetch symbols: {:?}", err); + } + } + } + + bail!( + "failed to fetch symbols from any urls ({:?})", + self.config.urls + ); + } + + async fn fetch_symbols_single(&self, url: &Url) -> anyhow::Result> { + let url = url.join("v1/symbols")?; + retry_notify( + ExponentialBackoff:: { + // We will retry all requests after `update_interval`, so there is + // no reason to continue retrying here. + max_elapsed_time: Some(self.config.update_interval), + ..Default::default() + }, + || async { + let response = self + .client + .get(url.clone()) + .send() + .await + .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))? + .backoff_error_for_status()?; + response + .json::>() + .await + .map_err(|err| backoff::Error::transient(anyhow::Error::from(err))) + }, + |e, _| warn!("failed to fetch symbols from {} (will retry): {:?}", url, e), + ) + .await + } +} + +#[derive(Debug, Clone)] +pub struct SymbolMetadataHandle(Arc>>); + +impl SymbolMetadataHandle { + pub fn symbols(&self) -> arc_swap::Guard>> { + self.0.load() + } + + pub fn new_for_test(data: HashMap) -> Self { + Self(Arc::new(ArcSwap::new(Arc::new(data)))) + } +} + +trait BackoffErrorForStatusExt: Sized { + fn backoff_error_for_status(self) -> Result>; +} + +impl BackoffErrorForStatusExt for reqwest::Response { + fn backoff_error_for_status(self) -> Result> { + let status = self.status(); + self.error_for_status().map_err(|err| { + if status.is_server_error() { + backoff::Error::transient(err.into()) + } else { + backoff::Error::permanent(err.into()) + } + }) + } +} + +fn load_file(path: &Path) -> anyhow::Result> { + let parent_path = path.parent().context("invalid file path: no parent")?; + fs_err::create_dir_all(parent_path)?; + + if !path.try_exists()? { + return Ok(None); + } + let json_data = fs_err::read_to_string(path)?; + let data = serde_json::from_str::(&json_data)?; + Ok(Some(data)) +} + +fn atomic_save_file(path: &Path, data: &impl Serialize) -> anyhow::Result<()> { + let parent_path = path.parent().context("invalid file path: no parent")?; + fs_err::create_dir_all(parent_path)?; + + let json_data = serde_json::to_string(&data)?; + let tmp_path = path.with_extension("tmp"); + let mut tmp_file = fs_err::File::create(&tmp_path)?; + tmp_file.write_all(json_data.as_bytes())?; + tmp_file.flush()?; + tmp_file.sync_all()?; + replace_atomic(&tmp_path, path)?; + + Ok(()) +} diff --git a/lazer/sdk/rust/client/src/lib.rs b/lazer/sdk/rust/client/src/lib.rs index d800768829..e75b94e0ce 100644 --- a/lazer/sdk/rust/client/src/lib.rs +++ b/lazer/sdk/rust/client/src/lib.rs @@ -1,6 +1,7 @@ const CHANNEL_CAPACITY: usize = 1000; pub mod backoff; +pub mod history_client; pub mod resilient_ws_connection; pub mod stream_client; pub mod ws_connection; From 75c8ee128099cf15607f129b628ce49aabfb2f17 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Wed, 10 Sep 2025 10:51:47 +0100 Subject: [PATCH 4/4] feat(lazer): add symbols stream example, fix channel_capacity handling in client --- lazer/sdk/rust/client/examples/symbols.rs | 6 ++-- .../rust/client/examples/symbols_stream.rs | 32 +++++++++++++++++++ lazer/sdk/rust/client/src/history_client.rs | 10 +++--- 3 files changed, 40 insertions(+), 8 deletions(-) create mode 100644 lazer/sdk/rust/client/examples/symbols_stream.rs diff --git a/lazer/sdk/rust/client/examples/symbols.rs b/lazer/sdk/rust/client/examples/symbols.rs index b201e26753..6431b09acd 100644 --- a/lazer/sdk/rust/client/examples/symbols.rs +++ b/lazer/sdk/rust/client/examples/symbols.rs @@ -18,11 +18,11 @@ async fn main() -> anyhow::Result<()> { update_interval: Duration::from_secs(5), ..Default::default() }); - let feeds = client.all_symbols_metadata_handle().await?; + let symbols = client.all_symbols_metadata_handle().await?; loop { - println!("feeds len: {}", feeds.symbols().len()); - println!("feed 1: {:?}", feeds.symbols().get(&PriceFeedId(1))); + println!("symbols len: {}", symbols.symbols().len()); + println!("symbol 1: {:?}", symbols.symbols().get(&PriceFeedId(1))); sleep(Duration::from_secs(15)).await; } } diff --git a/lazer/sdk/rust/client/examples/symbols_stream.rs b/lazer/sdk/rust/client/examples/symbols_stream.rs new file mode 100644 index 0000000000..0bb4e8dc22 --- /dev/null +++ b/lazer/sdk/rust/client/examples/symbols_stream.rs @@ -0,0 +1,32 @@ +use std::time::Duration; + +use pyth_lazer_client::history_client::{PythLazerHistoryClient, PythLazerHistoryClientConfig}; +use pyth_lazer_protocol::PriceFeedId; +use url::Url; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let urls = std::env::args() + .skip(1) + .map(|s| Url::parse(&s)) + .collect::, _>>()?; + + let client = PythLazerHistoryClient::new(PythLazerHistoryClientConfig { + urls, + update_interval: Duration::from_secs(5), + ..Default::default() + }); + let mut symbols_stream = client.all_symbols_metadata_stream().await?; + + while let Some(symbols) = symbols_stream.recv().await { + println!("symbols len: {}", symbols.len()); + println!( + "symbol 1: {:?}", + symbols + .iter() + .find(|feed| feed.pyth_lazer_id == PriceFeedId(1)) + ); + } + Ok(()) +} diff --git a/lazer/sdk/rust/client/src/history_client.rs b/lazer/sdk/rust/client/src/history_client.rs index ee6eeffcc2..4ede284d90 100644 --- a/lazer/sdk/rust/client/src/history_client.rs +++ b/lazer/sdk/rust/client/src/history_client.rs @@ -36,7 +36,7 @@ pub struct PythLazerHistoryClientConfig { pub request_timeout: Duration, /// Path to the cache directory that can be used to provide latest data if history service is unavailable. pub cache_dir: Option, - /// Capacity of communication channels created by this client. + /// Capacity of communication channels created by this client. It must be above zero. #[serde(default = "default_channel_capacity")] pub channel_capacity: usize, } @@ -164,14 +164,14 @@ impl PythLazerHistoryClient { /// On a successful return, the channel will always contain the initial data that can be fetched /// immediately from the returned receiver. /// You should continuously poll the receiver to receive updates. - /// - /// Panics if the buffer capacity is 0. pub async fn all_symbols_metadata_stream( &self, - channel_capacity: usize, ) -> anyhow::Result>> { + if self.config.channel_capacity == 0 { + bail!("channel_capacity cannot be 0"); + } let symbols = self.fetch_symbols_initial().await?; - let (sender, receiver) = mpsc::channel(channel_capacity); + let (sender, receiver) = mpsc::channel(self.config.channel_capacity); let previous_symbols = symbols.clone(); sender