From f479b0efa5b94496aefa42d3241f0c7974610533 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 20 Feb 2023 15:36:44 +0100 Subject: [PATCH 1/2] Add NAK and backoff support Signed-off-by: Tomasz Pietrek --- async-nats/Cargo.toml | 4 +- async-nats/src/jetstream/consumer/mod.rs | 5 +- async-nats/src/jetstream/consumer/pull.rs | 5 ++ async-nats/src/jetstream/consumer/push.rs | 6 ++ async-nats/src/jetstream/message.rs | 9 ++- async-nats/tests/jetstream_tests.rs | 95 ++++++++++++++++++++++- 6 files changed, 117 insertions(+), 7 deletions(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 0fa49c0b0..46272d383 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -21,7 +21,7 @@ futures = "0.3.21" nkeys = "0.2.0" once_cell = "1.10.0" regex = "1.5.5" -serde = { version = "1.0.136", features = ["derive"] } +serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.79" serde_repr = "0.1.7" http = "0.2.7" @@ -31,7 +31,7 @@ url = "2" tokio-rustls = "0.23" rustls-pemfile = "1.0.1" nuid = "0.3.2" -serde_nanos = "0.1.1" +serde_nanos = "0.1.3" time = { version = "0.3.6", features = ["parsing", "formatting", "serde", "serde-well-known"] } rustls-native-certs = "0.6.2" tracing = "0.1" diff --git a/async-nats/src/jetstream/consumer/mod.rs b/async-nats/src/jetstream/consumer/mod.rs index c06f053a3..a7d3a30e5 100644 --- a/async-nats/src/jetstream/consumer/mod.rs +++ b/async-nats/src/jetstream/consumer/mod.rs @@ -313,9 +313,12 @@ pub struct Config { /// Force consumer to use memory storage. #[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")] pub memory_storage: bool, - // Additional consumer metadata. + /// Additional consumer metadata. #[serde(default, skip_serializing_if = "is_default")] pub metadata: HashMap, + /// Custom backoff for missed acknowledgments. + #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")] + pub backoff: Vec, } impl From<&Config> for Config { diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 8b915b4d7..734be1cd1 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -1514,6 +1514,9 @@ pub struct Config { // Additional consumer metadata. #[serde(default, skip_serializing_if = "is_default")] pub metadata: HashMap, + /// Custom backoff for missed acknowledgments. + #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")] + pub backoff: Vec, } impl IntoConsumerConfig for &Config { @@ -1550,6 +1553,7 @@ impl IntoConsumerConfig for Config { num_replicas: self.num_replicas, memory_storage: self.memory_storage, metadata: self.metadata, + backoff: self.backoff, } } } @@ -1583,6 +1587,7 @@ impl FromConsumer for Config { num_replicas: config.num_replicas, memory_storage: config.memory_storage, metadata: config.metadata, + backoff: config.backoff, }) } } diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index 013296f29..442c72aab 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -223,6 +223,9 @@ pub struct Config { // Additional consumer metadata. #[serde(default, skip_serializing_if = "is_default")] pub metadata: HashMap, + /// Custom backoff for missed acknowledgments. + #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")] + pub backoff: Vec, } impl FromConsumer for Config { @@ -257,6 +260,7 @@ impl FromConsumer for Config { num_replicas: config.num_replicas, memory_storage: config.memory_storage, metadata: config.metadata, + backoff: config.backoff, }) } } @@ -289,6 +293,7 @@ impl IntoConsumerConfig for Config { num_replicas: self.num_replicas, memory_storage: self.memory_storage, metadata: self.metadata, + backoff: self.backoff, } } } @@ -399,6 +404,7 @@ impl IntoConsumerConfig for OrderedConfig { num_replicas: 1, memory_storage: true, metadata: self.metadata, + backoff: Vec::new(), } } } diff --git a/async-nats/src/jetstream/message.rs b/async-nats/src/jetstream/message.rs index e58ce9fb9..560a82bd5 100644 --- a/async-nats/src/jetstream/message.rs +++ b/async-nats/src/jetstream/message.rs @@ -14,10 +14,10 @@ //! A wrapped `crate::Message` with `JetStream` related methods. use super::context::Context; use crate::Error; - use bytes::Bytes; use futures::future::TryFutureExt; use futures::StreamExt; +use std::time::Duration; use time::OffsetDateTime; #[derive(Debug)] @@ -314,7 +314,7 @@ pub enum AckKind { /// Signals that the message will not be processed now /// and processing can move onto the next message, NAK'd /// message will be retried. - Nak, + Nak(Option), /// When sent before the AckWait period indicates that /// work is ongoing and the period should be extended by /// another equal to AckWait. @@ -333,7 +333,10 @@ impl From for Bytes { use AckKind::*; match kind { Ack => Bytes::from_static(b"+ACK"), - Nak => Bytes::from_static(b"-NAK"), + Nak(maybe_duration) => match maybe_duration { + None => Bytes::from_static(b"-NAK"), + Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(), + }, Progress => Bytes::from_static(b"+WPI"), Next => Bytes::from_static(b"+NXT"), Term => Bytes::from_static(b"+TERM"), diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 817ab876c..df6bf9c8e 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -37,6 +37,7 @@ mod jetstream { use async_nats::jetstream::context::Publish; use async_nats::jetstream::response::Response; use async_nats::jetstream::stream::{self, DiscardPolicy, StorageType}; + use async_nats::jetstream::AckKind; use async_nats::ConnectOptions; use bytes::Bytes; use futures::stream::{StreamExt, TryStreamExt}; @@ -2020,7 +2021,7 @@ mod jetstream { if let Some(message) = iter.next().await { message .unwrap() - .ack_with(async_nats::jetstream::AckKind::Nak) + .ack_with(async_nats::jetstream::AckKind::Nak(None)) .await .unwrap(); } @@ -2736,4 +2737,96 @@ mod jetstream { assert_eq!(consumer.info().await.unwrap().config.metadata, metadata); } + + #[tokio::test] + async fn backoff() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let context = async_nats::jetstream::new(client.clone()); + + let stream = context + .create_stream(async_nats::jetstream::stream::Config { + name: "stream".to_string(), + ..Default::default() + }) + .await + .unwrap(); + + let mut consumer = stream + .create_consumer(async_nats::jetstream::consumer::Config { + max_deliver: 10, + backoff: vec![ + std::time::Duration::from_secs(1), + std::time::Duration::from_secs(5), + ], + ..Default::default() + }) + .await + .unwrap(); + + let info = consumer.info().await.unwrap(); + + assert_eq!(info.config.backoff[0], Duration::from_secs(1)); + assert_eq!(info.config.backoff[1], Duration::from_secs(5)); + } + + #[tokio::test] + async fn nak_with_delay() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let context = async_nats::jetstream::new(client.clone()); + + let stream = context + .create_stream(async_nats::jetstream::stream::Config { + name: "stream".to_string(), + ..Default::default() + }) + .await + .unwrap(); + + let consumer = stream + .create_consumer(async_nats::jetstream::consumer::push::Config { + deliver_subject: "deliver".to_string(), + max_deliver: 30, + ack_policy: AckPolicy::Explicit, + ack_wait: Duration::from_secs(5), + ..Default::default() + }) + .await + .unwrap(); + + context + .publish("stream".to_string(), "data".into()) + .await + .unwrap() + .await + .unwrap(); + + let mut messages = consumer.messages().await.unwrap(); + let message = messages.next().await.unwrap().unwrap(); + + // Send NAK with much shorter duration. + message + .ack_with(AckKind::Nak(Some(Duration::from_millis(1000)))) + .await + .unwrap(); + + // Check if we get a redelivery in that shortened duration. + let message = tokio::time::timeout(Duration::from_secs(3), messages.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + // Send NAK with duration longer than `ack_wait` set on the consumer. + message + .ack_with(AckKind::Nak(Some(Duration::from_secs(10)))) + .await + .unwrap(); + + // Expect it to timeout, but in time longer than consumer `ack_wait`. + tokio::time::timeout(Duration::from_secs(7), messages.next()) + .await + .unwrap_err(); + } } From 58b3090ceb4b45eaa5f98cf7b80d66787c7ee2cf Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 22 Feb 2023 09:25:19 +0100 Subject: [PATCH 2/2] Update dictionary Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 1 + async-nats/src/jetstream/message.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.config/nats.dic b/.config/nats.dic index 71cc8251c..02ad0f66a 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -125,3 +125,4 @@ const stats_handler schemas filter_subject +metadata diff --git a/async-nats/src/jetstream/message.rs b/async-nats/src/jetstream/message.rs index 560a82bd5..363bd742c 100644 --- a/async-nats/src/jetstream/message.rs +++ b/async-nats/src/jetstream/message.rs @@ -103,7 +103,7 @@ impl Message { /// let mut messages = consumer.fetch().max_messages(100).messages().await?; /// /// while let Some(message) = messages.next().await { - /// message?.ack_with(AckKind::Nak).await?; + /// message?.ack_with(AckKind::Nak(None)).await?; /// } /// # Ok(()) /// # }