3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions apps/pyth-lazer-agent/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pyth-lazer-agent"
version = "0.4.2"
version = "0.5.0"
edition = "2024"
description = "Pyth Lazer Agent"
license = "Apache-2.0"
@@ -26,6 +26,7 @@ humantime-serde = "1.1.1"
hyper = { version = "1.6.0", features = ["http1", "server", "client"] }
hyper-util = { version = "0.1.10", features = ["tokio"] }
protobuf = "3.7.2"
reqwest = "0.12.22"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
soketto = { version = "0.8.1", features = ["http"] }
@@ -35,8 +36,8 @@ tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }
tokio-util = { version = "0.7.14", features = ["compat"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
ttl_cache = "0.5.1"
url = { version = "2.5.4", features = ["serde"] }
reqwest = "0.12.22"

[dev-dependencies]
tempfile = "3.20.0"
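The newly added `ttl_cache` dependency supplies the expiring map behind the cross-tick deduplication pass in `lazer_publisher.rs` below. A minimal sketch of the parts of its API the agent relies on (capacity-bounded construction, per-entry TTL on insert, lazy expiry on lookup), assuming the `ttl_cache` 0.5.1 crate behaves as its docs describe:

```rust
use std::{thread, time::Duration};
use ttl_cache::TtlCache;

fn main() {
    // Capacity-bounded cache; each entry carries its own TTL.
    let mut cache: TtlCache<u32, i64> = TtlCache::new(100_000);
    cache.insert(1, 42, Duration::from_millis(500));

    // Within the TTL the entry is visible.
    assert_eq!(cache.get(&1), Some(&42));

    // After the TTL elapses, lookups treat the entry as gone.
    thread::sleep(Duration::from_millis(600));
    assert_eq!(cache.get(&1), None);
}
```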
1 change: 1 addition & 0 deletions apps/pyth-lazer-agent/config/config.toml
@@ -3,6 +3,7 @@ publish_keypair_path = "/path/to/solana/id.json"
listen_address = "0.0.0.0:8910"
publish_interval_duration = "25ms"
enable_update_deduplication = true
update_deduplication_ttl = "500ms"
# Publishers with existing tokens may still need to provide them. Please confirm with the Lazer team.
# New publishers will not need this line.
# authorization_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-pblr"
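The new `update_deduplication_ttl` line is parsed through `humantime_serde` (see the attribute added in `config.rs` below), so it accepts human-readable duration strings such as `"500ms"`, `"1s"`, or `"2min"`. A small sketch of that round trip, using a hypothetical stripped-down struct and the `toml` crate rather than the agent's real loader:

```rust
use serde::Deserialize;
use std::time::Duration;

// Hypothetical one-field stand-in for the agent's Config struct.
#[derive(Debug, Deserialize)]
struct DedupConfig {
    #[serde(with = "humantime_serde")]
    update_deduplication_ttl: Duration,
}

fn main() {
    let cfg: DedupConfig =
        toml::from_str(r#"update_deduplication_ttl = "500ms""#).unwrap();
    assert_eq!(cfg.update_deduplication_ttl, Duration::from_millis(500));
}
```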
8 changes: 7 additions & 1 deletion apps/pyth-lazer-agent/src/config.rs
@@ -21,6 +21,8 @@ pub struct Config {
pub history_service_url: Option<Url>,
#[serde(default)]
pub enable_update_deduplication: bool,
#[serde(with = "humantime_serde", default = "default_update_deduplication_ttl")]
pub update_deduplication_ttl: Duration,
}

#[derive(Deserialize, Derivative, Clone, PartialEq)]
@@ -36,7 +38,11 @@ impl Debug for AuthorizationToken {
}

fn default_publish_interval() -> Duration {
Duration::from_micros(500)
Duration::from_millis(25)
}

fn default_update_deduplication_ttl() -> Duration {
Duration::from_millis(500)
}

pub fn load_config(config_path: String) -> anyhow::Result<Config> {
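Two defaults change here: the fallback publish interval becomes 25ms (bringing it in line with the shipped `config.toml`), and an omitted `update_deduplication_ttl` now falls back to 500ms through serde's `default` attribute. A minimal sketch of that fallback behavior, again with a hypothetical stripped-down struct:

```rust
use serde::Deserialize;
use std::time::Duration;

#[derive(Debug, Deserialize)]
struct DedupConfig {
    // Same shape as the field added to Config above (hypothetical struct).
    #[serde(with = "humantime_serde", default = "default_update_deduplication_ttl")]
    update_deduplication_ttl: Duration,
}

fn default_update_deduplication_ttl() -> Duration {
    Duration::from_millis(500)
}

fn main() {
    // The field is absent from the input, so serde calls the default fn.
    let cfg: DedupConfig = toml::from_str("").unwrap();
    assert_eq!(cfg.update_deduplication_ttl, Duration::from_millis(500));
}
```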
1 change: 1 addition & 0 deletions apps/pyth-lazer-agent/src/jrpc_handle.rs
@@ -300,6 +300,7 @@ pub mod tests {
publish_interval_duration: Default::default(),
history_service_url: None,
enable_update_deduplication: false,
update_deduplication_ttl: Default::default(),
};

println!("{:?}", get_metadata(config).await.unwrap());
56 changes: 50 additions & 6 deletions apps/pyth-lazer-agent/src/lazer_publisher.rs
@@ -24,6 +24,9 @@ use tokio::{
time::interval,
};
use tracing::error;
use ttl_cache::TtlCache;

const DEDUP_CACHE_SIZE: usize = 100_000;

#[derive(Clone, Debug)]
pub struct LazerPublisher {
@@ -88,6 +91,7 @@ impl LazerPublisher {
pending_updates: Vec::new(),
relayer_sender,
signing_key,
ttl_cache: TtlCache::new(DEDUP_CACHE_SIZE),
};
tokio::spawn(async move { task.run().await });
Self {
@@ -109,6 +113,7 @@ struct LazerPublisherTask {
pending_updates: Vec<FeedUpdate>,
relayer_sender: broadcast::Sender<SignedLazerTransaction>,
signing_key: SigningKey,
ttl_cache: TtlCache<u32, FeedUpdate>,
}

impl LazerPublisherTask {
@@ -136,7 +141,16 @@ impl LazerPublisherTask {
let mut updates: Vec<FeedUpdate> = self.pending_updates.drain(..).collect();
updates.sort_by_key(|u| u.source_timestamp.as_ref().map(|t| (t.seconds, t.nanos)));
if self.config.enable_update_deduplication {
updates = deduplicate_feed_updates(&updates)?;
updates = deduplicate_feed_updates_in_tx(&updates)?;
deduplicate_feed_updates(
&mut updates,
&mut self.ttl_cache,
self.config.update_deduplication_ttl,
);
}

if updates.is_empty() {
return Ok(());
}

let publisher_update = PublisherUpdate {
@@ -182,7 +196,9 @@ impl LazerPublisherTask {

/// For each feed, keep the latest data. Among updates with the same data, keep the one with the earliest timestamp.
/// Assumes the input is sorted by timestamp ascending.
fn deduplicate_feed_updates(sorted_feed_updates: &Vec<FeedUpdate>) -> Result<Vec<FeedUpdate>> {
fn deduplicate_feed_updates_in_tx(
sorted_feed_updates: &Vec<FeedUpdate>,
) -> Result<Vec<FeedUpdate>> {
let mut deduped_feed_updates = HashMap::new();
for update in sorted_feed_updates {
let entry = deduped_feed_updates.entry(update.feed_id).or_insert(update);
@@ -193,10 +209,35 @@
Ok(deduped_feed_updates.into_values().cloned().collect())
}
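The doc comment above fixes the per-transaction semantics: for each feed, keep the newest run of data, and within a run of identical data keep its earliest timestamp. A sketch of those semantics for a single feed, using hypothetical `(timestamp, price)` pairs instead of the real protobuf `FeedUpdate`:

```rust
// Hypothetical stand-in: one feed's updates as (timestamp_ms, price),
// already sorted by timestamp ascending.
fn dedup_one_feed(sorted: &[(u64, i64)]) -> Option<(u64, i64)> {
    let mut kept: Option<(u64, i64)> = None;
    for &(ts, price) in sorted {
        match kept {
            // Data changed: this update starts a new run and becomes the candidate.
            Some((_, kept_price)) if kept_price != price => kept = Some((ts, price)),
            // Same data as the candidate: keep the candidate's earlier timestamp.
            Some(_) => {}
            None => kept = Some((ts, price)),
        }
    }
    kept
}

fn main() {
    // The latest data is 15, first seen at t=3, so (3, 15) is kept.
    let updates = [(1, 10), (2, 10), (3, 15), (4, 15)];
    assert_eq!(dedup_one_feed(&updates), Some((3, 15)));
}
```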

fn deduplicate_feed_updates(
sorted_feed_updates: &mut Vec<FeedUpdate>,
ttl_cache: &mut TtlCache<u32, FeedUpdate>,
ttl: std::time::Duration,
) {
sorted_feed_updates.retain(|update| {
let feed_id = match update.feed_id {
Some(id) => id,
None => return false, // drop updates without feed_id
};

if let Some(cached_feed) = ttl_cache.get(&feed_id) {
if cached_feed.update == update.update {
// drop if the same update is already in the cache
return false;
}
}

ttl_cache.insert(feed_id, update.clone(), ttl);
true
});
}
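Unlike the per-transaction pass, this pass remembers what was last published for each feed across ticks: an update whose payload matches the cached one within `update_deduplication_ttl` is dropped, and if every update is dropped, the `updates.is_empty()` early return above skips the transaction entirely. A sketch of that cross-tick behavior, with a plain `i64` price standing in for the protobuf payload:

```rust
use std::time::Duration;
use ttl_cache::TtlCache;

// Simplified analogue of deduplicate_feed_updates: (feed_id, price) updates.
fn retain_fresh(updates: &mut Vec<(u32, i64)>, cache: &mut TtlCache<u32, i64>, ttl: Duration) {
    updates.retain(|&(feed_id, price)| {
        if cache.get(&feed_id) == Some(&price) {
            return false; // unchanged within the TTL: drop it
        }
        cache.insert(feed_id, price, ttl); // fresh or changed: cache and keep
        true
    });
}

fn main() {
    let mut cache = TtlCache::new(100_000);
    let ttl = Duration::from_millis(500);

    // Tick 1: feed 1 is new, so it survives and is cached.
    let mut tick = vec![(1u32, 10i64)];
    retain_fresh(&mut tick, &mut cache, ttl);
    assert_eq!(tick, vec![(1, 10)]);

    // Tick 2, inside the TTL: the identical update is dropped,
    // which would trigger the empty-transaction early return.
    let mut tick = vec![(1, 10)];
    retain_fresh(&mut tick, &mut cache, ttl);
    assert!(tick.is_empty());

    // A changed price is kept and refreshes the cache entry.
    let mut tick = vec![(1, 11)];
    retain_fresh(&mut tick, &mut cache, ttl);
    assert_eq!(tick, vec![(1, 11)]);
}
```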

#[cfg(test)]
mod tests {
use crate::config::{CHANNEL_CAPACITY, Config};
use crate::lazer_publisher::{LazerPublisherTask, deduplicate_feed_updates};
use crate::lazer_publisher::{
DEDUP_CACHE_SIZE, LazerPublisherTask, deduplicate_feed_updates_in_tx,
};
use ed25519_dalek::SigningKey;
use protobuf::well_known_types::timestamp::Timestamp;
use protobuf::{Message, MessageField};
@@ -210,6 +251,7 @@ mod tests {
use tempfile::NamedTempFile;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, mpsc};
use ttl_cache::TtlCache;
use url::Url;

fn get_private_key() -> SigningKey {
@@ -258,6 +300,7 @@ mod tests {
publish_interval_duration: Duration::from_millis(25),
history_service_url: None,
enable_update_deduplication: false,
update_deduplication_ttl: Default::default(),
};

let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
@@ -268,6 +311,7 @@
pending_updates: Vec::new(),
relayer_sender,
signing_key,
ttl_cache: TtlCache::new(DEDUP_CACHE_SIZE),
};
tokio::spawn(async move { task.run().await });

@@ -337,7 +381,7 @@
10,
)];

let deduped_updates = deduplicate_feed_updates(updates).unwrap();
let deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap();
assert_eq!(deduped_updates, expected_updates);
}

@@ -357,7 +401,7 @@
test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
];

let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
let mut deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap();
deduped_updates.sort_by_key(|u| u.feed_id);
assert_eq!(deduped_updates, expected_updates);
}
@@ -384,7 +428,7 @@
test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10),
];

let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
let mut deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap();
deduped_updates.sort_by_key(|u| u.feed_id);
assert_eq!(deduped_updates, expected_updates);
}