37 changes: 19 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions apps/pyth-lazer-agent/Cargo.toml
@@ -1,14 +1,14 @@
[package]
name = "pyth-lazer-agent"
version = "0.4.1"
version = "0.4.2"
edition = "2024"
description = "Pyth Lazer Agent"
license = "Apache-2.0"
repository = "https://github.com/pyth-network/pyth-crosschain"

[dependencies]
pyth-lazer-publisher-sdk = "0.3.0"
pyth-lazer-protocol = "0.10.1"
pyth-lazer-publisher-sdk = "0.10.0"
pyth-lazer-protocol = "0.14.0"

anyhow = "1.0.98"
backoff = "0.4.0"
4 changes: 2 additions & 2 deletions apps/pyth-lazer-agent/src/jrpc_handle.rs
@@ -266,9 +266,9 @@ async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(

#[cfg(test)]
pub mod tests {
use pyth_lazer_protocol::{PriceFeedId, SymbolState, api::Channel, time::FixedRate};

use super::*;
use pyth_lazer_protocol::router::{Channel, FixedRate, PriceFeedId};
use pyth_lazer_protocol::symbol_state::SymbolState;
use std::net::SocketAddr;

fn gen_test_symbol(name: String, asset_type: String) -> SymbolMetadata {
71 changes: 55 additions & 16 deletions apps/pyth-lazer-agent/src/lazer_publisher.rs
@@ -13,6 +13,7 @@ use pyth_lazer_publisher_sdk::transaction::{
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
};
use solana_keypair::read_keypair_file;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -132,9 +133,10 @@ impl LazerPublisherTask {
return Ok(());
}

let mut updates = self.pending_updates.drain(..).collect();
let mut updates: Vec<FeedUpdate> = self.pending_updates.drain(..).collect();
updates.sort_by_key(|u| u.source_timestamp.as_ref().map(|t| (t.seconds, t.nanos)));
if self.config.enable_update_deduplication {
deduplicate_feed_updates(&mut updates);
updates = deduplicate_feed_updates(&updates)?;
}

let publisher_update = PublisherUpdate {
@@ -178,9 +180,17 @@ impl LazerPublisherTask {
}
}

fn deduplicate_feed_updates(feed_updates: &mut Vec<FeedUpdate>) {
// assume that feed_updates is already sorted by timestamp for each feed_update.feed_id
feed_updates.dedup_by_key(|feed_update| (feed_update.feed_id, feed_update.update.clone()));
/// For each feed, keep only the latest data value. Among the trailing run of consecutive updates carrying that value, keep the one with the earliest timestamp.
/// Assumes the input is sorted by timestamp ascending.
fn deduplicate_feed_updates(sorted_feed_updates: &Vec<FeedUpdate>) -> Result<Vec<FeedUpdate>> {
let mut deduped_feed_updates = HashMap::new();
for update in sorted_feed_updates {
let entry = deduped_feed_updates.entry(update.feed_id).or_insert(update);
if entry.update != update.update {
*entry = update;
}
}
Ok(deduped_feed_updates.into_values().cloned().collect())
}

#[cfg(test)]
@@ -308,25 +318,27 @@ mod tests {
// - (4, 15)
// - (5, 15)
// - (6, 10)
// we should only return (1, 10), (4, 15), (6, 10)
// - (7, 10)
// we should only return (6, 10)

let updates = &mut vec![
let updates = &vec![
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15),
test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 10),
];

let expected_updates = vec![
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
];
let expected_updates = vec![test_feed_update(
1,
TimestampUs::from_millis(6).unwrap(),
10,
)];

deduplicate_feed_updates(updates);
assert_eq!(updates.to_vec(), expected_updates);
let deduped_updates = deduplicate_feed_updates(updates).unwrap();
assert_eq!(deduped_updates, expected_updates);
}

#[test]
@@ -342,11 +354,38 @@

let expected_updates = vec![
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
];

let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
deduped_updates.sort_by_key(|u| u.feed_id);
assert_eq!(deduped_updates, expected_updates);
}

#[test]
fn test_deduplicate_feed_updates_multiple_feeds_random_order() {
let updates = &mut vec![
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 20),
test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15),
test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15),
test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 20),
test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10), // last distinct update for feed 1
test_feed_update(1, TimestampUs::from_millis(9).unwrap(), 10),
test_feed_update(2, TimestampUs::from_millis(10).unwrap(), 15),
test_feed_update(2, TimestampUs::from_millis(11).unwrap(), 15),
test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10), // last distinct update for feed 2
];

let expected_updates = vec![
test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10),
test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10),
];

deduplicate_feed_updates(updates);
assert_eq!(updates.to_vec(), expected_updates);
let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
deduped_updates.sort_by_key(|u| u.feed_id);
assert_eq!(deduped_updates, expected_updates);
}
}
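
The new deduplication keeps, for each feed, the earliest update of the trailing run of identical data, not the earliest update overall that carries that data. A minimal, self-contained sketch of that behavior; `Update` and `dedup` here are simplified stand-ins invented for illustration, not the real `FeedUpdate` protobuf type:

```rust
use std::collections::HashMap;

// Simplified stand-in for FeedUpdate; the real protobuf type has optional
// fields and richer update payloads.
#[derive(Clone, Debug, PartialEq)]
struct Update {
    feed_id: u32,
    timestamp_us: u64,
    price: i64,
}

// Mirrors deduplicate_feed_updates: walk updates sorted by timestamp, keep one
// entry per feed, and replace it only when the data changes, so the survivor
// is the first update of the trailing run of identical data.
fn dedup(sorted: &[Update]) -> Vec<Update> {
    let mut kept: HashMap<u32, &Update> = HashMap::new();
    for u in sorted {
        let entry = kept.entry(u.feed_id).or_insert(u);
        if entry.price != u.price {
            *entry = u;
        }
    }
    kept.into_values().cloned().collect()
}

fn main() {
    // Same shape as the single-feed test: prices 10,10,10,15,15,10,10 at 1..=7 ms.
    let updates: Vec<Update> = [10i64, 10, 10, 15, 15, 10, 10]
        .iter()
        .enumerate()
        .map(|(i, &p)| Update {
            feed_id: 1,
            timestamp_us: (i as u64 + 1) * 1_000,
            price: p,
        })
        .collect();
    // Only the update at t = 6 ms survives: it opens the trailing run of price 10.
    let expected = vec![Update { feed_id: 1, timestamp_us: 6_000, price: 10 }];
    assert_eq!(dedup(&updates), expected);
}
```

Because `HashMap` iteration order is unspecified, the deduplicated output is unordered, which is why the multi-feed tests above sort by `feed_id` before asserting.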
10 changes: 5 additions & 5 deletions apps/pyth-lazer-agent/src/publisher_handle.rs
@@ -89,9 +89,9 @@ async fn try_handle_publisher(
feed_id: Some(data.price_feed_id.0),
source_timestamp: MessageField::some(data.source_timestamp_us.into()),
update: Some(Update::PriceUpdate(PriceUpdate {
price: data.price.map(|p| p.0.get()),
best_bid_price: data.best_bid_price.map(|p| p.0.get()),
best_ask_price: data.best_ask_price.map(|p| p.0.get()),
price: data.price.map(|p| p.mantissa_i64()),
best_bid_price: data.best_bid_price.map(|p| p.mantissa_i64()),
best_ask_price: data.best_ask_price.map(|p| p.mantissa_i64()),
..PriceUpdate::default()
})),
special_fields: Default::default(),
@@ -125,8 +125,8 @@ async fn try_handle_publisher(
feed_id: Some(data.price_feed_id.0),
source_timestamp: MessageField::some(data.source_timestamp_us.into()),
update: Some(Update::FundingRateUpdate(FundingRateUpdate {
price: data.price.map(|p| p.0.get()),
rate: data.funding_rate.map(|r| r.0),
price: data.price.map(|p| p.mantissa_i64()),
rate: data.funding_rate.map(|r| r.mantissa()),
..FundingRateUpdate::default()
})),
special_fields: Default::default(),
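
The accessor change above (`p.0.get()` → `p.mantissa_i64()`, `r.0` → `r.mantissa()`) suggests the price and rate newtypes in the bumped pyth-lazer-protocol are now read through named mantissa accessors rather than their inner tuple fields. A rough sketch of the pattern with hypothetical stand-in types; the real definitions in the crate may differ:

```rust
use std::num::NonZeroI64;

// Hypothetical stand-ins for the crate's price and rate newtypes, used only to
// illustrate the switch from tuple-field access to named mantissa accessors.
struct Price(NonZeroI64);
struct Rate(i64);

impl Price {
    // Replaces the old direct access pattern `price.0.get()`.
    fn mantissa_i64(&self) -> i64 {
        self.0.get()
    }
}

impl Rate {
    // Replaces the old direct access pattern `rate.0`.
    fn mantissa(&self) -> i64 {
        self.0
    }
}

fn main() {
    // Mirrors how the handler maps optional prices/rates into protobuf fields.
    let price = Some(Price(NonZeroI64::new(123_456_000).unwrap()));
    let funding_rate = Some(Rate(-42));

    let price_field: Option<i64> = price.map(|p| p.mantissa_i64());
    let rate_field: Option<i64> = funding_rate.map(|r| r.mantissa());

    assert_eq!(price_field, Some(123_456_000));
    assert_eq!(rate_field, Some(-42));
}
```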