diff --git a/Cargo.lock b/Cargo.lock index 5feb0cc8ee..bd4c6c5dff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5656,7 +5656,7 @@ dependencies = [ [[package]] name = "pyth-lazer-agent" -version = "0.4.2" +version = "0.5.0" dependencies = [ "anyhow", "backoff", @@ -5687,6 +5687,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "ttl_cache", "url", ] diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index 020b57a24a..000c31786e 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-agent" -version = "0.4.2" +version = "0.5.0" edition = "2024" description = "Pyth Lazer Agent" license = "Apache-2.0" @@ -26,6 +26,7 @@ humantime-serde = "1.1.1" hyper = { version = "1.6.0", features = ["http1", "server", "client"] } hyper-util = { version = "0.1.10", features = ["tokio"] } protobuf = "3.7.2" +reqwest = "0.12.22" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" soketto = { version = "0.8.1", features = ["http"] } @@ -35,8 +36,8 @@ tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] } tokio-util = { version = "0.7.14", features = ["compat"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] } +ttl_cache = "0.5.1" url = { version = "2.5.4", features = ["serde"] } -reqwest = "0.12.22" [dev-dependencies] tempfile = "3.20.0" diff --git a/apps/pyth-lazer-agent/config/config.toml b/apps/pyth-lazer-agent/config/config.toml index f78fe1d08f..6b779c34a1 100644 --- a/apps/pyth-lazer-agent/config/config.toml +++ b/apps/pyth-lazer-agent/config/config.toml @@ -3,6 +3,7 @@ publish_keypair_path = "/path/to/solana/id.json" listen_address = "0.0.0.0:8910" publish_interval_duration = "25ms" enable_update_deduplication = true +update_deduplication_ttl = "500ms" # Publishers with existing tokens may still need to provide them. Please confirm with the Lazer team. # New publishers will not need this line. # authorization_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-pblr" diff --git a/apps/pyth-lazer-agent/src/config.rs b/apps/pyth-lazer-agent/src/config.rs index 8e442c5167..5012a45647 100644 --- a/apps/pyth-lazer-agent/src/config.rs +++ b/apps/pyth-lazer-agent/src/config.rs @@ -21,6 +21,8 @@ pub struct Config { pub history_service_url: Option, #[serde(default)] pub enable_update_deduplication: bool, + #[serde(with = "humantime_serde", default = "default_update_deduplication_ttl")] + pub update_deduplication_ttl: Duration, } #[derive(Deserialize, Derivative, Clone, PartialEq)] @@ -36,7 +38,11 @@ impl Debug for AuthorizationToken { } fn default_publish_interval() -> Duration { - Duration::from_micros(500) + Duration::from_millis(25) +} + +fn default_update_deduplication_ttl() -> Duration { + Duration::from_millis(500) } pub fn load_config(config_path: String) -> anyhow::Result { diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index 3db7a4cf1c..38aad49d89 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -300,6 +300,7 @@ pub mod tests { publish_interval_duration: Default::default(), history_service_url: None, enable_update_deduplication: false, + update_deduplication_ttl: Default::default(), }; println!("{:?}", get_metadata(config).await.unwrap()); diff --git a/apps/pyth-lazer-agent/src/lazer_publisher.rs b/apps/pyth-lazer-agent/src/lazer_publisher.rs index e5b75b41e0..58d15e00b1 100644 --- a/apps/pyth-lazer-agent/src/lazer_publisher.rs +++ b/apps/pyth-lazer-agent/src/lazer_publisher.rs @@ -24,6 +24,9 @@ use tokio::{ time::interval, }; use tracing::error; +use ttl_cache::TtlCache; + +const DEDUP_CACHE_SIZE: usize = 100_000; #[derive(Clone, Debug)] pub struct LazerPublisher { @@ -88,6 +91,7 @@ impl LazerPublisher { pending_updates: Vec::new(), relayer_sender, signing_key, + ttl_cache: TtlCache::new(DEDUP_CACHE_SIZE), }; tokio::spawn(async move { task.run().await }); Self { @@ -109,6 +113,7 @@ struct LazerPublisherTask { pending_updates: Vec, relayer_sender: broadcast::Sender, signing_key: SigningKey, + ttl_cache: TtlCache, } impl LazerPublisherTask { @@ -136,7 +141,16 @@ impl LazerPublisherTask { let mut updates: Vec = self.pending_updates.drain(..).collect(); updates.sort_by_key(|u| u.source_timestamp.as_ref().map(|t| (t.seconds, t.nanos))); if self.config.enable_update_deduplication { - updates = deduplicate_feed_updates(&updates)?; + updates = deduplicate_feed_updates_in_tx(&updates)?; + deduplicate_feed_updates( + &mut updates, + &mut self.ttl_cache, + self.config.update_deduplication_ttl, + ); + } + + if updates.is_empty() { + return Ok(()); } let publisher_update = PublisherUpdate { @@ -182,7 +196,9 @@ impl LazerPublisherTask { /// For each feed, keep the latest data. Among updates with the same data, keep the one with the earliest timestamp. /// Assumes the input is sorted by timestamp ascending. -fn deduplicate_feed_updates(sorted_feed_updates: &Vec) -> Result> { +fn deduplicate_feed_updates_in_tx( + sorted_feed_updates: &Vec, +) -> Result> { let mut deduped_feed_updates = HashMap::new(); for update in sorted_feed_updates { let entry = deduped_feed_updates.entry(update.feed_id).or_insert(update); @@ -193,10 +209,35 @@ fn deduplicate_feed_updates(sorted_feed_updates: &Vec) -> Result, + ttl_cache: &mut TtlCache, + ttl: std::time::Duration, +) { + sorted_feed_updates.retain(|update| { + let feed_id = match update.feed_id { + Some(id) => id, + None => return false, // drop updates without feed_id + }; + + if let Some(cached_feed) = ttl_cache.get(&feed_id) { + if cached_feed.update == update.update { + // drop if the same update is already in the cache + return false; + } + } + + ttl_cache.insert(feed_id, update.clone(), ttl); + true + }); +} + #[cfg(test)] mod tests { use crate::config::{CHANNEL_CAPACITY, Config}; - use crate::lazer_publisher::{LazerPublisherTask, deduplicate_feed_updates}; + use crate::lazer_publisher::{ + DEDUP_CACHE_SIZE, LazerPublisherTask, deduplicate_feed_updates_in_tx, + }; use ed25519_dalek::SigningKey; use protobuf::well_known_types::timestamp::Timestamp; use protobuf::{Message, MessageField}; @@ -210,6 +251,7 @@ mod tests { use tempfile::NamedTempFile; use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{broadcast, mpsc}; + use ttl_cache::TtlCache; use url::Url; fn get_private_key() -> SigningKey { @@ -258,6 +300,7 @@ mod tests { publish_interval_duration: Duration::from_millis(25), history_service_url: None, enable_update_deduplication: false, + update_deduplication_ttl: Default::default(), }; let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY); @@ -268,6 +311,7 @@ mod tests { pending_updates: Vec::new(), relayer_sender, signing_key, + ttl_cache: TtlCache::new(DEDUP_CACHE_SIZE), }; tokio::spawn(async move { task.run().await }); @@ -337,7 +381,7 @@ mod tests { 10, )]; - let deduped_updates = deduplicate_feed_updates(updates).unwrap(); + let deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap(); assert_eq!(deduped_updates, expected_updates); } @@ -357,7 +401,7 @@ mod tests { test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10), ]; - let mut deduped_updates = deduplicate_feed_updates(updates).unwrap(); + let mut deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap(); deduped_updates.sort_by_key(|u| u.feed_id); assert_eq!(deduped_updates, expected_updates); } @@ -384,7 +428,7 @@ mod tests { test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10), ]; - let mut deduped_updates = deduplicate_feed_updates(updates).unwrap(); + let mut deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap(); deduped_updates.sort_by_key(|u| u.feed_id); assert_eq!(deduped_updates, expected_updates); }