From 6cb4d080a42e5a0b59bf234e2a09bc156e456d6e Mon Sep 17 00:00:00 2001 From: Ayaz Abbas Date: Tue, 22 Oct 2024 14:39:49 +0100 Subject: [PATCH] enable nats jetstream deduplication --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/bin/pyth_reader.rs | 18 ++++++++++++++++-- src/utils.rs | 2 ++ 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e91bfa1..86c1c12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2784,7 +2784,7 @@ dependencies = [ [[package]] name = "pyth-stream" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 41a2c2b..4596f37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-stream" -version = "0.1.0" +version = "0.1.1" edition = "2021" [lib] diff --git a/src/bin/pyth_reader.rs b/src/bin/pyth_reader.rs index 7688a3c..7bc0421 100644 --- a/src/bin/pyth_reader.rs +++ b/src/bin/pyth_reader.rs @@ -1,5 +1,6 @@ use anyhow::Result; use async_nats::jetstream::{self}; +use async_nats::HeaderMap; use clap::Parser; use config::Config; use pyth_sdk_solana::state::{load_price_account, PriceStatus, PythnetPriceAccount}; @@ -160,13 +161,26 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig) let message = serde_json::to_string(&price_update)?; + // Create a unique message ID + let message_id = format!( + "{}:{}", + price_update.price_feed.id, price_update.price_feed.price.publish_time + ); + + // Create headers with the Nats-Msg-Id + let mut headers = HeaderMap::new(); + headers.insert("Nats-Msg-Id", message_id.as_str()); + let jetstream_clone = jetstream.clone(); task::spawn(async move { match jetstream_clone - .publish("pyth.price.updates", message.into()) + .publish_with_headers("pyth.price.updates", headers, message.into()) .await { - Ok(_) => debug!("Published price update to JetStream"), + Ok(_) => debug!( + "Published price update to JetStream with ID: {}", + message_id + ), Err(e) => warn!("Failed to publish price update to JetStream: {}", e), } }); diff --git a/src/utils.rs b/src/utils.rs index 3228f79..4f311fd 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -14,6 +14,8 @@ pub async fn setup_jetstream(nats_client: &async_nats::Client) -> Result