diff --git a/Cargo.lock b/Cargo.lock index 30a916f3de..54f5bce96f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5639,7 +5639,7 @@ dependencies = [ [[package]] name = "pyth-lazer-agent" -version = "0.4.1" +version = "0.4.2" dependencies = [ "anyhow", "backoff", @@ -5657,8 +5657,8 @@ dependencies = [ "hyper 1.6.0", "hyper-util", "protobuf", - "pyth-lazer-protocol 0.10.1", - "pyth-lazer-publisher-sdk 0.3.0", + "pyth-lazer-protocol 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pyth-lazer-publisher-sdk 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.12.22", "serde", "serde_json", @@ -5701,43 +5701,44 @@ dependencies = [ [[package]] name = "pyth-lazer-protocol" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d321e49be0315d68f07d097d240701a05e003e05eff5ac9f2d0457d4a606dd92" +version = "0.14.0" dependencies = [ + "alloy-primitives 0.8.25", "anyhow", + "assert_float_eq", + "bincode 1.3.3", + "bs58", "byteorder", "chrono", "derive_more 1.0.0", + "ed25519-dalek 2.1.1", "hex", "humantime", "humantime-serde", "itertools 0.13.0", + "libsecp256k1 0.7.2", + "mry", "protobuf", "rust_decimal", "serde", "serde_json", + "thiserror 2.0.12", ] [[package]] name = "pyth-lazer-protocol" version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91b3e69c264b2ad80b5943df86c606daae63b13f93062abcc008c09a9e2e621e" dependencies = [ - "alloy-primitives 0.8.25", "anyhow", - "assert_float_eq", - "bincode 1.3.3", - "bs58", "byteorder", "chrono", "derive_more 1.0.0", - "ed25519-dalek 2.1.1", "hex", "humantime", "humantime-serde", "itertools 0.13.0", - "libsecp256k1 0.7.2", - "mry", "protobuf", "rust_decimal", "serde", @@ -5747,27 +5748,27 @@ dependencies = [ [[package]] name = "pyth-lazer-publisher-sdk" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bebeacbc58d9e0143e03a397b08becbed1dacf5baad6a245bc00f74ca5cc50d" +version = "0.10.0" dependencies = [ "anyhow", "fs-err", "protobuf", "protobuf-codegen", - "pyth-lazer-protocol 0.10.1", + "pyth-lazer-protocol 0.14.0", "serde_json", ] [[package]] name = "pyth-lazer-publisher-sdk" version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98f83b818450d72f6f6db5a9d98e90d2668971da14363820829998290d913f80" dependencies = [ "anyhow", "fs-err", "protobuf", "protobuf-codegen", - "pyth-lazer-protocol 0.14.0", + "pyth-lazer-protocol 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json", ] diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index c398fd52d7..020b57a24a 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -1,14 +1,14 @@ [package] name = "pyth-lazer-agent" -version = "0.4.1" +version = "0.4.2" edition = "2024" description = "Pyth Lazer Agent" license = "Apache-2.0" repository = "https://github.com/pyth-network/pyth-crosschain" [dependencies] -pyth-lazer-publisher-sdk = "0.3.0" -pyth-lazer-protocol = "0.10.1" +pyth-lazer-publisher-sdk = "0.10.0" +pyth-lazer-protocol = "0.14.0" anyhow = "1.0.98" backoff = "0.4.0" diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index 7fe9401e25..3db7a4cf1c 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -266,9 +266,9 @@ async fn handle_get_metadata( #[cfg(test)] pub mod tests { + use pyth_lazer_protocol::{PriceFeedId, SymbolState, api::Channel, time::FixedRate}; + use super::*; - use pyth_lazer_protocol::router::{Channel, FixedRate, PriceFeedId}; - use pyth_lazer_protocol::symbol_state::SymbolState; use std::net::SocketAddr; fn gen_test_symbol(name: String, asset_type: String) -> SymbolMetadata { diff --git a/apps/pyth-lazer-agent/src/lazer_publisher.rs b/apps/pyth-lazer-agent/src/lazer_publisher.rs index 7bf7f46ce5..e5b75b41e0 100644 --- a/apps/pyth-lazer-agent/src/lazer_publisher.rs +++ b/apps/pyth-lazer-agent/src/lazer_publisher.rs @@ -13,6 +13,7 @@ use pyth_lazer_publisher_sdk::transaction::{ Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction, }; use solana_keypair::read_keypair_file; +use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -132,9 +133,10 @@ impl LazerPublisherTask { return Ok(()); } - let mut updates = self.pending_updates.drain(..).collect(); + let mut updates: Vec = self.pending_updates.drain(..).collect(); + updates.sort_by_key(|u| u.source_timestamp.as_ref().map(|t| (t.seconds, t.nanos))); if self.config.enable_update_deduplication { - deduplicate_feed_updates(&mut updates); + updates = deduplicate_feed_updates(&updates)?; } let publisher_update = PublisherUpdate { @@ -178,9 +180,17 @@ impl LazerPublisherTask { } } -fn deduplicate_feed_updates(feed_updates: &mut Vec) { - // assume that feed_updates is already sorted by timestamp for each feed_update.feed_id - feed_updates.dedup_by_key(|feed_update| (feed_update.feed_id, feed_update.update.clone())); +/// For each feed, keep the latest data. Among updates with the same data, keep the one with the earliest timestamp. +/// Assumes the input is sorted by timestamp ascending. +fn deduplicate_feed_updates(sorted_feed_updates: &Vec) -> Result> { + let mut deduped_feed_updates = HashMap::new(); + for update in sorted_feed_updates { + let entry = deduped_feed_updates.entry(update.feed_id).or_insert(update); + if entry.update != update.update { + *entry = update; + } + } + Ok(deduped_feed_updates.into_values().cloned().collect()) } #[cfg(test)] @@ -308,25 +318,27 @@ mod tests { // - (4, 15) // - (5, 15) // - (6, 10) - // we should only return (1, 10), (4, 15), (6, 10) + // - (7, 10) + // we should only return (6, 10) - let updates = &mut vec![ + let updates = &vec![ test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10), test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10), test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15), test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15), test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 10), ]; - let expected_updates = vec![ - test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), - test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15), - test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10), - ]; + let expected_updates = vec![test_feed_update( + 1, + TimestampUs::from_millis(6).unwrap(), + 10, + )]; - deduplicate_feed_updates(updates); - assert_eq!(updates.to_vec(), expected_updates); + let deduped_updates = deduplicate_feed_updates(updates).unwrap(); + assert_eq!(deduped_updates, expected_updates); } #[test] @@ -342,11 +354,38 @@ mod tests { let expected_updates = vec![ test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), + test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10), + ]; + + let mut deduped_updates = deduplicate_feed_updates(updates).unwrap(); + deduped_updates.sort_by_key(|u| u.feed_id); + assert_eq!(deduped_updates, expected_updates); + } + + #[test] + fn test_deduplicate_feed_updates_multiple_feeds_random_order() { + let updates = &mut vec![ + test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 20), + test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10), test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15), + test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15), test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 20), + test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10), // last distinct update for feed 1 + test_feed_update(1, TimestampUs::from_millis(9).unwrap(), 10), + test_feed_update(2, TimestampUs::from_millis(10).unwrap(), 15), + test_feed_update(2, TimestampUs::from_millis(11).unwrap(), 15), + test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10), // last distinct update for feed 2 + ]; + + let expected_updates = vec![ + test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10), + test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10), ]; - deduplicate_feed_updates(updates); - assert_eq!(updates.to_vec(), expected_updates); + let mut deduped_updates = deduplicate_feed_updates(updates).unwrap(); + deduped_updates.sort_by_key(|u| u.feed_id); + assert_eq!(deduped_updates, expected_updates); } } diff --git a/apps/pyth-lazer-agent/src/publisher_handle.rs b/apps/pyth-lazer-agent/src/publisher_handle.rs index a139e1d22d..55a1e87fbf 100644 --- a/apps/pyth-lazer-agent/src/publisher_handle.rs +++ b/apps/pyth-lazer-agent/src/publisher_handle.rs @@ -89,9 +89,9 @@ async fn try_handle_publisher( feed_id: Some(data.price_feed_id.0), source_timestamp: MessageField::some(data.source_timestamp_us.into()), update: Some(Update::PriceUpdate(PriceUpdate { - price: data.price.map(|p| p.0.get()), - best_bid_price: data.best_bid_price.map(|p| p.0.get()), - best_ask_price: data.best_ask_price.map(|p| p.0.get()), + price: data.price.map(|p| p.mantissa_i64()), + best_bid_price: data.best_bid_price.map(|p| p.mantissa_i64()), + best_ask_price: data.best_ask_price.map(|p| p.mantissa_i64()), ..PriceUpdate::default() })), special_fields: Default::default(), @@ -125,8 +125,8 @@ async fn try_handle_publisher( feed_id: Some(data.price_feed_id.0), source_timestamp: MessageField::some(data.source_timestamp_us.into()), update: Some(Update::FundingRateUpdate(FundingRateUpdate { - price: data.price.map(|p| p.0.get()), - rate: data.funding_rate.map(|r| r.0), + price: data.price.map(|p| p.mantissa_i64()), + rate: data.funding_rate.map(|r| r.mantissa()), ..FundingRateUpdate::default() })), special_fields: Default::default(),