diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index 324b99991..f4d3f9a8f 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -169,6 +169,23 @@ impl HeaderMap { .and_then(|x| x.first()) } + /// Gets a last value for a given key. If key is not found, [Option::None] is returned. + /// + /// # Examples + /// + /// ``` + /// # use async_nats::HeaderMap; + /// + /// let mut headers = HeaderMap::new(); + /// headers.append("Key", "Value"); + /// let values = headers.get_last("Key").unwrap(); + /// ``` + pub fn get_last(&self, key: K) -> Option<&HeaderValue> { + self.inner + .get(&key.into_header_name()) + .and_then(|x| x.last()) + } + /// Gets an iterator to the values for a given key. /// /// # Examples @@ -679,6 +696,9 @@ mod tests { assert_eq!(key, "value".to_string()); assert_eq!(headers.get("Key").unwrap().as_str(), "value"); + + let key: String = headers.get_last("Key").unwrap().as_str().into(); + assert_eq!(key, "other".to_string()); } #[test] diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index 3d20586af..006b498db 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -650,7 +650,7 @@ impl futures::Stream for Ordered { debug!("received idle heartbeats"); if let Some(headers) = message.headers.as_ref() { if let Some(sequence) = - headers.get(crate::header::NATS_LAST_CONSUMER) + headers.get_last(crate::header::NATS_LAST_CONSUMER) { let sequence: u64 = sequence.as_str().parse().map_err(|err| { diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index facbe3915..f7e8ba93f 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -357,7 +357,7 @@ impl Store { kv_operation_from_message(&message).unwrap_or(Operation::Put); let sequence = headers - .get(header::NATS_SEQUENCE) + .get_last(header::NATS_SEQUENCE) .ok_or_else(|| { EntryError::with_source( EntryErrorKind::Other, @@ -374,7 +374,7 @@ impl Store { })?; let created = headers - .get(header::NATS_TIME_STAMP) + .get_last(header::NATS_TIME_STAMP) .ok_or_else(|| { EntryError::with_source( EntryErrorKind::Other, diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 388c1334e..670c19127 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -17,7 +17,7 @@ mod kv { use async_nats::{ jetstream::{ kv::Operation, - stream::{DiscardPolicy, Republish, Source, StorageType}, + stream::{self, DiscardPolicy, Republish, Source, StorageType}, }, ConnectOptions, }; @@ -964,4 +964,85 @@ mod kv { local_kv.get("name").await.unwrap(); } + + #[tokio::test] + async fn republish_headers_handling() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + + let client = ConnectOptions::new() + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client.clone()); + + context + .create_stream(async_nats::jetstream::stream::Config { + subjects: vec!["A.>".into(), "B.>".into()], + name: "source".into(), + republish: Some(async_nats::jetstream::stream::Republish { + source: "A.>".into(), + destination: "$KV.test.>".into(), + headers_only: false, + }), + max_messages_per_subject: 10, + ..Default::default() + }) + .await + .unwrap(); + + let kv_stream = context + .create_stream(stream::Config { + subjects: vec!["$KV.test.>".into()], + name: "KV_test".into(), + max_messages_per_subject: 10, + ..Default::default() + }) + .await + .unwrap(); + + let kv = context.get_key_value("test").await.unwrap(); + + // Publish some messages to alter the sequences for republished messages. + context + .publish("B.foo", "data".into()) + .await + .unwrap() + .await + .unwrap(); + context + .publish("B.bar", "data".into()) + .await + .unwrap() + .await + .unwrap(); + + // now, publish the actual KV keys. + context + .publish("A.orange", "key".into()) + .await + .unwrap() + .await + .unwrap(); + context + .publish("A.tomato", "hello".into()) + .await + .unwrap() + .await + .unwrap(); + + assert_eq!(1, kv.entry("orange").await.unwrap().unwrap().revision); + assert_eq!(2, kv.entry("tomato").await.unwrap().unwrap().revision); + + let mut config = kv_stream.cached_info().config.clone(); + config.allow_direct = true; + + // Update the stream to allow direct access. + context.update_stream(config).await.unwrap(); + // Get a fresh instance, so owe are using direct get. + let kv = context.get_key_value("test").await.unwrap(); + + assert_eq!(1, kv.entry("orange").await.unwrap().unwrap().revision); + assert_eq!(2, kv.entry("tomato").await.unwrap().unwrap().revision); + } }