From 3c66163b005feb4cfc101a93a2f31b602567a845 Mon Sep 17 00:00:00 2001 From: Ayaz Abbas Date: Wed, 23 Oct 2024 10:45:54 +0100 Subject: [PATCH 1/2] ensure consumers get a unique ephemeral name --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/bin/websocket_server.rs | 13 +++---------- src/utils.rs | 2 +- 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2c5677..f758163 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2868,7 +2868,7 @@ dependencies = [ [[package]] name = "pyth-stream" -version = "0.1.6" +version = "0.1.7" dependencies = [ "anyhow", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index b163cbc..ca364a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-stream" -version = "0.1.6" +version = "0.1.7" edition = "2021" [lib] diff --git a/src/bin/websocket_server.rs b/src/bin/websocket_server.rs index 82581d5..82f66d5 100644 --- a/src/bin/websocket_server.rs +++ b/src/bin/websocket_server.rs @@ -268,10 +268,10 @@ struct PriceInfo { async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) -> Result<()> { let stream_name = "PYTH_PRICE_UPDATES"; - let consumer_name = "websocket_server"; let consumer_config = consumer::pull::Config { - durable_name: Some(consumer_name.to_string()), + deliver_policy: consumer::DeliverPolicy::All, + ack_policy: consumer::AckPolicy::None, ..Default::default() }; @@ -280,7 +280,7 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) - .await .context("Failed to create NATS consumer")?; - info!(stream = %stream_name, consumer = %consumer_name, "Started handling NATS messages"); + info!(stream = %stream_name, "Started handling NATS messages"); loop { let mut messages = consumer @@ -336,13 +336,6 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) - ); } } - - // Spawn a new task for acknowledgment - tokio::spawn(async move { - if let Err(e) = msg.ack().await { - warn!(error = %e, "Failed to acknowledge NATS message"); - } - }); } Err(e) => { error!(error = %e, "Error receiving message from NATS"); diff --git a/src/utils.rs b/src/utils.rs index dec2dcf..c6ec433 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -9,7 +9,7 @@ pub async fn setup_jetstream(nats_client: &async_nats::Client) -> Result Date: Wed, 23 Oct 2024 10:46:46 +0100 Subject: [PATCH 2/2] set stream max_bytes to 4 GB --- src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.rs b/src/utils.rs index c6ec433..dec2dcf 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -9,7 +9,7 @@ pub async fn setup_jetstream(nats_client: &async_nats::Client) -> Result