diff --git a/Cargo.lock b/Cargo.lock index e91bfa1..86c1c12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2784,7 +2784,7 @@ dependencies = [ [[package]] name = "pyth-stream" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 41a2c2b..4596f37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-stream" -version = "0.1.0" +version = "0.1.1" edition = "2021" [lib] diff --git a/src/bin/pyth_reader.rs b/src/bin/pyth_reader.rs index 7688a3c..7bc0421 100644 --- a/src/bin/pyth_reader.rs +++ b/src/bin/pyth_reader.rs @@ -1,5 +1,6 @@ use anyhow::Result; use async_nats::jetstream::{self}; +use async_nats::HeaderMap; use clap::Parser; use config::Config; use pyth_sdk_solana::state::{load_price_account, PriceStatus, PythnetPriceAccount}; @@ -160,13 +161,26 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig) let message = serde_json::to_string(&price_update)?; + // Create a unique message ID + let message_id = format!( + "{}:{}", + price_update.price_feed.id, price_update.price_feed.price.publish_time + ); + + // Create headers with the Nats-Msg-Id + let mut headers = HeaderMap::new(); + headers.insert("Nats-Msg-Id", message_id.as_str()); + let jetstream_clone = jetstream.clone(); task::spawn(async move { match jetstream_clone - .publish("pyth.price.updates", message.into()) + .publish_with_headers("pyth.price.updates", headers, message.into()) .await { - Ok(_) => debug!("Published price update to JetStream"), + Ok(_) => debug!( + "Published price update to JetStream with ID: {}", + message_id + ), Err(e) => warn!("Failed to publish price update to JetStream: {}", e), } }); diff --git a/src/utils.rs b/src/utils.rs index 3228f79..4f311fd 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -14,6 +14,8 @@ pub async fn setup_jetstream(nats_client: &async_nats::Client) -> Result