Skip to content

Commit

Permalink
Use last header value for JetStream messages
Browse files Browse the repository at this point in the history
As republish will append new headers, using first value could lead to
misleading data when a message is republish from one stream to another.

This commit fixes it by always picking the last header value.

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Apr 9, 2024
1 parent fc61743 commit f6a8fdd
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 4 deletions.
20 changes: 20 additions & 0 deletions async-nats/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,23 @@ impl HeaderMap {
.and_then(|x| x.first())
}

/// Gets a last value for a given key. If key is not found, [Option::None] is returned.
///
/// # Examples
///
/// ```
/// # use async_nats::HeaderMap;
///
/// let mut headers = HeaderMap::new();
/// headers.append("Key", "Value");
/// let values = headers.get_last("Key").unwrap();
/// ```
pub fn get_last<K: IntoHeaderName>(&self, key: K) -> Option<&HeaderValue> {
self.inner
.get(&key.into_header_name())
.and_then(|x| x.last())
}

/// Gets an iterator to the values for a given key.
///
/// # Examples
Expand Down Expand Up @@ -679,6 +696,9 @@ mod tests {
assert_eq!(key, "value".to_string());

assert_eq!(headers.get("Key").unwrap().as_str(), "value");

let key: String = headers.get_last("Key").unwrap().as_str().into();
assert_eq!(key, "other".to_string());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ impl futures::Stream for Ordered {
debug!("received idle heartbeats");
if let Some(headers) = message.headers.as_ref() {
if let Some(sequence) =
headers.get(crate::header::NATS_LAST_CONSUMER)
headers.get_last(crate::header::NATS_LAST_CONSUMER)
{
let sequence: u64 =
sequence.as_str().parse().map_err(|err| {
Expand Down
4 changes: 2 additions & 2 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl Store {
kv_operation_from_message(&message).unwrap_or(Operation::Put);

let sequence = headers
.get(header::NATS_SEQUENCE)
.get_last(header::NATS_SEQUENCE)
.ok_or_else(|| {
EntryError::with_source(
EntryErrorKind::Other,
Expand All @@ -374,7 +374,7 @@ impl Store {
})?;

let created = headers
.get(header::NATS_TIME_STAMP)
.get_last(header::NATS_TIME_STAMP)
.ok_or_else(|| {
EntryError::with_source(
EntryErrorKind::Other,
Expand Down
83 changes: 82 additions & 1 deletion async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod kv {
use async_nats::{
jetstream::{
kv::Operation,
stream::{DiscardPolicy, Republish, Source, StorageType},
stream::{self, DiscardPolicy, Republish, Source, StorageType},
},
ConnectOptions,
};
Expand Down Expand Up @@ -964,4 +964,85 @@ mod kv {

local_kv.get("name").await.unwrap();
}

#[tokio::test]
async fn republish_headers_handling() {
let server = nats_server::run_server("tests/configs/jetstream.conf");

let client = ConnectOptions::new()
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client.clone());

context
.create_stream(async_nats::jetstream::stream::Config {
subjects: vec!["A.>".into(), "B.>".into()],
name: "source".into(),
republish: Some(async_nats::jetstream::stream::Republish {
source: "A.>".into(),
destination: "$KV.test.>".into(),
headers_only: false,
}),
max_messages_per_subject: 10,
..Default::default()
})
.await
.unwrap();

let kv_stream = context
.create_stream(stream::Config {
subjects: vec!["$KV.test.>".into()],
name: "KV_test".into(),
max_messages_per_subject: 10,
..Default::default()
})
.await
.unwrap();

let kv = context.get_key_value("test").await.unwrap();

// Publish some messages to alter the sequences for republished messages.
context
.publish("B.foo", "data".into())
.await
.unwrap()
.await
.unwrap();
context
.publish("B.bar", "data".into())
.await
.unwrap()
.await
.unwrap();

// now, publish the actual KV keys.
context
.publish("A.orange", "key".into())
.await
.unwrap()
.await
.unwrap();
context
.publish("A.tomato", "hello".into())
.await
.unwrap()
.await
.unwrap();

assert_eq!(1, kv.entry("orange").await.unwrap().unwrap().revision);
assert_eq!(2, kv.entry("tomato").await.unwrap().unwrap().revision);

let mut config = kv_stream.cached_info().config.clone();
config.allow_direct = true;

// Update the stream to allow direct access.
context.update_stream(config).await.unwrap();
// Get a fresh instance, so owe are using direct get.
let kv = context.get_key_value("test").await.unwrap();

assert_eq!(1, kv.entry("orange").await.unwrap().unwrap().revision);
assert_eq!(2, kv.entry("tomato").await.unwrap().unwrap().revision);
}
}

0 comments on commit f6a8fdd

Please sign in to comment.