diff --git a/Cargo.lock b/Cargo.lock index c2c5677..f758163 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2868,7 +2868,7 @@ dependencies = [ [[package]] name = "pyth-stream" -version = "0.1.6" +version = "0.1.7" dependencies = [ "anyhow", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index b163cbc..ca364a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-stream" -version = "0.1.6" +version = "0.1.7" edition = "2021" [lib] diff --git a/src/bin/websocket_server.rs b/src/bin/websocket_server.rs index 82581d5..82f66d5 100644 --- a/src/bin/websocket_server.rs +++ b/src/bin/websocket_server.rs @@ -268,10 +268,10 @@ struct PriceInfo { async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) -> Result<()> { let stream_name = "PYTH_PRICE_UPDATES"; - let consumer_name = "websocket_server"; let consumer_config = consumer::pull::Config { - durable_name: Some(consumer_name.to_string()), + deliver_policy: consumer::DeliverPolicy::All, + ack_policy: consumer::AckPolicy::None, ..Default::default() }; @@ -280,7 +280,7 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) - .await .context("Failed to create NATS consumer")?; - info!(stream = %stream_name, consumer = %consumer_name, "Started handling NATS messages"); + info!(stream = %stream_name, "Started handling NATS messages"); loop { let mut messages = consumer @@ -336,13 +336,6 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) - ); } } - - // Spawn a new task for acknowledgment - tokio::spawn(async move { - if let Err(e) = msg.ack().await { - warn!(error = %e, "Failed to acknowledge NATS message"); - } - }); } Err(e) => { error!(error = %e, "Error receiving message from NATS");